toil.common

Module Contents

Classes

Config

Class to represent configuration operations for a toil workflow run.

Toil

A context manager that represents a Toil workflow.

ToilMetrics

Functions

check_and_create_toil_home_dir()

Ensure that TOIL_HOME_DIR exists.

check_and_create_default_config_file()

If the default config file does not exist, create it in the Toil home directory. Create the Toil home directory if needed.

check_and_create_config_file(filepath)

If the config file at the filepath does not exist, try creating it.

generate_config(filepath)

Write a Toil config file to the given path.

parser_with_common_options([provisioner_options, ...])

addOptions(parser[, jobstore_as_flag, cwl, wdl])

Add all Toil command line options to a parser.

getNodeID()

Return the unique ID of the current node (host). The resulting string will be convertible to a uuid.UUID.

cacheDirName(workflowID)

Return the name of the cache directory.

getDirSizeRecursively(dirPath)

This method will return the cumulative number of bytes occupied by the files on disk in the directory and its subdirectories.

getFileSystemSize(dirPath)

Return the free space, and total size of the file system hosting dirPath.

safeUnpickleFromStream(stream)

Attributes

UUID_LENGTH

logger

TOIL_HOME_DIR

DEFAULT_CONFIG_FILE

toil.common.UUID_LENGTH = 32
toil.common.logger
toil.common.TOIL_HOME_DIR: str
toil.common.DEFAULT_CONFIG_FILE: str
class toil.common.Config[source]

Class to represent configuration operations for a toil workflow run.

logFile: str | None
logRotating: bool
cleanWorkDir: str
max_jobs: int
max_local_jobs: int
manualMemArgs: bool
run_local_jobs_on_workers: bool
coalesceStatusCalls: bool
mesos_endpoint: str | None
mesos_framework_id: str | None
mesos_role: str | None
mesos_name: str
kubernetes_host_path: str | None
kubernetes_owner: str | None
kubernetes_service_account: str | None
kubernetes_pod_timeout: float
tes_endpoint: str
tes_user: str
tes_password: str
tes_bearer_token: str
aws_batch_region: str | None
aws_batch_queue: str | None
aws_batch_job_role_arn: str | None
scale: float
batchSystem: str
batch_logs_dir: str | None

The backing scheduler will be instructed, if possible, to save logs to this directory, where the leader can read them.

statePollingWait: int
disableAutoDeployment: bool
workflowID: str | None

This attribute uniquely identifies the job store and therefore the workflow. It is necessary in order to distinguish between two consecutive workflows for which self.jobStore is the same, e.g. when a job store name is reused after a previous run has finished successfully and its job store has been cleaned up.

workflowAttemptNumber: int
jobStore: str
logLevel: str
colored_logs: bool
workDir: str | None
coordination_dir: str | None
noStdOutErr: bool
stats: bool
clean: str | None
clusterStats: str
restart: bool
caching: bool | None
symlinkImports: bool
moveOutputs: bool
provisioner: str | None
nodeTypes: List[Tuple[Set[str], float | None]]
minNodes: List[int]
maxNodes: List[int]
targetTime: float
betaInertia: float
scaleInterval: int
preemptibleCompensation: float
nodeStorage: int
nodeStorageOverrides: List[str]
metrics: bool
assume_zero_overhead: bool
maxPreemptibleServiceJobs: int
maxServiceJobs: int
deadlockWait: float | int
deadlockCheckInterval: float | int
defaultMemory: int
defaultCores: float | int
defaultDisk: int
defaultPreemptible: bool
defaultAccelerators: List[toil.job.AcceleratorRequirement]
maxCores: int
maxMemory: int
maxDisk: int
retryCount: int
enableUnlimitedPreemptibleRetries: bool
doubleMem: bool
maxJobDuration: int
rescueJobsFrequency: int
maxLogFileSize: int
writeLogs: str
writeLogsGzip: str
writeLogsFromAllJobs: bool
write_messages: str | None
realTimeLogging: bool
environment: Dict[str, str]
disableChaining: bool
disableJobStoreChecksumVerification: bool
sseKey: str | None
servicePollingInterval: int
useAsync: bool
forceDockerAppliance: bool
statusWait: int
disableProgress: bool
readGlobalFileMutableByDefault: bool
debugWorker: bool
disableWorkerOutputCapture: bool
badWorker: float
badWorkerFailInterval: float
kill_polling_interval: int
cwl: bool
set_from_default_config()[source]
Return type:

None

prepare_start()[source]

After options are set, prepare for initial start of workflow.

Return type:

None

prepare_restart()[source]

Before restart options are set, prepare for a restart of a workflow. Set up any execution-specific parameters and clear out any stale ones.

Return type:

None

setOptions(options)[source]

Creates a config object from the options object.

Parameters:

options (argparse.Namespace) –

Return type:

None
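
A minimal sketch of populating a Config from parsed options (the job store locator is a made-up example; Job.Runner.getDefaultOptions supplies a Namespace carrying Toil's defaults):

    from toil.common import Config
    from toil.job import Job

    # Build an options Namespace with Toil's default values, then copy it onto a Config.
    options = Job.Runner.getDefaultOptions("file:my-jobstore")
    config = Config()
    config.setOptions(options)
    print(config.jobStore, config.logLevel)  # reflect the parsed options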

check_configuration_consistency()[source]

Old checks that cannot be fit into an argparse action class.

Return type:

None

__eq__(other)[source]

Return self==value.

Parameters:

other (object) –

Return type:

bool

__hash__()[source]

Return hash(self).

Return type:

int

toil.common.check_and_create_toil_home_dir()[source]

Ensure that TOIL_HOME_DIR exists.

Raises an error if it does not exist and cannot be created. Safe to run simultaneously in multiple processes.

Return type:

None

toil.common.check_and_create_default_config_file()[source]

If the default config file does not exist, create it in the Toil home directory. Create the Toil home directory if needed.

Raises an error if the default config file cannot be created. Safe to run simultaneously in multiple processes. Any process that runs this function will see the default config file existing with parseable contents, even if other processes are racing to create it.

No process will see an empty or partially-written default config file.

Return type:

None

toil.common.check_and_create_config_file(filepath)[source]

If the config file at the filepath does not exist, try creating it. The parent directory should be created prior to calling this function.

Parameters:

filepath (str) – Path to the config file.

Return type:

None

toil.common.generate_config(filepath)[source]

Write a Toil config file to the given path.

Safe to run simultaneously in multiple processes. No process will see an empty or partially-written file at the given path.

Set include to “cwl” or “wdl” to include CWL options or WDL options, respectively.

Parameters:

filepath (str) –

Return type:

None
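
A short sketch of writing a fresh config file to a scratch path (the location is made up for illustration):

    import os
    import tempfile
    from toil.common import generate_config

    path = os.path.join(tempfile.mkdtemp(), "toil-config.yaml")  # hypothetical location
    generate_config(path)
    print(open(path).read()[:200])  # plain-text (YAML) listing of the default options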

toil.common.parser_with_common_options(provisioner_options=False, jobstore_option=True, prog=None)[source]
Parameters:
  • provisioner_options (bool) –

  • jobstore_option (bool) –

  • prog (Optional[str]) –

Return type:

configargparse.ArgParser
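
For example, a small utility can reuse the shared options (the program name is illustrative; with jobstore_option=True the job store locator is a positional argument):

    from toil.common import parser_with_common_options

    parser = parser_with_common_options(jobstore_option=True, prog="my-toil-tool")
    options = parser.parse_args(["file:my-jobstore"])
    print(options.jobStore)  # attribute name assumed from the positional jobStore argument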

toil.common.addOptions(parser, jobstore_as_flag=False, cwl=False, wdl=False)[source]

Add all Toil command line options to a parser.

Config files are supported if the parser is a configargparse parser. This will also check for and set up the default config file.

Parameters:
  • jobstore_as_flag (bool) – make the job store option a --jobStore flag instead of a required jobStore positional argument.

  • cwl (bool) – Whether CWL options are expected. If so, CWL options won’t be suppressed.

  • wdl (bool) – Whether WDL options are expected. If so, WDL options won’t be suppressed.

  • parser (argparse.ArgumentParser) –

Return type:

None
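
A sketch of attaching all Toil options to a configargparse parser next to workflow-specific arguments (the --greeting flag and the locator are illustrative):

    import configargparse
    from toil.common import addOptions

    parser = configargparse.ArgParser(description="Example workflow")
    addOptions(parser, jobstore_as_flag=True)
    parser.add_argument("--greeting", default="hello")
    options = parser.parse_args(["--jobStore", "file:my-jobstore", "--greeting", "hi"])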

toil.common.getNodeID()[source]

Return the unique ID of the current node (host). The resulting string will be convertible to a uuid.UUID.

Tries several methods until success. The returned ID should be identical across calls from different processes on the same node at least until the next OS reboot.

The last-resort method is uuid.getnode(), which in some rare OS configurations may return a random ID each time it is called. However, this method should never be reached on a Linux system, because reading from /proc/sys/kernel/random/boot_id will be tried prior to that. If uuid.getnode() is reached, it will be called twice, and an exception is raised if the two values are not identical.

Return type:

str
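
For instance, the returned string can be fed straight into uuid.UUID:

    import uuid
    from toil.common import getNodeID

    node_id = getNodeID()
    print(uuid.UUID(node_id))  # parses, since the ID is guaranteed to be UUID-convertible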

class toil.common.Toil(options)[source]

Bases: ContextManager[Toil]

A context manager that represents a Toil workflow.

Specifically the batch system, job store, and its configuration.

Parameters:

options (argparse.Namespace) –

config: Config
__enter__()[source]

Derive configuration from the command line options.

Then load the job store and, on restart, consolidate the derived configuration with the one from the previous invocation of the workflow.

Return type:

Toil

__exit__(exc_type, exc_val, exc_tb)[source]

Clean up after a workflow invocation.

Depending on the configuration, delete the job store.

Return type:

Literal[False]

start(rootJob)[source]

Invoke a Toil workflow with the given job as the root for an initial run.

This method must be called in the body of a with Toil(...) as toil: statement. This method should not be called more than once for a workflow that has not finished.

Parameters:

rootJob (toil.job.Job) – The root job of the workflow

Returns:

The root job’s return value

Return type:

Any
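
A minimal start/restart sketch inside a with Toil(...) block (the hello job function and the job store locator are illustrative):

    from toil.common import Toil
    from toil.job import Job

    def hello(job, name):
        # A trivial job function; the leader collects and returns its result.
        return "Hello, " + name

    if __name__ == "__main__":
        options = Job.Runner.getDefaultOptions("file:my-jobstore")
        with Toil(options) as toil:
            if not toil.options.restart:
                output = toil.start(Job.wrapJobFn(hello, "world"))
            else:
                output = toil.restart()
        print(output)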

restart()[source]

Restarts a workflow that has been interrupted.

Returns:

The root job’s return value

Return type:

Any

classmethod getJobStore(locator)[source]

Create an instance of the concrete job store implementation that matches the given locator.

Parameters:

locator (str) – The location of the job store to be represented by the instance

Returns:

an instance of a concrete subclass of AbstractJobStore

Return type:

toil.jobStores.abstractJobStore.AbstractJobStore

static parseLocator(locator)[source]
Parameters:

locator (str) –

Return type:

Tuple[str, str]

static buildLocator(name, rest)[source]
Parameters:
  • name (str) –

  • rest (str) –

Return type:

str
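
These two helpers are inverses of one another; for example (the locator is made up):

    from toil.common import Toil

    name, rest = Toil.parseLocator("aws:us-west-2:my-jobstore")
    assert (name, rest) == ("aws", "us-west-2:my-jobstore")
    assert Toil.buildLocator(name, rest) == "aws:us-west-2:my-jobstore"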

classmethod resumeJobStore(locator)[source]
Parameters:

locator (str) –

Return type:

toil.jobStores.abstractJobStore.AbstractJobStore

static createBatchSystem(config)[source]

Create an instance of the batch system specified in the given config.

Parameters:

config (Config) – the current configuration

Returns:

an instance of a concrete subclass of AbstractBatchSystem

Return type:

toil.batchSystems.abstractBatchSystem.AbstractBatchSystem

importFile(srcUrl: str, sharedFileName: str, symlink: bool = True) None[source]
importFile(srcUrl: str, sharedFileName: None = None, symlink: bool = True) toil.fileStores.FileID
import_file(src_uri: str, shared_file_name: str, symlink: bool = True, check_existence: bool = True) None[source]
import_file(src_uri: str, shared_file_name: None = None, symlink: bool = True, check_existence: bool = True) toil.fileStores.FileID

Import the file at the given URL into the job store.

By default, a FileNotFoundError is raised if the file does not exist; set check_existence to false to get None instead.

Parameters:

check_existence – If true, raise FileNotFoundError if the file does not exist. If false, return None when the file does not exist.

See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description.

exportFile(jobStoreFileID, dstUrl)[source]
Return type:

None

export_file(file_id, dst_uri)[source]

Export a file to the destination pointed at by the given URL.

See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description.

Return type:

None
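
A sketch of importing an input before start() and exporting a result afterwards, inside the same with Toil(...) block (the paths and the pass-through job are illustrative):

    from toil.common import Toil
    from toil.job import Job

    def pass_through(job, file_id):
        # Trivial job that simply hands the imported file's ID back to the leader.
        return file_id

    if __name__ == "__main__":
        options = Job.Runner.getDefaultOptions("file:my-jobstore")
        with Toil(options) as toil:
            file_id = toil.import_file("file:///tmp/input.txt")
            result_id = toil.start(Job.wrapJobFn(pass_through, file_id))
            toil.export_file(result_id, "file:///tmp/output.txt")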

static normalize_uri(uri, check_existence=False)[source]

Given a URI, if it has no scheme, prepend “file:”.

Parameters:
  • check_existence (bool) – If set, raise FileNotFoundError if a URI points to a local file that does not exist.

  • uri (str) –

Return type:

str
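
For example (the exact form of the resulting file: URI may vary):

    from toil.common import Toil

    print(Toil.normalize_uri("/tmp/data.txt"))    # a plain path gains a file: scheme
    print(Toil.normalize_uri("s3://bucket/key"))  # already has a scheme, so returned unchanged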

static getToilWorkDir(configWorkDir=None)[source]

Return a path to a writable directory under which per-workflow directories exist.

This directory is always required to exist on a machine, even if the Toil worker has not run yet. If your workers and leader have different temp directories, you may need to set TOIL_WORKDIR.

Parameters:

configWorkDir (Optional[str]) – Value passed to the program using the --workDir flag

Returns:

Path to the Toil work directory, constant across all machines

Return type:

str
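
For example (assuming neither --workDir nor TOIL_WORKDIR is set, this typically resolves to a system temp location):

    from toil.common import Toil

    print(Toil.getToilWorkDir())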

classmethod get_toil_coordination_dir(config_work_dir, config_coordination_dir)[source]

Return a path to a writable directory, which will be in memory if convenient. Ought to be used for file locking and coordination.

Parameters:
  • config_work_dir (Optional[str]) – Value passed to the program using the --workDir flag

  • config_coordination_dir (Optional[str]) – Value passed to the program using the --coordinationDir flag

Returns:

Path to the Toil coordination directory. Ought to be on a POSIX filesystem that allows directories containing open files to be deleted.

Return type:

str

static get_workflow_path_component(workflow_id)[source]

Get a safe filesystem path component for a workflow.

Will be consistent for all processes on a given machine, and different for all processes on different machines.

Parameters:

workflow_id (str) – The ID of the current Toil workflow.

Return type:

str

classmethod getLocalWorkflowDir(workflowID, configWorkDir=None)[source]

Return the directory where worker directories and the cache will be located for this workflow on this machine.

Parameters:
  • configWorkDir (Optional[str]) – Value passed to the program using the --workDir flag

  • workflowID (str) –

Returns:

Path to the local workflow directory on this machine

Return type:

str

classmethod get_local_workflow_coordination_dir(workflow_id, config_work_dir, config_coordination_dir)[source]

Return the directory where coordination files should be located for this workflow on this machine. These include internal Toil databases and lock files for the machine.

If an in-memory filesystem is available, it is used. Otherwise, the local workflow directory, which may be on a shared network filesystem, is used.

Parameters:
  • workflow_id (str) – Unique ID of the current workflow.

  • config_work_dir (Optional[str]) – Value used for the work directory in the current Toil Config.

  • config_coordination_dir (Optional[str]) – Value used for the coordination directory in the current Toil Config.

Returns:

Path to the local workflow coordination directory on this machine.

Return type:

str

exception toil.common.ToilRestartException(message)[source]

Bases: Exception

Common base class for all non-exit exceptions.

Parameters:

message (str) –

exception toil.common.ToilContextManagerException[source]

Bases: Exception

Common base class for all non-exit exceptions.

class toil.common.ToilMetrics(bus, provisioner=None)[source]
startDashboard(clusterName, zone)[source]
Parameters:
  • clusterName (str) –

  • zone (str) –

Return type:

None

add_prometheus_data_source()[source]
Return type:

None

log(message)[source]
Parameters:

message (str) –

Return type:

None

logClusterSize(m)[source]
Parameters:

m (toil.bus.ClusterSizeMessage) –

Return type:

None

logClusterDesiredSize(m)[source]
Parameters:

m (toil.bus.ClusterDesiredSizeMessage) –

Return type:

None

logQueueSize(m)[source]
Parameters:

m (toil.bus.QueueSizeMessage) –

Return type:

None

logMissingJob(m)[source]
Parameters:

m (toil.bus.JobMissingMessage) –

Return type:

None

logIssuedJob(m)[source]
Parameters:

m (toil.bus.JobIssuedMessage) –

Return type:

None

logFailedJob(m)[source]
Parameters:

m (toil.bus.JobFailedMessage) –

Return type:

None

logCompletedJob(m)[source]
Parameters:

m (toil.bus.JobCompletedMessage) –

Return type:

None

shutdown()[source]
Return type:

None

toil.common.cacheDirName(workflowID)[source]
Returns:

Name of the cache directory.

Parameters:

workflowID (str) –

Return type:

str

toil.common.getDirSizeRecursively(dirPath)[source]

This method will return the cumulative number of bytes occupied by the files on disk in the directory and its subdirectories.

If the method is unable to access a file or directory (due to insufficient permissions, or due to the file or directory having been removed while this function was attempting to traverse it), the error will be handled internally, and a (possibly 0) lower bound on the size of the directory will be returned.

Parameters:

dirPath (str) – A valid path to a directory or file.

Returns:

Total size, in bytes, of the file or directory at dirPath.

Return type:

int

toil.common.getFileSystemSize(dirPath)[source]

Return the free space, and total size of the file system hosting dirPath.

Parameters:

dirPath (str) – A valid path to a directory.

Returns:

free space and total size of file system

Return type:

Tuple[int, int]
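
A quick sketch combining this with getDirSizeRecursively() above (the path is illustrative):

    from toil.common import getDirSizeRecursively, getFileSystemSize

    used = getDirSizeRecursively("/tmp")     # bytes occupied by files under /tmp
    free, total = getFileSystemSize("/tmp")  # free and total bytes of the hosting filesystem
    print(f"/tmp: {used} bytes used; {free} of {total} bytes free on its filesystem")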

toil.common.safeUnpickleFromStream(stream)[source]
Parameters:

stream (IO[Any]) –

Return type:

Any