toil.jobStores.aws.jobStore

This file contains the AWS jobstore, which has its own docstring defining its use.

This docstring is about the organization of the file.

All direct AWS boto calls should live in toil.lib.aws, except for creating the session instance and the resource/client (which should only be made ONCE in the jobstore).

Reasons for this
  • DRY.

  • All retries are on their individual boto functions, instead of here.

  • Simple clear functions => simple clear unit tests (ideally).

Variables defining part size, parallelization, and other constants should live in toil.lib.aws.config.

Attributes

DEFAULT_AWS_PART_SIZE

logger

Classes

AWSJobStore

The AWS jobstore can be thought of as an AWS s3 bucket, with functions to centralize, store, and track files for the workflow.

Functions

parse_jobstore_identifier(jobstore_identifier)

Module Contents

toil.jobStores.aws.jobStore.DEFAULT_AWS_PART_SIZE = 52428800
toil.jobStores.aws.jobStore.logger
class toil.jobStores.aws.jobStore.AWSJobStore(locator, partSize=DEFAULT_AWS_PART_SIZE)

Bases: toil.jobStores.abstractJobStore.AbstractJobStore, toil.lib.url.URLAccess

The AWS jobstore can be thought of as an AWS s3 bucket, with functions to centralize, store, and track files for the workflow.

The AWS jobstore stores 4 things:

  1. Jobs: These are pickled as files, and contain the information necessary to run a job when unpickled. A job’s file is deleted when finished, and its absence means it completed.

  2. Files: The inputs and outputs of jobs. Each file is written in s3 with the file pattern: “files/{uuid4}/{original_filename}”, where the file prefix “files/{uuid4}” should only point to one file.

  3. Logs: The written log files of jobs that have run, plus the log file for the main Toil process.

  4. Shared Files: Files with human-readable names, used by Toil itself or Python workflows. These include:

    • environment.pickle (environment variables)

    • config.pickle (user options)

    • pid.log (process ID of the workflow; when it finishes, the workflow either succeeded/failed)

    • userScript (hot deployment; this is the job module)

    • rootJobReturnValue (workflow succeeded or not)
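As a rough illustration of the layout described above, the sketch below builds object keys for each category. The prefix values mirror the key prefix attributes documented for AWSJobStore; the helper names (job_key, file_key, shared_key) are hypothetical and are not part of Toil.

```python
import uuid

# Prefixes as listed in the AWSJobStore attributes on this page.
JOB_KEY_PREFIX = "jobs/"
CONTENT_KEY_PREFIX = "files/"
SHARED_KEY_PREFIX = ""


def job_key(job_id: str) -> str:
    """Key for a pickled job (hypothetical helper)."""
    return f"{JOB_KEY_PREFIX}{job_id}"


def file_key(original_filename: str) -> str:
    """Key following the files/{uuid4}/{original_filename} pattern."""
    return f"{CONTENT_KEY_PREFIX}{uuid.uuid4()}/{original_filename}"


def shared_key(name: str) -> str:
    """Shared files use an empty prefix, so the key is just the name."""
    return f"{SHARED_KEY_PREFIX}{name}"


example = file_key("reads.fastq")
```

Because each file gets its own uuid4 directory, two files with the same original name never collide.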

NOTES
  • The AWS jobstore does not use a database (directly, at least) currently. We can get away with this because:

    1. AWS s3 has strong consistency.

    2. s3’s filter/query speed is pretty good.

    However, there may be reasons in the future to provide users with a database:

    • s3 throttling has limits (3,500 write or 5,500 read requests per second per prefix; something like dynamodb supports 100,000+ requests).

    • Access and filtering would be sped up, though how much faster this would be needs testing.

    ALSO NOTE: The caching filestore uses a local (per node) database with a very similar structure that maybe could be synced up with this.

  • TODO: Etags are s3’s native checksum, so use that for file integrity checking since it’s free when fetching object headers from s3. Using an md5sum in addition to this would work well with the current filestore. WARNING: Etag values differ for the same file when the part size changes, so part size should always be Set In Stone, unless we hit s3’s 10,000 part limit, and we need to account for that.

  • This class fills in self.config only when initialized/restarted; it is None upon class instantiation. These are the options/config set by the user. When jobs are loaded/unpickled, they must re-incorporate this.

  • The config.sseKey field is the single source of truth for bucket encryption status. The key is never stored inside this class; it is always read from the file referenced by the config when needed. Modifying the config at runtime will modify whether encryption is used. Note that files written without encryption (i.e. config.pickle) can’t be read when encryption is enabled!

  • TODO: In general, job stores should log the version of Toil they were initialized with and warn the user if restarting with a different version.
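The Etag warning in the notes above can be made concrete with a small sketch. It assumes the commonly observed format for unencrypted multipart uploads (the MD5 of the concatenated per-part MD5 digests, followed by a dash and the part count); this format is an assumption here, not something this page specifies. The function names are hypothetical.

```python
import hashlib


def multipart_etag(data: bytes, part_size: int) -> str:
    """Approximate the Etag of a multipart upload (assumed format)."""
    parts = [data[i:i + part_size] for i in range(0, len(data), part_size)]
    digests = b"".join(hashlib.md5(part).digest() for part in parts)
    return f"{hashlib.md5(digests).hexdigest()}-{len(parts)}"


def max_object_size(part_size: int, max_parts: int = 10_000) -> int:
    """Largest object that fits within the part limit at this part size."""
    return part_size * max_parts


data = b"x" * 100
etag_small_parts = multipart_etag(data, 10)  # 10 parts
etag_large_parts = multipart_etag(data, 50)  # 2 parts
limit_at_default = max_object_size(52428800)
```

The same 100 bytes produce two different values, which is why a changed part size would break Etag-based integrity checks. With the default part size of 52428800 bytes, the 10,000 part limit is reached at 524,288,000,000 bytes.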

Parameters:
  • locator (str)

  • partSize (int)

s3_resource
s3_client
part_size = 52428800
bucket = None
job_key_prefix = 'jobs/'
job_associations_key_prefix = 'job-associations/'
content_key_prefix = 'files/'
shared_key_prefix = ''
logs_key_prefix = 'logs/'
initialize(config)

Called when starting a new jobstore with a non-existent bucket.

Create bucket, raise if it already exists. Set options from config.

Parameters:

config (toil.common.Config)

Return type:

None

resume()

Called when reusing an old jobstore with an existing bucket.

Raises:

NoSuchJobStoreException – if the bucket doesn’t exist.

Return type:

None

destroy()

The inverse of initialize(), this method deletes the physical storage represented by this instance. While not being atomic, this method is at least idempotent, as a means to counteract potential issues with eventual consistency exhibited by the underlying storage mechanisms. This means that if the method fails (raises an exception), it may (and should) be invoked again. If the underlying storage mechanism is eventually consistent, even a successful invocation is not an ironclad guarantee that the physical storage vanished completely and immediately. A successful invocation only guarantees that the deletion will eventually happen. It is therefore recommended to not immediately reuse the same job store location for a new Toil workflow.

Return type:

None
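The contract of initialize(), resume(), and destroy() can be sketched with a toy in-memory store. This is only an illustration of the documented behavior (raise if the bucket exists, raise if it is missing, idempotent deletion); the class and exception names are hypothetical stand-ins, and no S3 calls are made.

```python
class BucketExistsError(Exception):
    """Stand-in for the error raised when the bucket already exists."""


class NoSuchStoreError(Exception):
    """Stand-in for NoSuchJobStoreException."""


class InMemoryStore:
    """Toy store whose 'buckets' live in a shared dict instead of S3."""

    _buckets: dict = {}

    def __init__(self, locator: str) -> None:
        self.locator = locator

    def initialize(self) -> None:
        if self.locator in self._buckets:
            raise BucketExistsError(self.locator)
        self._buckets[self.locator] = {}

    def resume(self) -> None:
        if self.locator not in self._buckets:
            raise NoSuchStoreError(self.locator)

    def destroy(self) -> None:
        # pop() with a default makes repeated calls harmless (idempotent).
        self._buckets.pop(self.locator, None)


store = InMemoryStore("demo-store")
store.initialize()
store.resume()
store.destroy()
store.destroy()  # second call is a no-op
```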

is_in_bucket(identifier, prefix, bucket=None)

Check if the key for the given identifier and prefix is in the bucket.

Parameters:
  • identifier (str)

  • prefix (str)

  • bucket (Optional[str])

Return type:

bool

write_to_bucket(identifier, prefix, data, bucket=None, encrypted=None)

Write something directly to a bucket.

Use for small files. Does not parallelize or use multipart.

Parameters:
  • encrypted (Optional[bool]) – Can be set to False to disable encryption.

  • identifier (str)

  • prefix (str)

  • data (Optional[Union[bytes, str, Dict[str, Any]]])

  • bucket (Optional[str])

Return type:

None

read_from_bucket(identifier, prefix, bucket=None)

Read something directly from a bucket.

Use for small files. Does not parallelize or use multipart.

Raises:
  • NoSuchJobException – if the prefix is the job prefix and the identifier is not found.

  • NoSuchFileException – if the prefix is the content prefix and the identifier is not found.

  • self.s3_client.exceptions.NoSuchKey – in other cases where the identifier is not found.

Parameters:
  • identifier (str)

  • prefix (str)

  • bucket (Optional[str])

Return type:

bytes
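The exception mapping described for read_from_bucket can be sketched as follows. A plain dict stands in for the bucket, KeyError plays the role of s3_client.exceptions.NoSuchKey, and the two exception classes are local stand-ins for the Toil classes of the same name. The function read_small is hypothetical, and joining prefix and identifier by simple concatenation is an assumption.

```python
class NoSuchJobException(Exception):
    """Local stand-in; the real class lives in Toil."""


class NoSuchFileException(Exception):
    """Local stand-in; the real class lives in Toil."""


JOB_KEY_PREFIX = "jobs/"
CONTENT_KEY_PREFIX = "files/"


def read_small(bucket: dict, identifier: str, prefix: str) -> bytes:
    """Read one object, translating a missing key by prefix."""
    key = f"{prefix}{identifier}"
    try:
        return bucket[key]
    except KeyError:
        if prefix == JOB_KEY_PREFIX:
            raise NoSuchJobException(identifier) from None
        if prefix == CONTENT_KEY_PREFIX:
            raise NoSuchFileException(identifier) from None
        raise  # any other prefix: let the original error through


bucket = {"jobs/j1": b"pickled-job"}
found = read_small(bucket, "j1", JOB_KEY_PREFIX)
```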

assign_job_id(jobDescription)

Get a new jobStoreID to be used by the described job, and assign it to the JobDescription.

Files associated with the assigned ID will be accepted even if the JobDescription has never been created or updated.

Parameters:

jobDescription (toil.job.JobDescription)

Return type:

None

create_job(jobDescription)

Pickle a jobDescription object and write it to the jobstore as a file.

Responsible for calling toil.job.JobDescription.pre_update_hook() on the job description.

Parameters:

jobDescription (toil.job.JobDescription)

Return type:

toil.job.JobDescription

job_exists(job_id, check=False)

Checks if the job_id is found in s3.

Parameters:
  • check (bool) – If True, raise an exception instead of returning false when a job does not exist.

  • job_id (str)

Return type:

bool

jobs()

Best-effort attempt to return an iterator on JobDescriptions for all jobs in the store. The iterator may not return all jobs and may also contain orphaned jobs that have already finished successfully and should not be rerun. To guarantee you get any and all jobs that can be run, construct a more expensive ToilState object instead.

Returns:

Returns iterator on jobs in the store. The iterator may or may not contain all jobs and may contain invalid jobs

Return type:

Iterator[toil.job.JobDescription]

load_job(job_id)

Use a job_id to get a job from the jobstore’s s3 bucket, unpickle, and return it.

Parameters:

job_id (str)

Return type:

toil.job.JobDescription

update_job(jobDescription)

Persists changes to the state of the given JobDescription in this store atomically.

Must call jobDescription.pre_update_hook()

Parameters:

jobDescription (toil.job.JobDescription)

Return type:

None

delete_job(job_id)

Removes the JobDescription from the store atomically. You may not then subsequently call load(), write(), update(), etc. with the same jobStoreID or any JobDescription bearing it.

This operation is idempotent, i.e. deleting a job twice or deleting a non-existent job will succeed silently.

Parameters:

job_id (str) – the ID of the job to delete from this job store

Return type:

None
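The job methods above (create_job, job_exists, load_job, delete_job) follow a simple pickle-and-store pattern, sketched here against a dict instead of S3. The functions are simplified stand-ins that take a plain dict rather than a JobDescription, and the KeyError raised when check is True is a placeholder for whatever exception the real method raises.

```python
import pickle

JOB_KEY_PREFIX = "jobs/"
bucket: dict = {}


def create_job(job_id: str, description: dict) -> None:
    """Pickle the description and store it under the job prefix."""
    bucket[f"{JOB_KEY_PREFIX}{job_id}"] = pickle.dumps(description)


def job_exists(job_id: str, check: bool = False) -> bool:
    found = f"{JOB_KEY_PREFIX}{job_id}" in bucket
    if check and not found:
        raise KeyError(job_id)  # placeholder exception type
    return found


def load_job(job_id: str) -> dict:
    """Fetch the stored bytes and unpickle them."""
    return pickle.loads(bucket[f"{JOB_KEY_PREFIX}{job_id}"])


def delete_job(job_id: str) -> None:
    # Deleting a missing job succeeds silently (idempotent).
    bucket.pop(f"{JOB_KEY_PREFIX}{job_id}", None)


create_job("j1", {"command": "echo hello"})
loaded = load_job("j1")
delete_job("j1")
delete_job("j1")  # no error on the second call
```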

associate_job_with_file(job_id, file_id)
Parameters:
  • job_id (str)

  • file_id (str)

Return type:

None

write_file(local_path, job_id=None, cleanup=False)

Write a local file into the jobstore and return a file_id referencing it.

Parameters:
  • job_id (Optional[str]) – If job_id AND cleanup are supplied, associate this file with that job. When the job is deleted, the file will be deleted as well.

  • cleanup (bool) – If job_id AND cleanup are supplied, associate this file with that job. When the job is deleted, the file will be deleted as well. TODO: we don’t need cleanup; remove it and only use job_id

  • local_path (str)

Return type:

toil.fileStores.FileID

find_s3_key_from_file_id(file_id)

This finds an s3 key for which file_id is the prefix, and which already exists.

Parameters:

file_id (str)

Return type:

str
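A prefix lookup of this kind might look like the sketch below. It assumes that file_id is the uuid4 portion of a files/{uuid4}/{original_filename} key and that the keys are available as a plain iterable; the real method queries S3. The function find_key and the LookupError it raises are hypothetical.

```python
from typing import Iterable

CONTENT_KEY_PREFIX = "files/"


def find_key(keys: Iterable[str], file_id: str) -> str:
    """Return the single existing key that starts with files/{file_id}/."""
    prefix = f"{CONTENT_KEY_PREFIX}{file_id}/"
    matches = [key for key in keys if key.startswith(prefix)]
    if len(matches) != 1:
        # The prefix is expected to point to exactly one file.
        raise LookupError(f"expected 1 key for {file_id}, found {len(matches)}")
    return matches[0]


keys = ["files/aaa/reads.fastq", "files/bbb/ref.fa", "jobs/j1"]
resolved = find_key(keys, "bbb")
```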

write_file_stream(job_id=None, cleanup=False, basename=None, encoding=None, errors=None)

Similar to write_file, but returns a context manager yielding a tuple of 1) a file handle which can be written to and 2) the ID of the resulting file in the job store. The yielded file handle does not need to and should not be closed explicitly. The file is written in an atomic manner. It will not appear in the job store until the write has successfully completed.

Parameters:
  • job_id (str) – the ID of a job, or None. If specified, the file may be associated with that job in a job-store-specific way. This may influence the returned ID.

  • cleanup (bool) – Whether to attempt to delete the file when the job given as job_id is deleted. If job_id was not given, does nothing.

  • basename (str) – If supported by the implementation, use the given file basename so that when searching the job store with a query matching that basename, the file will be detected.

  • encoding (str) – the name of the encoding used to encode the file. Encodings are the same as for encode(). Defaults to None which represents binary mode.

  • errors (str) – an optional string that specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.

Raises:

FIXME: some implementations may not raise this

Returns:

a context manager yielding a file handle which can be written to and an ID that references the newly created file and can be used to read the file in the future.

Return type:

Iterator[Tuple[IO[bytes], str]]

Parameters:
  • job_id (Optional[str])

  • cleanup (bool)

  • basename (Optional[str])

  • encoding (Optional[str])

  • errors (Optional[str])
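The write_file_stream contract (a context manager that yields a handle and an ID, with the file appearing only after a successful write) can be sketched with contextlib. This is a toy version under stated assumptions: a dict stands in for the job store, the data is buffered in memory, and write_stream is a hypothetical name.

```python
import io
import uuid
from contextlib import contextmanager
from typing import IO, Dict, Iterator, Tuple

store: Dict[str, bytes] = {}


@contextmanager
def write_stream() -> Iterator[Tuple[IO[bytes], str]]:
    file_id = str(uuid.uuid4())
    buffer = io.BytesIO()
    yield buffer, file_id
    # Reached only if the with-body raised no exception, so a failed
    # write never becomes visible in the store.
    store[file_id] = buffer.getvalue()


with write_stream() as (handle, new_id):
    handle.write(b"hello")
    visible_during_write = new_id in store
```

The caller never closes the handle; the context manager owns it and commits the contents on exit.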

update_file_stream(file_id, encoding=None, errors=None)

Replaces the existing version of a file in the job store. Similar to update_file, but returns a context manager yielding a file handle which can be written to. The yielded file handle does not need to and should not be closed explicitly.

Parameters:
  • file_id (str) – the ID of the file in the job store to be updated

  • encoding (str) – the name of the encoding used to encode the file. Encodings are the same as for encode(). Defaults to None which represents binary mode.

  • errors (str) – an optional string that specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.

Raises:
Return type:

Iterator[IO[Any]]

write_shared_file_stream(shared_file_name, encrypted=None, encoding=None, errors=None)

Returns a context manager yielding a writable file handle to the global file referenced by the given name. File will be created in an atomic manner.

Parameters:
  • shared_file_name (str) – A file name matching AbstractJobStore.fileNameRegex, unique within this job store

  • encrypted (bool) – True if the file must be encrypted, None if it may be encrypted or False if it must be stored in the clear.

  • encoding (str) – the name of the encoding used to encode the file. Encodings are the same as for encode(). Defaults to None which represents binary mode.

  • errors (str) – an optional string that specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.

Raises:

ConcurrentFileModificationException – if the file was modified concurrently during an invocation of this method

Returns:

a context manager yielding a writable file handle

Return type:

Iterator[IO[bytes]]

update_file(file_id, local_path)

Replaces the existing version of a file in the job store.

Throws an exception if the file does not exist.

Parameters:
  • file_id (str) – the ID of the file in the job store to be updated

  • local_path (str) – the local path to a file that will overwrite the current version in the job store

Raises:
Return type:

None

file_exists(file_id)

Determine whether a file exists in this job store.

Parameters:

file_id (str) – an ID referencing the file to be checked

Return type:

bool

get_file_size(file_id)

Return the size of the file referenced by file_id. TODO: decide whether both get_file_size and _get_size are needed.

Parameters:

file_id (str)

Return type:

int

read_file(file_id, local_path, symlink=False)

Copies or hard links the file referenced by file_id to the given local file path. The version will be consistent with the last copy of the file written/updated. If the file in the job store is later modified via update_file or update_file_stream, it is implementation-defined whether those writes will be visible at local_path. The file is copied in an atomic manner. It will not appear in the local file system until the copy has completed.

The file at the given local path may not be modified after this method returns!

Note! Implementations of read_file need to respect/provide the executable attribute on FileIDs.

Parameters:
  • file_id (str) – ID of the file to be copied

  • local_path (str) – the local path indicating where to place the contents of the given file in the job store

  • symlink (bool) – whether the reader can tolerate a symlink. If set to true, the job store may create a symlink instead of a full copy of the file or a hard link.

Return type:

None
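One common way to get the "does not appear until the copy has completed" behavior described for read_file is to write to a temporary file in the target directory and rename it into place. The sketch below shows that general technique; it is not necessarily how AWSJobStore implements it, and atomic_write is a hypothetical helper.

```python
import os
import tempfile


def atomic_write(data: bytes, local_path: str) -> None:
    """Write data so that local_path only ever holds complete contents."""
    directory = os.path.dirname(os.path.abspath(local_path))
    # The temp file lives in the same directory so the rename stays on
    # one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
        os.replace(tmp_path, local_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "out.bin")
    atomic_write(b"payload", target)
    with open(target, "rb") as f:
        copied = f.read()
    leftovers = os.listdir(workdir)
```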

read_file_stream(file_id, encoding=None, errors=None)

Similar to read_file, but returns a context manager yielding a file handle which can be read from. The yielded file handle does not need to and should not be closed explicitly.

Parameters:
  • file_id (str) – ID of the file to get a readable file handle for

  • encoding (str) – the name of the encoding used to decode the file. Encodings are the same as for decode(). Defaults to None which represents binary mode.

  • errors (str) – an optional string that specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.

Returns:

a context manager yielding a file handle which can be read from

Return type:

Iterator[Union[IO[bytes], IO[str]]]
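The encoding and errors parameters decide whether the yielded handle is binary or text. A minimal sketch of that switch, using an in-memory buffer in place of an S3 download (read_stream is a hypothetical name):

```python
import io
from contextlib import contextmanager
from typing import Optional


@contextmanager
def read_stream(data: bytes, encoding: Optional[str] = None,
                errors: Optional[str] = None):
    raw = io.BytesIO(data)
    if encoding is None:
        # Binary mode: hand back the raw byte stream.
        yield raw
    else:
        # Text mode: errors=None behaves like 'strict'.
        yield io.TextIOWrapper(raw, encoding=encoding, errors=errors)


payload = "caf\u00e9".encode("utf-8")

with read_stream(payload) as fh:
    as_bytes = fh.read()

with read_stream(payload, encoding="utf-8") as fh:
    as_text = fh.read()
```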

read_shared_file_stream(shared_file_name: str, encoding: str, errors: str | None = None) → Iterator[IO[str]]
read_shared_file_stream(shared_file_name: str, encoding: Literal[None] = None, errors: str | None = None) → Iterator[IO[bytes]]

Returns a context manager yielding a readable file handle to the global file referenced by the given name.

Parameters:
  • shared_file_name (str) – A file name matching AbstractJobStore.fileNameRegex, unique within this job store

  • encoding (str) – the name of the encoding used to decode the file. Encodings are the same as for decode(). Defaults to None which represents binary mode.

  • errors (str) – an optional string that specifies how encoding errors are to be handled. Errors are the same as for open(). Defaults to ‘strict’ when an encoding is specified.

Returns:

a context manager yielding a readable file handle

delete_file(file_id)

Deletes the file with the given ID from this job store. This operation is idempotent, i.e. deleting a file twice or deleting a non-existent file will succeed silently.

Parameters:

file_id (str) – ID of the file to delete

Return type:

None

get_public_url(file_id)

Turn s3:// into http:// and put a public-read ACL on it.

Parameters:

file_id (str)

Return type:

str

get_shared_public_url(file_id)

Turn s3:// into http:// and put a public-read ACL on it.

Parameters:

file_id (str)

Return type:

str

get_empty_file_store_id(job_id=None, cleanup=False, basename=None)

Create an empty file in s3 and return a bare string file ID.

Parameters:
  • job_id (Optional[str])

  • cleanup (bool)

  • basename (Optional[str])

Return type:

str

write_logs(log_msg)

Stores a message as a log in the jobstore.

Parameters:
  • log_msg (Union[bytes, str]) – the string to be written

Raises:

ConcurrentFileModificationException – if the file was modified concurrently during an invocation of this method

Return type:

None

read_logs(callback, read_all=False)

This fetches all referenced logs in the database from s3 as readable objects and runs “callback()” on them.

Parameters:
  • callback (Callable[Ellipsis, Any])

  • read_all (bool)

Return type:

int

toil.jobStores.aws.jobStore.parse_jobstore_identifier(jobstore_identifier)
Parameters:

jobstore_identifier (str)

Return type:

Tuple[str, str]
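This page does not state the identifier format or what the two returned strings are. Assuming an identifier of the form region:name, a two-part split could be sketched as below. Both the format and the function split_identifier are assumptions for illustration, not the behavior of parse_jobstore_identifier.

```python
from typing import Tuple


def split_identifier(identifier: str) -> Tuple[str, str]:
    """Split an assumed 'region:name' identifier into its two parts."""
    region, separator, name = identifier.partition(":")
    if not separator or not region or not name:
        raise ValueError(f"expected 'region:name', got {identifier!r}")
    return region, name


region, name = split_identifier("us-west-2:my-jobstore")
```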