toil.common¶
Attributes¶
Exceptions¶
Common base class for all non-exit exceptions. |
|
Common base class for all non-exit exceptions. |
Classes¶
Produced by the Toil-integrated autoscaler to describe the number of |
|
Produced by the Toil-integrated autoscaler to describe the number of |
|
Produced when a job is completed, whether successful or not. |
|
Produced when a job is completely failed, and will not be retried again. |
|
Produced when a job is issued to run on the batch system. |
|
Produced when a job goes missing and should be in the batch system but isn't. |
|
Holds messages that should cause jobs to change their scheduling states. |
|
Produced to describe the size of the queue of jobs issued but not yet |
|
A small wrapper around Python's builtin string class. |
|
Provide a logger that logs over UDP to the leader. |
|
Class to represent configuration operations for a toil workflow run. |
|
A context manager that represents a Toil workflow. |
|
Functions¶
|
Add base Toil command line options to the parser. |
|
Add CWL options to the parser. This only adds nonpositional CWL arguments. |
|
Add WDL options to a parser. This only adds nonpositional WDL arguments |
|
|
|
Look up environment variables that control Toil and log the result. |
|
Call set_option for all the options for the given named batch system, or |
|
|
|
Try to use the given path. Return it if it exists or can be made, |
|
Context manager to create a temporary file. Entering returns path to |
|
Retry a function if it fails with any Exception defined in "errors". |
|
|
|
Find and instantiate the appropriate provisioner instance to make clusters in the given cloud. |
|
Add logging options to set the global log level. |
|
|
Ensure that TOIL_HOME_DIR exists. |
|
If the default config file does not exist, create it in the Toil home directory. Create the Toil home directory |
|
|
If the config file at the filepath does not exist, try creating it. |
|
Write a Toil config file to the given path. |
|
|
|
Add all Toil command line options to a parser. |
Return unique ID of the current node (host). The resulting string will be convertible to a uuid.UUID. |
|
|
|
|
This method will return the cumulative number of bytes occupied by the files |
|
Return the free space, and total size of the file system hosting dirPath. |
|
Module Contents¶
- toil.common.add_base_toil_options(parser, jobstore_as_flag=False, cwl=False)[source]¶
Add base Toil command line options to the parser.
- Parameters:
parser (argparse.ArgumentParser) – Argument parser to add options to
jobstore_as_flag (bool) – Make the job store option a --jobStore flag instead of a required jobStore positional argument
cwl (bool) – Whether CWL should be included or not
- Return type:
None
- toil.common.JOBSTORE_HELP = Multiline-String¶
"""The location of the job store for the workflow. A job store holds persistent information about the jobs, stats, and files in a workflow. If the workflow is run with a distributed batch system, the job store must be accessible by all worker nodes. Depending on the desired job store implementation, the location should be formatted according to one of the following schemes:
file:<path> where <path> points to a directory on the file system
aws:<region>:<prefix> where <region> is the name of an AWS region like us-west-2 and <prefix> will be prepended to the names of any top-level AWS resources in use by job store, e.g. S3 buckets.
google:<project_id>:<prefix> TODO: explain
For backwards compatibility, you may also specify ./foo (equivalent to file:./foo or just file:foo) or /bar (equivalent to file:/bar)."""
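As a rough illustration of the locator schemes described in the help text above, the following sketch splits a locator into a scheme and a remainder. The function name split_locator is hypothetical and is not part of toil.common; it follows only the rules stated in the help text, and treating a bare name without a colon as a file path is an additional assumption.

```python
from typing import Tuple


def split_locator(locator: str) -> Tuple[str, str]:
    """Split a job store locator into (scheme, rest).

    Hypothetical helper: it mirrors the help text, not Toil's own parser.
    """
    # Backwards-compatible bare paths such as ./foo or /bar mean "file".
    # Assumption: a name with no colon at all is also treated as a path.
    if locator.startswith(("/", ".")) or ":" not in locator:
        return "file", locator
    # Split only once so that aws:<region>:<prefix> keeps its remainder intact.
    scheme, rest = locator.split(":", 1)
    return scheme, rest


print(split_locator("file:/data/store"))
print(split_locator("aws:us-west-2:my-run"))
print(split_locator("./foo"))
```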
- toil.common.add_cwl_options(parser, suppress=True)[source]¶
Add CWL options to the parser. This only adds nonpositional CWL arguments.
- Parameters:
parser (argparse.ArgumentParser) – Parser to add options to
suppress (bool) – Suppress help output
- Returns:
None
- Return type:
None
- toil.common.add_wdl_options(parser, suppress=True)[source]¶
Add WDL options to a parser. This only adds nonpositional WDL arguments.
- Parameters:
parser (argparse.ArgumentParser) – Parser to add options to
suppress (bool) – Suppress help output
- Return type:
None
- toil.common.logProcessContext(config)[source]¶
- Parameters:
config (common.Config)
- Return type:
None
- toil.common.lookupEnvVar(name, envName, defaultValue)[source]¶
Look up environment variables that control Toil and log the result.
- toil.common.set_batchsystem_options(batch_system, set_option)[source]¶
Call set_option for all the options for the given named batch system, or all batch systems if no name is provided.
- Parameters:
batch_system (Optional[str])
set_option (OptionSetter)
- Return type:
None
- class toil.common.ClusterDesiredSizeMessage[source]¶
Bases:
NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.
- class toil.common.ClusterSizeMessage[source]¶
Bases:
NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.
- class toil.common.JobCompletedMessage[source]¶
Bases:
NamedTuple
Produced when a job is completed, whether successful or not.
- class toil.common.JobFailedMessage[source]¶
Bases:
NamedTuple
Produced when a job has completely failed and will not be retried.
- class toil.common.JobIssuedMessage[source]¶
Bases:
NamedTuple
Produced when a job is issued to run on the batch system.
- class toil.common.JobMissingMessage[source]¶
Bases:
NamedTuple
Produced when a job goes missing and should be in the batch system but isn’t.
- class toil.common.MessageBus[source]¶
Holds messages that should cause jobs to change their scheduling states. Messages are put in and buffered, and can be taken out and handled as batches when convenient.
All messages are NamedTuple objects of various subtypes.
Message order is guaranteed to be preserved within a type.
- publish(message)[source]¶
Put a message onto the bus. Can be called from any thread.
- Parameters:
message (Any)
- Return type:
None
- check()[source]¶
If we are in the owning thread, deliver any messages that are in the queue for us. Must be called every once in a while in the main thread, possibly through inbox objects.
- Return type:
None
- MessageType¶
- subscribe(message_type, handler)[source]¶
Register the given callable to be called when messages of the given type are sent. It will be called with messages sent after the subscription is created. Returns a subscription object; when the subscription object is GC’d the subscription will end.
- Parameters:
message_type (Type[MessageType])
handler (Callable[[MessageType], Any])
- Return type:
pubsub.core.listener.Listener
- connect(wanted_types)[source]¶
Get a connection object that serves as an inbox for messages of the given types. Messages of those types will accumulate in the inbox until it is destroyed. You can check for them at any time.
- Parameters:
wanted_types (List[type])
- Return type:
- connect_output_file(file_path)[source]¶
Send copies of all messages to the given output file.
Returns connection data which must be kept alive for the connection to persist. That data is opaque: the user is not supposed to look at it or touch it or do anything with it other than store it somewhere or delete it.
- Parameters:
file_path (str)
- Return type:
Any
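The buffering behaviour described for MessageBus (publish from any thread, deliver in batches when check() is called) can be sketched with the standard library alone. This is a minimal illustration, not Toil's implementation: the class TinyBus and the message type JobDone are hypothetical names, and unlike the real subscribe() this sketch returns no subscription object.

```python
import threading
from collections import defaultdict, deque
from typing import Any, Callable, DefaultDict, Deque, List, NamedTuple


class JobDone(NamedTuple):
    """Hypothetical message type standing in for the messages above."""
    job_id: str
    exit_code: int


class TinyBus:
    """Buffer messages from any thread; deliver them in batches on check()."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._queue: Deque[Any] = deque()
        self._handlers: DefaultDict[type, List[Callable[[Any], Any]]] = defaultdict(list)

    def publish(self, message: Any) -> None:
        # Only enqueue here, so publishing is safe from any thread.
        with self._lock:
            self._queue.append(message)

    def subscribe(self, message_type: type, handler: Callable[[Any], Any]) -> None:
        self._handlers[message_type].append(handler)

    def check(self) -> None:
        # Take the whole batch under the lock, then run handlers outside it.
        with self._lock:
            batch = list(self._queue)
            self._queue.clear()
        for message in batch:
            for handler in self._handlers[type(message)]:
                handler(message)


bus = TinyBus()
seen: List[JobDone] = []
bus.subscribe(JobDone, seen.append)
bus.publish(JobDone("job-1", 0))
bus.publish(JobDone("job-2", 1))
assert seen == []  # nothing is delivered until check() runs
bus.check()
print([m.job_id for m in seen])
```

Because a single queue is drained in insertion order, this sketch also preserves message order within a type, matching the guarantee stated above.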
- class toil.common.QueueSizeMessage[source]¶
Bases:
NamedTuple
Produced to describe the size of the queue of jobs issued but not yet completed. Theoretically recoverable from other messages.
- class toil.common.FileID(fileStoreID, size, executable=False)[source]¶
Bases:
str
A small wrapper around Python’s builtin string class.
It is used to represent a file’s ID in the file store, and has a size attribute that is the file’s size in bytes. This object is returned by importFile and writeGlobalFile.
Calls into the file store can use bare strings; size will be queried from the job store if unavailable in the ID.
- pack()[source]¶
Pack the FileID into a string so it can be passed through external code.
- Return type:
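The idea of a string that also carries a size can be sketched as a str subclass. The class name SizedID, the unpack() helper, and the packed format "size:flag:id" are all assumptions made for illustration; the format FileID.pack() actually produces is not stated in this page.

```python
class SizedID(str):
    """Hypothetical stand-in for FileID: a str that also carries a size."""

    def __new__(cls, file_store_id: str, size: int, executable: bool = False) -> "SizedID":
        # str is immutable, so the text is fixed in __new__, not __init__.
        obj = super().__new__(cls, file_store_id)
        obj.size = size
        obj.executable = executable
        return obj

    def pack(self) -> str:
        # Assumed format "size:flag:id"; the real format may differ.
        return f"{self.size}:{int(self.executable)}:{self}"

    @classmethod
    def unpack(cls, packed: str) -> "SizedID":
        # Split at most twice so that colons inside the ID survive.
        size, flag, file_store_id = packed.split(":", 2)
        return cls(file_store_id, int(size), bool(int(flag)))


fid = SizedID("files/no-job/file-abc", 2048)
packed = fid.pack()
restored = SizedID.unpack(packed)
print(packed, restored.size)
```

Since SizedID is still a str, code that expects a bare string ID keeps working, which is the property the description above relies on.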
- toil.common.deprecated(new_function_name)[source]¶
- Parameters:
new_function_name (str)
- Return type:
Callable[Ellipsis, Any]
- toil.common.try_path(path, min_size=100 * 1024 * 1024)[source]¶
Try to use the given path. Return it if it exists or can be made, and we can make things within it, or None otherwise.
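A minimal sketch of the "return the path if usable, otherwise None" contract described above. The name try_path_sketch is hypothetical, and interpreting min_size as a minimum number of free bytes is an assumption, since this page does not document that parameter.

```python
import os
import shutil
import tempfile
from typing import Optional


def try_path_sketch(path: str, min_size: int = 100 * 1024 * 1024) -> Optional[str]:
    """Return path if it exists or can be created and is writable, else None."""
    try:
        os.makedirs(path, exist_ok=True)
    except OSError:
        # Could not create the directory (permissions, a file in the way, ...).
        return None
    if not os.access(path, os.W_OK | os.X_OK):
        # We cannot make things within it.
        return None
    # Assumption: min_size is a lower bound on free space, in bytes.
    if shutil.disk_usage(path).free < min_size:
        return None
    return path


base = tempfile.mkdtemp()
print(try_path_sketch(os.path.join(base, "a", "b"), min_size=0) is not None)
```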
- toil.common.AtomicFileCreate(final_path, keep=False)[source]¶
Context manager to create a temporary file. Entering returns the path to a temporary file in the same directory as final_path. If the code in the context succeeds, the file is renamed to its actual name. If an error occurs, the file is not installed and is removed unless keep is specified.
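The write-then-rename pattern described above can be sketched with contextlib. This is an illustration under assumptions, not Toil's code: atomic_create is a hypothetical name, and the temporary file naming is arbitrary.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def atomic_create(final_path: str, keep: bool = False) -> Iterator[str]:
    """Yield a temporary path next to final_path; rename it on success."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Same directory, so the final rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    os.close(fd)
    try:
        yield tmp_path
    except BaseException:
        # The body failed: do not install the file; remove it unless asked to keep it.
        if not keep and os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
    else:
        os.replace(tmp_path, final_path)


target = os.path.join(tempfile.mkdtemp(), "result.txt")
with atomic_create(target) as tmp:
    with open(tmp, "w") as handle:
        handle.write("done\n")
print(os.path.exists(target))
```

Readers of final_path then see either no file or the complete file, never a partially written one.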
- toil.common.retry(intervals=None, infinite_retries=False, errors=None, log_message=None, prepare=None)[source]¶
Retry a function if it fails with any Exception defined in “errors”.
Does so every x seconds, where x is defined by a list of numbers (ints or floats) in “intervals”. Also accepts ErrorCondition events for more detailed retry attempts.
- Parameters:
intervals (Optional[List]) – A list of times in seconds we keep retrying until returning failure. Defaults to retrying with the following exponential back-off before failing: 1s, 1s, 2s, 4s, 8s, 16s
infinite_retries (bool) – If this is True, reset the intervals when they run out. Defaults to: False.
errors (Optional[Sequence[Union[ErrorCondition, Type[Exception]]]]) –
A list of exceptions OR ErrorCondition objects to catch and retry on. ErrorCondition objects describe more detailed error event conditions than a plain error. An ErrorCondition specifies:
- Exception (required)
- Error codes that must match to be retried (optional; defaults to not checking)
- A string that must be in the error message to be retried (optional; defaults to not checking)
- A bool that can be set to False to always error on this condition.
If not specified, this will default to a generic Exception.
log_message (Optional[Tuple[Callable, str]]) – Optional tuple of (“log/print function()”, “message string”) that will precede each attempt.
prepare (Optional[List[Callable]]) – Optional list of functions to call, with the function’s arguments, between retries, to reset state.
- Returns:
The result of the wrapped function or raise.
- Return type:
Callable[[Callable[Ellipsis, RT]], Callable[Ellipsis, RT]]
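To make the interval-based behaviour concrete, here is a simplified sketch of a retry decorator. It is not Toil's implementation: retry_sketch is a hypothetical name, and it covers only intervals and plain exception types, leaving out ErrorCondition, infinite_retries, log_message, and prepare.

```python
import functools
import time
from typing import Any, Callable, List, Optional, Sequence, Type


def retry_sketch(
    intervals: Optional[List[float]] = None,
    errors: Optional[Sequence[Type[Exception]]] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Retry the wrapped function, sleeping intervals[i] after the i-th failure."""
    # Defaults follow the back-off listed above: 1s, 1s, 2s, 4s, 8s, 16s.
    delays = [1, 1, 2, 4, 8, 16] if intervals is None else list(intervals)
    caught = (Exception,) if errors is None else tuple(errors)

    def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for delay in delays:
                try:
                    return func(*args, **kwargs)
                except caught:
                    time.sleep(delay)
            # Final attempt: let any exception propagate to the caller.
            return func(*args, **kwargs)
        return wrapper
    return decorate


attempts = []


@retry_sketch(intervals=[0, 0], errors=[ConnectionError])
def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient")
    return "ok"


print(flaky(), len(attempts))
```

Exceptions that are not listed in errors are not caught, so they fail on the first attempt.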
- toil.common.add_provisioner_options(parser)[source]¶
- Parameters:
parser (argparse.ArgumentParser)
- Return type:
None
- toil.common.cluster_factory(provisioner, clusterName=None, clusterType='mesos', zone=None, nodeStorage=50, nodeStorageOverrides=None, sseKey=None, enable_fuse=False)[source]¶
Find and instantiate the appropriate provisioner instance to make clusters in the given cloud.
Raises ClusterTypeNotSupportedException if the given provisioner does not implement clusters of the given type.
- Parameters:
provisioner (str) – The cloud type of the cluster.
clusterName (Optional[str]) – The name of the cluster.
clusterType (str) – The type of cluster: ‘mesos’ or ‘kubernetes’.
zone (Optional[str]) – The cloud zone
nodeStorage (int)
nodeStorageOverrides (Optional[List[str]])
sseKey (Optional[str])
enable_fuse (bool)
- Returns:
A cluster object for the cloud type.
- Return type:
Union[aws.awsProvisioner.AWSProvisioner, gceProvisioner.GCEProvisioner]
- class toil.common.RealtimeLogger(batchSystem, level=defaultLevel)[source]¶
Provide a logger that logs over UDP to the leader.
To use in a Toil job, do:
>>> from toil.realtimeLogger import RealtimeLogger
>>> RealtimeLogger.info("This logging message goes straight to the leader")
That’s all a user of Toil would need to do. On the leader, Job.Runner.startToil() automatically starts the UDP server by using an instance of this class as a context manager.
- Parameters:
batchSystem (toil.batchSystems.abstractBatchSystem.AbstractBatchSystem)
level (str)
- envPrefix = 'TOIL_RT_LOGGING_'¶
- defaultLevel = 'INFO'¶
- lock¶
- loggingServer = None¶
- serverThread = None¶
- initialized = 0¶
- logger = None¶
- classmethod getLogger()[source]¶
Get the logger that logs real-time to the leader.
Note that if the returned logger is used on the leader, you will see the message twice, since it still goes to the normal log handlers, too.
- Return type:
- __exit__(exc_type, exc_val, exc_tb)[source]¶
- Parameters:
exc_type (Optional[Type[BaseException]])
exc_val (Optional[BaseException])
exc_tb (Optional[types.TracebackType])
- Return type:
None
- toil.common.add_logging_options(parser, default_level=None)[source]¶
Add logging options to set the global log level.
- Parameters:
default_level (Optional[int]) – A logging level, like logging.INFO, to use as the default.
parser (argparse.ArgumentParser)
- Return type:
None
- toil.common.set_logging_from_options(options)[source]¶
- Parameters:
options (Union[toil.common.Config, argparse.Namespace])
- Return type:
None
- toil.common.dockerRegistry = 'quay.io/ucsc_cgl'¶
- toil.common.dockerTag = '7.0.0-d569ea5711eb310ffd5703803f7250ebf7c19576-py3.9'¶
- toil.common.version = '7.0.0-d569ea5711eb310ffd5703803f7250ebf7c19576'¶
- toil.common.UUID_LENGTH = 32¶
- toil.common.logger¶
- class toil.common.Config[source]¶
Class to represent configuration operations for a toil workflow run.
- batch_logs_dir: str | None¶
The backing scheduler will be instructed, if possible, to save logs to this directory, where the leader can read them.
- workflowID: str | None¶
This attribute uniquely identifies the job store and therefore the workflow. It is necessary in order to distinguish between two consecutive workflows for which self.jobStore is the same, e.g. when a job store name is reused after a previous run has finished successfully and its job store has been cleaned up.
- defaultAccelerators: List[toil.job.AcceleratorRequirement]¶
- prepare_start()[source]¶
After options are set, prepare for initial start of workflow.
- Return type:
None
- prepare_restart()[source]¶
Before restart options are set, prepare for a restart of a workflow. Set up any execution-specific parameters and clear out any stale ones.
- Return type:
None
- setOptions(options)[source]¶
Creates a config object from the options object.
- Parameters:
options (argparse.Namespace)
- Return type:
None
- toil.common.check_and_create_toil_home_dir()[source]¶
Ensure that TOIL_HOME_DIR exists.
Raises an error if it does not exist and cannot be created. Safe to run simultaneously in multiple processes.
- Return type:
None
- toil.common.check_and_create_default_config_file()[source]¶
If the default config file does not exist, create it in the Toil home directory. Create the Toil home directory if needed.
Raises an error if the default config file cannot be created. Safe to run simultaneously in multiple processes. If this process runs this function, it will always see the default config file existing with parseable contents, even if other processes are racing to create it.
No process will see an empty or partially-written default config file.
- Return type:
None
- toil.common.check_and_create_config_file(filepath)[source]¶
If the config file at the filepath does not exist, try creating it. The parent directory should be created prior to calling this.
- Parameters:
filepath (str) – Path to config file
- Return type:
None
- toil.common.generate_config(filepath)[source]¶
Write a Toil config file to the given path.
Safe to run simultaneously in multiple processes. No process will see an empty or partially-written file at the given path.
Set include to “cwl” or “wdl” to include cwl options and wdl options respectively
- Parameters:
filepath (str)
- Return type:
None
- toil.common.parser_with_common_options(provisioner_options=False, jobstore_option=True, prog=None, default_log_level=None)[source]¶
- toil.common.addOptions(parser, jobstore_as_flag=False, cwl=False, wdl=False)[source]¶
Add all Toil command line options to a parser.
Support for config files if using configargparse. This will also check and set up the default config file.
- Parameters:
jobstore_as_flag (bool) – make the job store option a --jobStore flag instead of a required jobStore positional argument.
cwl (bool) – Whether CWL options are expected. If so, CWL options won’t be suppressed.
wdl (bool) – Whether WDL options are expected. If so, WDL options won’t be suppressed.
parser (argparse.ArgumentParser)
- Return type:
None
- toil.common.getNodeID()[source]¶
Return unique ID of the current node (host). The resulting string will be convertible to a uuid.UUID.
Tries several methods until success. The returned ID should be identical across calls from different processes on the same node at least until the next OS reboot.
The last resort method is uuid.getnode(), which in some rare OS configurations may return a random ID each time it is called. However, this method should never be reached on a Linux system, because reading from /proc/sys/kernel/random/boot_id will be tried prior to that. If uuid.getnode() is reached, it will be called twice, and an exception is raised if the values are not identical.
- Return type:
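The fallback order described above can be sketched as follows. This is a simplified illustration with only two of the methods (the page says several are tried); node_id_sketch is a hypothetical name and the choice of RuntimeError is an assumption.

```python
import uuid


def node_id_sketch() -> str:
    """Return a 32-character hex ID for this host, stable until reboot."""
    try:
        # On Linux the kernel exposes a per-boot UUID here.
        with open("/proc/sys/kernel/random/boot_id") as handle:
            return uuid.UUID(handle.read().strip()).hex
    except (OSError, ValueError):
        pass
    # Last resort: uuid.getnode() may be random, so call it twice and compare.
    first, second = uuid.getnode(), uuid.getnode()
    if first != second:
        raise RuntimeError("uuid.getnode() is not stable on this host")
    return uuid.UUID(int=first).hex


node_id = node_id_sketch()
print(len(node_id))
```

The hex form has 32 characters, which matches UUID_LENGTH above, and it can be passed back to uuid.UUID().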
- class toil.common.Toil(options)[source]¶
Bases:
ContextManager[Toil]
A context manager that represents a Toil workflow.
Specifically the batch system, job store, and its configuration.
- Parameters:
options (argparse.Namespace)
- __enter__()[source]¶
Derive configuration from the command line options.
Then load the job store and, on restart, consolidate the derived configuration with the one from the previous invocation of the workflow.
- Return type:
- __exit__(exc_type, exc_val, exc_tb)[source]¶
Clean up after a workflow invocation.
Depending on the configuration, delete the job store.
- Parameters:
exc_type (Optional[Type[BaseException]])
exc_val (Optional[BaseException])
exc_tb (Optional[types.TracebackType])
- Return type:
Literal[False]
- start(rootJob)[source]¶
Invoke a Toil workflow with the given job as the root for an initial run.
This method must be called in the body of a with Toil(...) as toil: statement. This method should not be called more than once for a workflow that has not finished.
- Parameters:
rootJob (toil.job.Job) – The root job of the workflow
- Returns:
The root job’s return value
- Return type:
Any
- restart()[source]¶
Restarts a workflow that has been interrupted.
- Returns:
The root job’s return value
- Return type:
Any
- classmethod getJobStore(locator)[source]¶
Create an instance of the concrete job store implementation that matches the given locator.
- Parameters:
locator (str) – The location of the job store to be represented by the instance
- Returns:
an instance of a concrete subclass of AbstractJobStore
- Return type:
- static createBatchSystem(config)[source]¶
Create an instance of the batch system specified in the given config.
- Parameters:
config (Config) – the current configuration
- Returns:
an instance of a concrete subclass of AbstractBatchSystem
- Return type:
- importFile(srcUrl: str, sharedFileName: str, symlink: bool = True) -> None[source]¶
- importFile(srcUrl: str, sharedFileName: None = None, symlink: bool = True) -> toil.fileStores.FileID
- import_file(src_uri: str, shared_file_name: str, symlink: bool = True, check_existence: bool = True) -> None[source]¶
- import_file(src_uri: str, shared_file_name: None = None, symlink: bool = True, check_existence: bool = True) -> toil.fileStores.FileID
Import the file at the given URL into the job store.
Whether a missing file raises an error or yields None is controlled by check_existence.
- Parameters:
check_existence – If true, raise FileNotFoundError if the file does not exist. If false, return None when the file does not exist.
See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description.
- exportFile(jobStoreFileID, dstUrl)[source]¶
- Parameters:
jobStoreFileID (toil.fileStores.FileID)
dstUrl (str)
- Return type:
None
- export_file(file_id, dst_uri)[source]¶
Export file to destination pointed at by the destination URL.
See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description.
- Parameters:
file_id (toil.fileStores.FileID)
dst_uri (str)
- Return type:
None
- static normalize_uri(uri, check_existence=False)[source]¶
Given a URI, if it has no scheme, prepend “file:”.
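The rule stated above is small enough to sketch directly. This is a minimal illustration using urllib.parse; normalize_uri_sketch is a hypothetical name, and the check_existence behaviour of the real method is not modelled because this page does not describe it.

```python
from urllib.parse import urlparse


def normalize_uri_sketch(uri: str) -> str:
    """Prepend "file:" when uri has no scheme; otherwise return it unchanged."""
    if urlparse(uri).scheme:
        return uri
    return "file:" + uri


print(normalize_uri_sketch("/data/in.txt"))
print(normalize_uri_sketch("s3://bucket/key"))
```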
- static getToilWorkDir(configWorkDir=None)[source]¶
Return a path to a writable directory under which per-workflow directories exist.
This directory is always required to exist on a machine, even if the Toil worker has not run yet. If your workers and leader have different temp directories, you may need to set TOIL_WORKDIR.
- classmethod get_toil_coordination_dir(config_work_dir, config_coordination_dir)[source]¶
Return a path to a writable directory, which will be in memory if convenient. Ought to be used for file locking and coordination.
- Parameters:
- Returns:
Path to the Toil coordination directory. Ought to be on a POSIX filesystem that allows directories containing open files to be deleted.
- Return type:
- static get_workflow_path_component(workflow_id)[source]¶
Get a safe filesystem path component for a workflow.
Will be consistent for all processes on a given machine, and different for all processes on different machines.
- classmethod getLocalWorkflowDir(workflowID, configWorkDir=None)[source]¶
Return the directory where worker directories and the cache will be located for this workflow on this machine.
- classmethod get_local_workflow_coordination_dir(workflow_id, config_work_dir, config_coordination_dir)[source]¶
Return the directory where coordination files should be located for this workflow on this machine. These include internal Toil databases and lock files for the machine.
If an in-memory filesystem is available, it is used. Otherwise, the local workflow directory, which may be on a shared network filesystem, is used.
- Parameters:
- Returns:
Path to the local workflow coordination directory on this machine.
- Return type:
- exception toil.common.ToilRestartException(message)[source]¶
Bases:
Exception
Common base class for all non-exit exceptions.
- Parameters:
message (str)
- exception toil.common.ToilContextManagerException[source]¶
Bases:
Exception
Common base class for all non-exit exceptions.
- class toil.common.ToilMetrics(bus, provisioner=None)[source]¶
- Parameters:
bus (toil.bus.MessageBus)
provisioner (Optional[toil.provisioners.abstractProvisioner.AbstractProvisioner])
- toil.common.getDirSizeRecursively(dirPath)[source]¶
This method will return the cumulative number of bytes occupied by the files on disk in the directory and its subdirectories.
If the method is unable to access a file or directory (due to insufficient permissions, or due to the file or directory having been removed while this function was attempting to traverse it), the error will be handled internally, and a (possibly 0) lower bound on the size of the directory will be returned.
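The tolerant traversal described above might look like the following sketch. It is an assumption-laden illustration rather than Toil's implementation: dir_size_sketch is a hypothetical name, it counts regular directory entries via st_blocks (POSIX only), and it does not try to de-duplicate hard links.

```python
import os
import tempfile


def dir_size_sketch(dir_path: str) -> int:
    """Sum on-disk bytes of files under dir_path, skipping anything unreadable."""
    total = 0
    # os.walk ignores directory listing errors by default (onerror=None).
    for root, _dirs, files in os.walk(dir_path):
        for name in files:
            try:
                info = os.lstat(os.path.join(root, name))
            except OSError:
                # Removed or inaccessible mid-walk: keep the lower bound.
                continue
            # st_blocks is counted in 512-byte units on POSIX systems.
            total += info.st_blocks * 512
    return total


scratch = tempfile.mkdtemp()
with open(os.path.join(scratch, "data.bin"), "wb") as handle:
    handle.write(b"\0" * 8192)
print(dir_size_sketch(scratch))
```

A path that does not exist simply yields 0, which is consistent with the "possibly 0 lower bound" wording above.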