toil.batchSystems.cleanup_support

Attributes

logger

Classes

BatchSystemSupport

Partial implementation of AbstractBatchSystem, support methods.

WorkerCleanupInfo

BatchSystemLocalSupport

Adds a local queue for helper jobs, useful for CWL & others.

Config

Class to represent configuration operations for a toil workflow run.

Toil

A context manager that represents a Toil workflow.

LastProcessStandingArena

Class that lets a bunch of processes detect and elect a last process

BatchSystemCleanupSupport

Adds cleanup support when the last running job leaves a node, for batch

WorkerCleanupContext

Context manager used by BatchSystemCleanupSupport to implement

Module Contents

class toil.batchSystems.cleanup_support.BatchSystemSupport(config, maxCores, maxMemory, maxDisk)[source]

Bases: AbstractBatchSystem

Partial implementation of AbstractBatchSystem, support methods.

Parameters:
check_resource_request(requirer)[source]

Check resource request is not greater than that available or allowed.

Parameters:
  • requirer (toil.job.Requirer) – Object whose requirements are being checked

  • job_name (str) – Name of the job being checked, for generating a useful error report.

  • detail (str) – Batch-system-specific message to include in the error.

Raises:

InsufficientSystemResources – raised when a resource is requested in an amount greater than allowed

Return type:

None

setEnv(name, value=None)[source]

Set an environment variable for the worker process before it is launched. The worker process will typically inherit the environment of the machine it is running on but this method makes it possible to override specific variables in that inherited environment before the worker is launched. Note that this mechanism is different to the one used by the worker internally to set up the environment of a job. A call to this method affects all jobs issued after this method returns. Note to implementors: This means that you would typically need to copy the variables before enqueuing a job.

If no value is provided it will be looked up from the current environment.

Parameters:
  • name (str) – the environment variable to be set on the worker.

  • value (Optional[str]) – if given, the environment variable given by name will be set to this value. If None, the variable’s current value will be used as the value on the worker

Raises:

RuntimeError – if value is None and the name cannot be found in the environment

Return type:

None

set_message_bus(message_bus)[source]

Give the batch system an opportunity to connect directly to the message bus, so that it can send informational messages about the jobs it is running to other Toil components.

Parameters:

message_bus (toil.bus.MessageBus)

Return type:

None

get_batch_logs_dir()[source]

Get the directory where the backing batch system should save its logs.

Only really makes sense if the backing batch system actually saves logs to a filesystem; Kubernetes for example does not. Ought to be a directory shared between the leader and the workers, if the backing batch system writes logs onto the worker’s view of the filesystem, like many HPC schedulers do.

Return type:

str

format_std_out_err_path(toil_job_id, cluster_job_id, std)[source]

Format path for batch system standard output/error and other files generated by the batch system itself.

Files will be written to the batch logs directory (–batchLogsDir, defaulting to the Toil work directory) with names containing both the Toil and batch system job IDs, for ease of debugging job failures.

Param:

int toil_job_id : The unique id that Toil gives a job.

Param:

cluster_job_id : What the cluster, for example, GridEngine, uses as its internal job id.

Param:

string std : The provenance of the stream (for example: ‘err’ for ‘stderr’ or ‘out’ for ‘stdout’)

Return type:

string : Formatted filename; however if self.config.noStdOutErr is true, returns ‘/dev/null’ or equivalent.

Parameters:
  • toil_job_id (int)

  • cluster_job_id (str)

  • std (str)

format_std_out_err_glob(toil_job_id)[source]

Get a glob string that will match all file paths generated by format_std_out_err_path for a job.

Parameters:

toil_job_id (int)

Return type:

str

static workerCleanup(info)[source]

Cleans up the worker node on batch system shutdown.

Also see supportsWorkerCleanup().

Parameters:

info (WorkerCleanupInfo) – A named tuple consisting of all the relevant information for cleaning up the worker.

Return type:

None

class toil.batchSystems.cleanup_support.WorkerCleanupInfo[source]

Bases: NamedTuple

work_dir: str | None

Work directory path (where the cache would go) if specified by user

coordination_dir: str | None

Coordination directory path (where lock files would go) if specified by user

workflow_id: str

Used to identify files specific to this workflow

clean_work_dir: str

When to clean up the work and coordination directories for a job (‘always’, ‘onSuccess’, ‘onError’, ‘never’)

