job.fileStore API¶
The AbstractFileStore is an abstraction of a Toil run’s shared storage.
- class toil.fileStores.abstractFileStore.AbstractFileStore(jobStore, jobDesc, file_store_dir, waitForPreviousCommit)[source]
Interface used to allow user code run by Toil to read and write files.
Also provides the interface to other Toil facilities used by user code, including:
normal (non-real-time) logging
finding the correct temporary directory for scratch work
importing and exporting files into and out of the workflow
Stores user files in the jobStore, but keeps them separate from actual jobs.
May implement caching.
Passed as argument to the
toil.job.Job.run()
method.Access to files is only permitted inside the context manager provided by
toil.fileStores.abstractFileStore.AbstractFileStore.open()
.Also responsible for committing completed jobs back to the job store with an update operation, and allowing that commit operation to be waited for.
- Parameters:
jobStore (
AbstractJobStore
)jobDesc (
JobDescription
)file_store_dir (
str
)
- __init__(jobStore, jobDesc, file_store_dir, waitForPreviousCommit)[source]
Create a new file store object.
- Parameters:
jobStore (
AbstractJobStore
) – the job store in use for the current Toil run.jobDesc (
JobDescription
) – the JobDescription object for the currently running job.file_store_dir (
str
) – the per-worker local temporary directory where the file store should store local files. Per-job directories will be created under here by the file store.waitForPreviousCommit (
Callable
[[],Any
]) – the waitForCommit method of the previous job’s file store, when jobs are running in sequence on the same worker. Used to prevent this file store’s startCommit and the previous job’s startCommit methods from running at the same time and racing. If they did race, it might be possible for the later job to be fully marked as completed in the job store before the eralier job was.
- Return type:
None
- static createFileStore(jobStore, jobDesc, file_store_dir, waitForPreviousCommit, caching)[source]
Create a concreate FileStore.
- Parameters:
jobStore (
AbstractJobStore
)jobDesc (
JobDescription
)file_store_dir (
str
)
- Return type:
- static shutdownFileStore(workflowID, config_work_dir, config_coordination_dir)[source]
Carry out any necessary filestore-specific cleanup.
This is a destructive operation and it is important to ensure that there are no other running processes on the system that are modifying or using the file store for this workflow.
This is the intended to be the last call to the file store in a Toil run, called by the batch system cleanup function upon batch system shutdown.
- open(job)[source]
Create the context manager around tasks prior and after a job has been run.
File operations are only permitted inside the context manager.
Implementations must only yield from within with super().open(job):.
- get_disk_usage()[source]
Get the number of bytes of disk used by the last job run under open().
Disk usage is measured at the end of the job. TODO: Sample periodically and record peak usage.
- getLocalTempDir()[source]
Get a new local temporary directory in which to write files.
The directory will only persist for the duration of the job.
- Return type:
- Returns:
The absolute path to a new local temporary directory. This directory will exist for the duration of the job only, and is guaranteed to be deleted once the job terminates, removing all files it contains recursively.
- getLocalTempFile(suffix=None, prefix=None)[source]
Get a new local temporary file that will persist for the duration of the job.
- Parameters:
- Return type:
- Returns:
The absolute path to a local temporary file. This file will exist for the duration of the job only, and is guaranteed to be deleted once the job terminates.
- getLocalTempFileName(suffix=None, prefix=None)[source]
Get a valid name for a new local file. Don’t actually create a file at the path.
- Parameters:
- Return type:
- Returns:
Path to valid file
- abstract writeGlobalFile(localFileName, cleanup=False)[source]
Upload a file (as a path) to the job store.
If the file is in a FileStore-managed temporary directory (i.e. from
toil.fileStores.abstractFileStore.AbstractFileStore.getLocalTempDir()
), it will become a local copy of the file, eligible for deletion bytoil.fileStores.abstractFileStore.AbstractFileStore.deleteLocalFile()
.If an executable file on the local filesystem is uploaded, its executability will be preserved when it is downloaded again.
- Parameters:
localFileName (
str
) – The path to the local file to upload. The last path component (basename of the file) will remain associated with the file in the file store, if supported by the backing JobStore, so that the file can be searched for by name or name glob.cleanup (
bool
) – if True then the copy of the global file will be deleted once the job and all its successors have completed running. If not the global file must be deleted manually.
- Return type:
- Returns:
an ID that can be used to retrieve the file.
- writeGlobalFileStream(cleanup=False, basename=None, encoding=None, errors=None)[source]
Similar to writeGlobalFile, but allows the writing of a stream to the job store. The yielded file handle does not need to and should not be closed explicitly.
- Parameters:
encoding (
Optional
[str
]) – The name of the encoding used to decode the file. Encodings are the same as for decode(). Defaults to None which represents binary mode.errors (
Optional
[str
]) – Specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.cleanup (
bool
) – is as intoil.fileStores.abstractFileStore.AbstractFileStore.writeGlobalFile()
.basename (
Optional
[str
]) – If supported by the backing JobStore, use the given file basename so that when searching the job store with a query matching that basename, the file will be detected.
- Return type:
- Returns:
A context manager yielding a tuple of 1) a file handle which can be written to and 2) the toil.fileStores.FileID of the resulting file in the job store.
- logAccess(fileStoreID, destination=None)[source]
Record that the given file was read by the job.
(to be announced if the job fails)
If destination is not None, it gives the path that the file was downloaded to. Otherwise, assumes that the file was streamed.
Must be called by
readGlobalFile()
andreadGlobalFileStream()
implementations.
- abstract readGlobalFile(fileStoreID, userPath=None, cache=True, mutable=False, symlink=False)[source]
Make the file associated with fileStoreID available locally.
If mutable is True, then a copy of the file will be created locally so that the original is not modified and does not change the file for other jobs. If mutable is False, then a link can be created to the file, saving disk resources. The file that is downloaded will be executable if and only if it was originally uploaded from an executable file on the local filesystem.
If a user path is specified, it is used as the destination. If a user path isn’t specified, the file is stored in the local temp directory with an encoded name.
The destination file must not be deleted by the user; it can only be deleted through deleteLocalFile.
Implementations must call
logAccess()
to report the download.- Parameters:
fileStoreID (
str
) – job store id for the fileuserPath (
Optional
[str
]) – a path to the name of file to which the global file will be copied or hard-linked (see below).cache (
bool
) – Described intoil.fileStores.CachingFileStore.readGlobalFile()
mutable (
bool
) – Described intoil.fileStores.CachingFileStore.readGlobalFile()
symlink (
bool
) – True if caller can accept symlink, False if caller can only accept a normal file or hardlink
- Return type:
- Returns:
An absolute path to a local, temporary copy of the file keyed by fileStoreID.
- abstract readGlobalFileStream(fileStoreID, encoding=None, errors=None)[source]
Read a stream from the job store; similar to readGlobalFile.
The yielded file handle does not need to and should not be closed explicitly.
- Parameters:
encoding (
Optional
[str
]) – the name of the encoding used to decode the file. Encodings are the same as for decode(). Defaults to None which represents binary mode.errors (
Optional
[str
]) – an optional string that specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.fileStoreID (str)
- Return type:
ContextManager[IO[bytes] | IO[str]]
Implementations must call
logAccess()
to report the download.
- getGlobalFileSize(fileStoreID)[source]
Get the size of the file pointed to by the given ID, in bytes.
If a FileID or something else with a non-None ‘size’ field, gets that.
Otherwise, asks the job store to poll the file’s size.
Note that the job store may overestimate the file’s size, for example if it is encrypted and had to be augmented with an IV or other encryption framing.
- abstract deleteLocalFile(fileStoreID)[source]
Delete local copies of files associated with the provided job store ID.
Raises an OSError with an errno of errno.ENOENT if no such local copies exist. Thus, cannot be called multiple times in succession.
The files deleted are all those previously read from this file ID via readGlobalFile by the current job into the job’s file-store-provided temp directory, plus the file that was written to create the given file ID, if it was written by the current job from the job’s file-store-provided temp directory.
- abstract deleteGlobalFile(fileStoreID)[source]
Delete local files and then permanently deletes them from the job store.
To ensure that the job can be restarted if necessary, the delete will not happen until after the job’s run method has completed.
- log_to_leader(text, level=20)[source]
Send a logging message to the leader. The message will also be logged by the worker at the same level.
- log_user_stream(name, stream)[source]
Send a stream of UTF-8 text to the leader as a named log stream.
Useful for things like the error logs of Docker containers. The leader will show it to the user or organize it appropriately for user-level log information.
- abstract startCommit(jobState=False)[source]
Update the status of the job on the disk.
May bump the version number of the job.
May start an asynchronous process. Call waitForCommit() to wait on that process. You must waitForCommit() before committing any further updates to the job. During the asynchronous process, it is safe to modify the job; modifications after this call will not be committed until the next call.
- abstract waitForCommit()[source]
Blocks while startCommit is running.
This function is called by this job’s successor to ensure that it does not begin modifying the job store until after this job has finished doing so.
Might be called when startCommit is never called on a particular instance, in which case it does not block.
- Return type:
- Returns:
Always returns True
- abstract classmethod shutdown(shutdown_info)[source]
Shutdown the filestore on this node.
This is intended to be called on batch system shutdown.
- class toil.fileStores.FileID(fileStoreID, size, executable=False)[source]
A small wrapper around Python’s builtin string class.
It is used to represent a file’s ID in the file store, and has a size attribute that is the file’s size in bytes. This object is returned by importFile and writeGlobalFile.
Calls into the file store can use bare strings; size will be queried from the job store if unavailable in the ID.
- __init__(fileStoreID, size, executable=False)[source]
- pack()[source]
Pack the FileID into a string so it can be passed through external code.
- Return type: