toil.fileStores.cachingFileStore

Module Contents

Classes

CachingFileStore

A cache-enabled file store.

Attributes

logger

SQLITE_TIMEOUT_SECS

toil.fileStores.cachingFileStore.logger
toil.fileStores.cachingFileStore.SQLITE_TIMEOUT_SECS = 60.0
exception toil.fileStores.cachingFileStore.CacheError(message)[source]

Bases: Exception

Error raised if the user attempts to add a non-local file to the cache

exception toil.fileStores.cachingFileStore.CacheUnbalancedError[source]

Bases: CacheError

Raised if the file store can’t free enough space for caching

message = 'Unable to free enough space for caching.  This error frequently arises due to jobs using...'
exception toil.fileStores.cachingFileStore.IllegalDeletionCacheError(deletedFile)[source]

Bases: CacheError

Error raised if the caching code discovers that a file representing a reference to a cached file has gone missing.

This can be a big problem if a hard link is moved, because then the cache will be unable to evict the file it links to.

Remember that files read with readGlobalFile may not be deleted by the user and must instead be deleted with deleteLocalFile.

exception toil.fileStores.cachingFileStore.InvalidSourceCacheError(message)[source]

Bases: CacheError

Error raised if the user attempts to add a non-local file to the cache

class toil.fileStores.cachingFileStore.CachingFileStore(jobStore, jobDesc, file_store_dir, waitForPreviousCommit)[source]

Bases: toil.fileStores.abstractFileStore.AbstractFileStore

A cache-enabled file store.

Provides files that are read out as symlinks or hard links into a cache directory for the node, if permitted by the workflow.

Also attempts to write files back to the backing JobStore asynchronously, after quickly taking them into the cache. Writes are only required to finish when the job’s actual state after running is committed back to the job store.

Internally, caching is managed using an SQLite database. Each node has its own database, shared between all the workers on the node. The database contains several tables:

files contains one entry for each file in the cache. Each entry knows the path to its data on disk. It also knows its global file ID, its state, and its owning worker PID. If the owning worker dies, another worker will pick it up. It also knows its size.

File states are:

  • “cached”: happily stored in the cache. Reads can happen immediately. Owner is null. May be adopted and moved to state “deleting” by anyone, if it has no outstanding immutable references.

  • “downloading”: in the process of being saved to the cache by a non-null owner. Reads must wait for the state to become “cached”. If the worker dies, goes to state “deleting”, because we don’t know if it was fully downloaded or if anyone still needs it. No references can be created to a “downloading” file except by the worker responsible for downloading it.

  • “uploadable”: stored in the cache and ready to be written to the job store by a non-null owner. Transitions to “uploading” when a (thread of) the owning worker process picks it up and begins uploading it, to free cache space or to commit a completed job. If the worker dies, goes to state “cached”, because it may have outstanding immutable references from the dead-but-not-cleaned-up job that was going to write it.

  • “uploading”: stored in the cache and being written to the job store by a non-null owner. Transitions to “cached” when successfully uploaded. If the worker dies, goes to state “cached”, because it may have outstanding immutable references from the dead-but-not-cleaned-up job that was writing it.

  • “deleting”: in the process of being removed from the cache by a non-null owner. Will eventually be removed from the database.
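The file states and transitions described above form a small state machine. The states come directly from the documentation; the tables below are an editorial summary, not code from the module:

```python
# Illustrative sketch of the file-state machine described above. The
# state names are the documented ones; the tables themselves are an
# editorial summary, not part of toil.fileStores.cachingFileStore.

FILE_STATES = {"cached", "downloading", "uploadable", "uploading", "deleting"}

# What happens to a file in each state when its owning worker dies.
ON_OWNER_DEATH = {
    "cached": "cached",          # no owner to lose; stays cached
    "downloading": "deleting",   # download may be incomplete; discard
    "uploadable": "cached",      # may still have refs from the dead job
    "uploading": "cached",       # may still have refs from the dead job
    "deleting": "deleting",      # another worker finishes the delete
}

# Normal (non-failure) forward transitions.
ON_SUCCESS = {
    "downloading": "cached",     # download finished
    "uploadable": "uploading",   # an owner thread picked it up
    "uploading": "cached",       # upload finished
    "deleting": None,            # row removed from the database
}
```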

refs contains one entry for each outstanding reference to a cached file (hard link, symlink, or full copy). The table name is refs instead of references because references is an SQL reserved word. It remembers what job ID has the reference, and the path the reference is at. References have three states:

  • “immutable”: represents a hardlink or symlink to a file in the cache. Dedicates the file’s size in bytes of the job’s disk requirement to the cache, to be used to cache this file or to keep around other files without references. May be upgraded to “copying” if the link can’t actually be created.

  • “copying”: records that a file in the cache is in the process of being copied to a path. Will be upgraded to a mutable reference eventually.

  • “mutable”: records that a file from the cache was copied to a certain path. Exist only to support deleteLocalFile’s API. Only files with only mutable references (or no references) are eligible for eviction.
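Per the rules above, a cached file is eligible for eviction only when it has no references, or only mutable ones. A minimal sketch of that check, where the reference records are simplified to a list of state strings (the real data lives in the refs table):

```python
# Sketch of the eviction-eligibility rule described above. Representing
# references as a list of state strings is a simplification; Toil keeps
# them in the refs table of the caching database.

def eligible_for_eviction(ref_states):
    """True if the file has no refs, or only 'mutable' refs."""
    return all(state == "mutable" for state in ref_states)
```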

jobs contains one entry for each job currently running. It keeps track of the job’s ID, the worker that is supposed to be running the job, the job’s disk requirement, and the job’s local temp dir path that will need to be cleaned up. When workers check for jobs whose workers have died, they null out the old worker, and grab ownership of and clean up jobs and their references until the null-worker jobs are gone.

properties contains key, value pairs for tracking total space available, and whether caching is free for this run.
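Taken together, the four tables might look roughly like the following SQLite schema. The column names here are guesses reconstructed from the prose above, not the module’s actual schema:

```python
import sqlite3

# Hypothetical schema reconstructing the four tables described above.
# The real column names and types in toil.fileStores.cachingFileStore
# may differ; this is only an illustration of the layout.
SCHEMA = """
CREATE TABLE files (
    id TEXT PRIMARY KEY,   -- global file ID
    path TEXT,             -- location of the data on disk
    size INTEGER,          -- size in bytes
    state TEXT,            -- cached/downloading/uploadable/uploading/deleting
    owner INTEGER          -- owning worker PID, or NULL
);
CREATE TABLE refs (        -- 'references' is an SQL reserved word
    path TEXT,             -- where the reference lives
    file_id TEXT,          -- the cached file it points to
    job_id TEXT,           -- the job that holds it
    state TEXT             -- immutable/copying/mutable
);
CREATE TABLE jobs (
    id TEXT PRIMARY KEY,   -- job ID
    temp_dir TEXT,         -- local temp dir to clean up
    disk INTEGER,          -- disk requirement in bytes
    worker INTEGER         -- worker PID, NULLed out on worker death
);
CREATE TABLE properties (
    name TEXT PRIMARY KEY, -- e.g. total space, caching-is-free flag
    value BLOB
);
"""

con = sqlite3.connect(":memory:")
con.executescript(SCHEMA)
```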

property con: sqlite3.Connection

Get the database connection to be used for the current thread.

Return type:

sqlite3.Connection

property cur: sqlite3.Cursor

Get the main cursor to be used for the current thread.

Return type:

sqlite3.Cursor

as_process()[source]

Assume the process’s identity to act on the caching database.

Yields the process’s name in the caching database, and holds onto a lock while your thread has it.

Return type:

Generator[str, None, None]
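The contract here (yield the process’s name, hold a lock for the duration) can be sketched with a stand-in implementation; the real method coordinates through the caching database rather than a plain in-process lock:

```python
import os
import threading
from contextlib import contextmanager

# Stand-in sketch of the as_process() contract: yield the process's
# name while holding a lock, so only one thread at a time acts as the
# process. The real implementation is tied to the caching database.
_process_lock = threading.RLock()

@contextmanager
def as_process():
    with _process_lock:
        yield str(os.getpid())

# Usage: database actions taken while inside the context are
# attributed to the yielded process name.
with as_process() as me:
    owner = me
```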

getCacheLimit()[source]

Return the total number of bytes to which the cache is limited.

If no limit is available, raises an error.

getCacheUsed()[source]

Return the total number of bytes used in the cache.

If no value is available, raises an error.

getCacheExtraJobSpace()[source]

Return the total number of bytes of disk space requested by jobs running against this cache but not yet used.

We can get into a situation where the jobs on the node take up all its space, but then they want to write to or read from the cache. So when that happens, we need to debit space from them somehow…

If no value is available, raises an error.

getCacheAvailable()[source]

Return the total number of free bytes available for caching, or, if negative, the total number of bytes of cached files that need to be evicted to free up enough space for all the currently scheduled jobs.

If no value is available, raises an error.
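The sign convention above (negative means “bytes that must be evicted”) can be illustrated with simple arithmetic. This is an editorial sketch of the accounting, not the module’s actual formula:

```python
# Illustrative accounting sketch: 'available' goes negative when cached
# data plus the space reserved for scheduled jobs exceeds the cache
# limit. This is NOT the module's actual formula, just the sign
# convention described above.

def cache_available(cache_limit, cache_used, job_space_reserved):
    """Free bytes for caching; negative = bytes that must be evicted."""
    return cache_limit - cache_used - job_space_reserved

# 100 GiB limit, 70 GiB cached, jobs reserve 40 GiB: 10 GiB must go.
GiB = 1 << 30
assert cache_available(100 * GiB, 70 * GiB, 40 * GiB) == -10 * GiB
```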

getSpaceUsableForJobs()[source]

Return the total number of bytes that are not taken up by job requirements, ignoring files and file usage. We can’t ever run more jobs than we actually have room for, even with caching.

If not retrievable, raises an error.

getCacheUnusedJobRequirement()[source]

Return the total number of bytes of disk space requested by the current job and not used by files the job is using in the cache.

Mutable references don’t count, but immutable/uploading ones do.

If no value is available, raises an error.

adjustCacheLimit(newTotalBytes)[source]

Adjust the total cache size limit to the given number of bytes.

fileIsCached(fileID)[source]

Return true if the given file is currently cached, and false otherwise.

Note that this can’t really be relied upon, because a file may go cached -> deleting after you look at it. If you need to do something with the file, you need to do it in a transaction.
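A caller that needs a file to stay cached should therefore check and act inside a single transaction. An illustrative sqlite3 sketch against a hypothetical cut-down files/refs schema (not the module’s real one):

```python
import sqlite3

# Illustrative: check a file's state and create a reference to it in
# one transaction, so it cannot go cached -> deleting in between. The
# schema is a hypothetical simplification of the caching database.
con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE files (id TEXT PRIMARY KEY, state TEXT);
    CREATE TABLE refs (file_id TEXT, path TEXT, state TEXT);
    INSERT INTO files VALUES ('file-1', 'cached');
""")

def try_reference(con, file_id, path):
    """Atomically add an immutable ref if the file is still cached."""
    with con:  # one transaction: commit on success, roll back on error
        row = con.execute(
            "SELECT state FROM files WHERE id = ?", (file_id,)
        ).fetchone()
        if row is None or row[0] != "cached":
            return False
        con.execute(
            "INSERT INTO refs VALUES (?, ?, 'immutable')", (file_id, path)
        )
        return True
```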

getFileReaderCount(fileID)[source]

Return the number of current outstanding reads of the given file.

Counts mutable references too.

cachingIsFree()[source]

Return true if files can be cached for free, without taking up space. Return false otherwise.

This will be true when working with certain job stores in certain configurations, most notably the FileJobStore.

open(job)[source]

This context-manager-decorated method allows cache-specific operations to be conducted before and after the execution of a job in worker.py.

Parameters:

job (toil.job.Job)

Return type:

Generator[None, None, None]

writeGlobalFile(localFileName, cleanup=False, executable=False)[source]

Creates a file in the job store and returns a FileID reference.

readGlobalFile(fileStoreID, userPath=None, cache=True, mutable=False, symlink=False)[source]

Make the file associated with fileStoreID available locally.

If mutable is True, then a copy of the file will be created locally so that the original is not modified and does not change the file for other jobs. If mutable is False, then a link can be created to the file, saving disk resources. The file that is downloaded will be executable if and only if it was originally uploaded from an executable file on the local filesystem.

If a user path is specified, it is used as the destination. If a user path isn’t specified, the file is stored in the local temp directory with an encoded name.

The destination file must not be deleted by the user; it can only be deleted through deleteLocalFile.

Implementations must call logAccess() to report the download.

Parameters:
  • fileStoreID – job store id for the file

  • userPath – a path to the name of file to which the global file will be copied or hard-linked (see below).

  • cache – Described in toil.fileStores.CachingFileStore.readGlobalFile()

  • mutable – Described in toil.fileStores.CachingFileStore.readGlobalFile()

  • symlink – True if caller can accept symlink, False if caller can only accept a normal file or hardlink

Returns:

An absolute path to a local, temporary copy of the file keyed by fileStoreID.
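The mutable/immutable distinction above amounts to “copy out” versus “link to the cached data”. A stand-in sketch of that decision, using plain filesystem operations rather than the file store API:

```python
import os
import shutil
import tempfile

# Stand-in sketch of the read behavior described above: mutable reads
# get a private copy; immutable reads try a hard link to the cached
# data and fall back to a copy if linking fails (e.g. cross-device).
# This is NOT the module's implementation, only an illustration.
def materialize(cached_path, dest_path, mutable=False, symlink=False):
    if mutable:
        shutil.copy2(cached_path, dest_path)      # private, writable copy
    elif symlink:
        os.symlink(cached_path, dest_path)
    else:
        try:
            os.link(cached_path, dest_path)       # shares the cached data
        except OSError:
            shutil.copy2(cached_path, dest_path)  # fall back to a copy
    return dest_path

# Example: hard-link a "cached" file into a job's temp dir.
work = tempfile.mkdtemp()
cached = os.path.join(work, "cached-blob")
with open(cached, "w") as f:
    f.write("data")
local = materialize(cached, os.path.join(work, "job-copy"))
```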

readGlobalFileStream(fileStoreID, encoding=None, errors=None)[source]

Read a stream from the job store; similar to readGlobalFile.

The yielded file handle does not need to and should not be closed explicitly.

Parameters:
  • encoding – the name of the encoding used to decode the file. Encodings are the same as for decode(). Defaults to None which represents binary mode.

  • errors – an optional string that specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.

Implementations must call logAccess() to report the download.

Returns:

a context manager yielding a file handle which can be read from.

deleteLocalFile(fileStoreID)[source]

Delete local copies of files associated with the provided job store ID.

Raises an OSError with an errno of errno.ENOENT if no such local copies exist; consequently, it cannot be called multiple times in succession for the same file.

The files deleted are all those previously read from this file ID via readGlobalFile by the current job into the job’s file-store-provided temp directory, plus the file that was written to create the given file ID, if it was written by the current job from the job’s file-store-provided temp directory.

Parameters:

fileStoreID – File Store ID of the file to be deleted.
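The ENOENT contract described above can be mimicked with a small helper over a hypothetical record of local copies (a stand-in for the refs table):

```python
import errno
import os
import tempfile

# Sketch of deleteLocalFile's contract: remove every recorded local
# copy for the ID, and raise OSError(ENOENT) when none are recorded.
# 'local_copies' is a hypothetical stand-in for the refs table.
def delete_local(local_copies, file_store_id):
    paths = local_copies.pop(file_store_id, None)
    if not paths:
        raise OSError(errno.ENOENT, "No local copies", file_store_id)
    for p in paths:
        if os.path.exists(p):
            os.remove(p)

# Example: one recorded local copy gets deleted.
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
copies = {"fid": [tmp.name]}
delete_local(copies, "fid")
```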

deleteGlobalFile(fileStoreID)[source]

Delete local copies of the file, then permanently delete it from the job store.

To ensure that the job can be restarted if necessary, the delete will not happen until after the job’s run method has completed.

Parameters:

fileStoreID – the File Store ID of the file to be deleted.

exportFile(jobStoreFileID, dstUrl)[source]
Return type:

None

export_file(file_id, dst_uri)[source]
Return type:

None

waitForCommit()[source]

Blocks while startCommit is running.

This function is called by this job’s successor to ensure that it does not begin modifying the job store until after this job has finished doing so.

Might be called when startCommit is never called on a particular instance, in which case it does not block.

Returns:

Always returns True

Return type:

bool

startCommit(jobState=False)[source]

Update the status of the job on the disk.

May bump the version number of the job.

May start an asynchronous process. Call waitForCommit() to wait on that process. You must waitForCommit() before committing any further updates to the job. During the asynchronous process, it is safe to modify the job; modifications after this call will not be committed until the next call.

Parameters:

jobState – If True, commit the state of the FileStore’s job, and file deletes. Otherwise, commit only file creates/updates.
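The startCommit/waitForCommit pairing described above is an ordinary background-thread pattern; a stand-in sketch (not the module’s implementation, which writes to the job store):

```python
import threading

# Stand-in for the startCommit()/waitForCommit() pairing described
# above: commit work runs on a background thread; waitForCommit()
# joins it, returns immediately if startCommit() was never called,
# and always returns True.
class CommitSketch:
    def __init__(self):
        self._thread = None
        self.committed = False

    def startCommit(self):
        def work():
            self.committed = True  # stand-in for writing to the job store
        self._thread = threading.Thread(target=work)
        self._thread.start()

    def waitForCommit(self):
        if self._thread is not None:
            self._thread.join()
        return True
```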

startCommitThread(state_to_commit)[source]

Run in a thread to actually commit the current job.

Parameters:

state_to_commit (Optional[toil.job.JobDescription])

classmethod shutdown(shutdown_info)[source]

Job local temp directories will be removed due to their appearance in the database.

Parameters:

shutdown_info (Tuple[str, str]) – Tuple of the coordination directory (where the cache database is) and the cache directory (where the cached data is).

Return type:

None

__del__()[source]

Cleanup function run when the class instance is destroyed; ensures that all file-writing threads exit.