class toil.batchSystems.cleanup_support.BatchSystemLocalSupport(config, maxCores, maxMemory, maxDisk)[source]

Bases: toil.batchSystems.abstractBatchSystem.BatchSystemSupport

Adds a local queue for helper jobs, useful for CWL & others.

Parameters:
handleLocalJob(command, jobDesc)[source]

To be called by issueBatchJob.

Returns the jobID if the jobDesc has been submitted to the local queue, otherwise returns None

Parameters:
Return type:

Optional[int]

killLocalJobs(jobIDs)[source]

Will kill all local jobs that match the provided jobIDs.

To be called by killBatchJobs.

Parameters:

jobIDs (List[int])

Return type:

None

getIssuedLocalJobIDs()[source]

To be called by getIssuedBatchJobIDs.

Return type:

List[int]

getRunningLocalJobIDs()[source]

To be called by getRunningBatchJobIDs().

Return type:

Dict[int, float]

getUpdatedLocalJob(maxWait)[source]

To be called by getUpdatedBatchJob().

Parameters:

maxWait (int)

Return type:

Optional[toil.batchSystems.abstractBatchSystem.UpdatedBatchJobInfo]

getNextJobID()[source]

Must be used to get job IDs so that the local and batch jobs do not conflict.

Return type:

int

shutdownLocal()[source]

To be called from shutdown().

Return type:

None

class toil.batchSystems.cleanup_support.Config[source]

Class to represent configuration operations for a toil workflow run.

logFile: str | None
logRotating: bool
cleanWorkDir: str
max_jobs: int
max_local_jobs: int
manualMemArgs: bool
run_local_jobs_on_workers: bool
coalesceStatusCalls: bool
mesos_endpoint: str | None
mesos_framework_id: str | None
mesos_role: str | None
mesos_name: str
kubernetes_host_path: str | None
kubernetes_owner: str | None
kubernetes_service_account: str | None
kubernetes_pod_timeout: float
kubernetes_privileged: bool
tes_endpoint: str
tes_user: str
tes_password: str
tes_bearer_token: str
aws_batch_region: str | None
aws_batch_queue: str | None
aws_batch_job_role_arn: str | None
scale: float
batchSystem: str
batch_logs_dir: str | None

The backing scheduler will be instructed, if possible, to save logs to this directory, where the leader can read them.

statePollingWait: int
state_polling_timeout: int
disableAutoDeployment: bool
workflowID: str | None

This attribute uniquely identifies the job store and therefore the workflow. It is necessary in order to distinguish between two consecutive workflows for which self.jobStore is the same, e.g. when a job store name is reused after a previous run has finished successfully and its job store has been clean up.

workflowAttemptNumber: int
jobStore: str
logLevel: str
colored_logs: bool
workDir: str | None
coordination_dir: str | None
noStdOutErr: bool
stats: bool
clean: str | None
clusterStats: str
restart: bool
caching: bool | None
symlinkImports: bool
moveOutputs: bool
provisioner: str | None
nodeTypes: List[Tuple[Set[str], float | None]]
minNodes: List[int]
maxNodes: List[int]
targetTime: float
betaInertia: float
scaleInterval: int
preemptibleCompensation: float
nodeStorage: int
nodeStorageOverrides: List[str]
metrics: bool
assume_zero_overhead: bool
maxPreemptibleServiceJobs: int
maxServiceJobs: int
deadlockWait: float | int
deadlockCheckInterval: float | int
defaultMemory: int
defaultCores: float | int
defaultDisk: int
defaultPreemptible: bool
defaultAccelerators: List[toil.job.AcceleratorRequirement]
maxCores: int
maxMemory: int
maxDisk: int
retryCount: int
enableUnlimitedPreemptibleRetries: bool
doubleMem: bool
maxJobDuration: int
rescueJobsFrequency: int
job_store_timeout: float
maxLogFileSize: int
writeLogs: str
writeLogsGzip: str
writeLogsFromAllJobs: bool
write_messages: str | None
realTimeLogging: bool
environment: Dict[str, str]
disableChaining: bool
disableJobStoreChecksumVerification: bool
sseKey: str | None
servicePollingInterval: int
useAsync: bool
forceDockerAppliance: bool
statusWait: int
disableProgress: bool
readGlobalFileMutableByDefault: bool
debugWorker: bool
disableWorkerOutputCapture: bool
badWorker: float
badWorkerFailInterval: float
kill_polling_interval: int
cwl: bool
set_from_default_config()[source]
Return type:

None

prepare_start()[source]

After options are set, prepare for initial start of workflow.

Return type:

None

prepare_restart()[source]

Before restart options are set, prepare for a restart of a workflow. Set up any execution-specific parameters and clear out any stale ones.

Return type:

None

setOptions(options)[source]

Creates a config object from the options object.

Parameters:

options (argparse.Namespace)

Return type:

None

check_configuration_consistency()[source]

Old checks that cannot be fit into an action class for argparse

Return type:

None

__eq__(other)[source]

Return self==value.

Parameters:

other (object)

Return type:

bool

__hash__()[source]

Return hash(self).

Return type:

int

class toil.batchSystems.cleanup_support.Toil(options)[source]

Bases: ContextManager[Toil]

A context manager that represents a Toil workflow.

Specifically the batch system, job store, and its configuration.

Parameters:

options (argparse.Namespace)

config: Config
__enter__()[source]

Derive configuration from the command line options.

Then load the job store and, on restart, consolidate the derived configuration with the one from the previous invocation of the workflow.

Return type:

Toil

__exit__(exc_type, exc_val, exc_tb)[source]

Clean up after a workflow invocation.

Depending on the configuration, delete the job store.

Parameters:
Return type:

Literal[False]

start(rootJob)[source]

Invoke a Toil workflow with the given job as the root for an initial run.

This method must be called in the body of a with Toil(...) as toil: statement. This method should not be called more than once for a workflow that has not finished.

Parameters:

rootJob (toil.job.Job) – The root job of the workflow

Returns:

The root job’s return value

Return type:

Any

restart()[source]

Restarts a workflow that has been interrupted.

Returns:

The root job’s return value

Return type:

Any

classmethod getJobStore(locator)[source]

Create an instance of the concrete job store implementation that matches the given locator.

Parameters:

locator (str) – The location of the job store to be represent by the instance

Returns:

an instance of a concrete subclass of AbstractJobStore

Return type:

toil.jobStores.abstractJobStore.AbstractJobStore

static parseLocator(locator)[source]
Parameters:

locator (str)

Return type:

Tuple[str, str]

static buildLocator(name, rest)[source]
Parameters:
Return type:

str

classmethod resumeJobStore(locator)[source]
Parameters:

locator (str)

Return type:

toil.jobStores.abstractJobStore.AbstractJobStore

static createBatchSystem(config)[source]

Create an instance of the batch system specified in the given config.

Parameters:

config (Config) – the current configuration

Returns:

an instance of a concrete subclass of AbstractBatchSystem

Return type:

toil.batchSystems.abstractBatchSystem.AbstractBatchSystem

importFile(srcUrl: str, sharedFileName: str, symlink: bool = True) None[source]
importFile(srcUrl: str, sharedFileName: None = None, symlink: bool = True) toil.fileStores.FileID
import_file(src_uri: str, shared_file_name: str, symlink: bool = True, check_existence: bool = True) None[source]
import_file(src_uri: str, shared_file_name: None = None, symlink: bool = True, check_existence: bool = True) toil.fileStores.FileID

Import the file at the given URL into the job store.

By default, returns None if the file does not exist.

Parameters:

check_existence – If true, raise FileNotFoundError if the file does not exist. If false, return None when the file does not exist.

See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description

exportFile(jobStoreFileID, dstUrl)[source]
Parameters:
Return type:

None

export_file(file_id, dst_uri)[source]

Export file to destination pointed at by the destination URL.

See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description

Parameters:
Return type:

None

static normalize_uri(uri, check_existence=False)[source]

Given a URI, if it has no scheme, prepend “file:”.

Parameters:
  • check_existence (bool) – If set, raise FileNotFoundError if a URI points to a local file that does not exist.

  • uri (str)

Return type:

str

static getToilWorkDir(configWorkDir=None)[source]

Return a path to a writable directory under which per-workflow directories exist.

This directory is always required to exist on a machine, even if the Toil worker has not run yet. If your workers and leader have different temp directories, you may need to set TOIL_WORKDIR.

Parameters:

configWorkDir (Optional[str]) – Value passed to the program using the –workDir flag

Returns:

Path to the Toil work directory, constant across all machines

Return type:

str

classmethod get_toil_coordination_dir(config_work_dir, config_coordination_dir)[source]

Return a path to a writable directory, which will be in memory if convenient. Ought to be used for file locking and coordination.

Parameters:
  • config_work_dir (Optional[str]) – Value passed to the program using the –workDir flag

  • config_coordination_dir (Optional[str]) – Value passed to the program using the –coordinationDir flag

  • workflow_id – Used if a tmpdir_prefix exists to create full directory paths unique per workflow

Returns:

Path to the Toil coordination directory. Ought to be on a POSIX filesystem that allows directories containing open files to be deleted.

Return type:

str

static get_workflow_path_component(workflow_id)[source]

Get a safe filesystem path component for a workflow.

Will be consistent for all processes on a given machine, and different for all processes on different machines.

Parameters:

workflow_id (str) – The ID of the current Toil workflow.

Return type:

str

classmethod getLocalWorkflowDir(workflowID, configWorkDir=None)[source]

Return the directory where worker directories and the cache will be located for this workflow on this machine.

Parameters:
  • configWorkDir (Optional[str]) – Value passed to the program using the –workDir flag

  • workflowID (str)

Returns:

Path to the local workflow directory on this machine

Return type:

str

classmethod get_local_workflow_coordination_dir(workflow_id, config_work_dir, config_coordination_dir)[source]

Return the directory where coordination files should be located for this workflow on this machine. These include internal Toil databases and lock files for the machine.

If an in-memory filesystem is available, it is used. Otherwise, the local workflow directory, which may be on a shared network filesystem, is used.

Parameters:
  • workflow_id (str) – Unique ID of the current workflow.

  • config_work_dir (Optional[str]) – Value used for the work directory in the current Toil Config.

  • config_coordination_dir (Optional[str]) – Value used for the coordination directory in the current Toil Config.

Returns:

Path to the local workflow coordination directory on this machine.

Return type:

str

class toil.batchSystems.cleanup_support.LastProcessStandingArena(base_dir, name)[source]

Class that lets a bunch of processes detect and elect a last process standing.

Process enter and leave (sometimes due to sudden existence failure). We guarantee that the last process to leave, if it leaves properly, will get a chance to do some cleanup. If new processes try to enter during the cleanup, they will be delayed until after the cleanup has happened and the previous “last” process has finished leaving.

The user is responsible for making sure you always leave if you enter! Consider using a try/finally; this class is not a context manager.

Parameters:
enter()[source]

This process is entering the arena. If cleanup is in progress, blocks until it is finished.

You may not enter the arena again before leaving it.

Return type:

None

leave()[source]

This process is leaving the arena. If this process happens to be the last process standing, yields something, with other processes blocked from joining the arena until the loop body completes and the process has finished leaving. Otherwise, does not yield anything.

Should be used in a loop:

for _ in arena.leave():

# If we get here, we were the last process. Do the cleanup pass

Return type:

Iterator[bool]

toil.batchSystems.cleanup_support.logger
class toil.batchSystems.cleanup_support.BatchSystemCleanupSupport(config, maxCores, maxMemory, maxDisk)[source]

Bases: toil.batchSystems.local_support.BatchSystemLocalSupport

Adds cleanup support when the last running job leaves a node, for batch systems that can’t provide it using the backing scheduler.

Parameters:
classmethod supportsWorkerCleanup()[source]

Whether this batch system supports worker cleanup.

Indicates whether this batch system invokes BatchSystemSupport.workerCleanup() after the last job for a particular workflow invocation finishes. Note that the term worker refers to an entire node, not just a worker process. A worker process may run more than one job sequentially, and more than one concurrent worker process may exist on a worker node, for the same workflow. The batch system is said to shut down after the last worker process terminates.

Return type:

bool

getWorkerContexts()[source]

Get a list of picklable context manager objects to wrap worker work in, in order.

Can be used to ask the Toil worker to do things in-process (such as configuring environment variables, hot-deploying user scripts, or cleaning up a node) that would otherwise require a wrapping “executor” process.

Return type:

List[ContextManager[Any]]

class toil.batchSystems.cleanup_support.WorkerCleanupContext(workerCleanupInfo)[source]

Context manager used by BatchSystemCleanupSupport to implement cleanup on a node after the last worker is done working.

Gets wrapped around the worker’s work.

Parameters:

workerCleanupInfo (toil.batchSystems.abstractBatchSystem.WorkerCleanupInfo)

__enter__()[source]
Return type:

None

__exit__(type, value, traceback)[source]
Parameters:
Return type:

None