toil.batchSystems.cleanup_support¶
Attributes¶
Classes¶
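BatchSystemSupport | Partial implementation of AbstractBatchSystem, support methods.
BatchSystemLocalSupport | Adds a local queue for helper jobs, useful for CWL & others.
Config | Class to represent configuration operations for a toil workflow run.
Toil | A context manager that represents a Toil workflow.
LastProcessStandingArena | Class that lets a bunch of processes detect and elect a last process standing.
BatchSystemCleanupSupport | Adds cleanup support when the last running job leaves a node, for batch systems that can’t provide it using the backing scheduler.
WorkerCleanupContext | Context manager used by BatchSystemCleanupSupport to implement cleanup on a node after the last worker is done working.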
Module Contents¶
- class toil.batchSystems.cleanup_support.BatchSystemSupport(config, maxCores, maxMemory, maxDisk)[source]¶
Bases:
AbstractBatchSystem
Partial implementation of AbstractBatchSystem, support methods.
- Parameters:
config (toil.common.Config)
maxCores (float)
maxMemory (int)
maxDisk (int)
- check_resource_request(requirer)[source]¶
Check resource request is not greater than that available or allowed.
- Parameters:
requirer (toil.job.Requirer) – Object whose requirements are being checked
job_name (str) – Name of the job being checked, for generating a useful error report.
detail (str) – Batch-system-specific message to include in the error.
- Raises:
InsufficientSystemResources – raised when a resource is requested in an amount greater than allowed
- Return type:
None
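As a rough illustration of the kind of check described above, the following self-contained sketch compares requested amounts against configured maxima. The `ResourceRequest` dataclass, the `InsufficientResources` exception, and the `check_request` function are stand-ins invented for this example; they are not Toil's `Requirer`, `InsufficientSystemResources`, or the real method.

```python
from dataclasses import dataclass


class InsufficientResources(Exception):
    """Hypothetical stand-in for Toil's InsufficientSystemResources."""


@dataclass
class ResourceRequest:
    """Hypothetical stand-in for a toil.job.Requirer."""
    cores: float
    memory: int
    disk: int


def check_request(request: ResourceRequest, max_cores: float,
                  max_memory: int, max_disk: int) -> None:
    """Raise if any requested amount exceeds the corresponding maximum."""
    limits = {"cores": max_cores, "memory": max_memory, "disk": max_disk}
    for resource, limit in limits.items():
        requested = getattr(request, resource)
        if requested > limit:
            raise InsufficientResources(
                f"Requested {requested} {resource}, but only {limit} allowed"
            )
```

Naming the offending resource in the message is a design choice of this sketch, meant to mirror the "useful error report" goal mentioned in the parameter descriptions.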
- setEnv(name, value=None)[source]¶
Set an environment variable for the worker process before it is launched. The worker process will typically inherit the environment of the machine it is running on but this method makes it possible to override specific variables in that inherited environment before the worker is launched. Note that this mechanism is different to the one used by the worker internally to set up the environment of a job. A call to this method affects all jobs issued after this method returns. Note to implementors: This means that you would typically need to copy the variables before enqueuing a job.
If no value is provided it will be looked up from the current environment.
- Parameters:
- Raises:
RuntimeError – if value is None and the name cannot be found in the environment
- Return type:
None
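The lookup-and-override behaviour described for setEnv can be sketched as follows. `EnvOverrides`, `set_env`, and `snapshot` are hypothetical names used only for this illustration, not part of Toil; the sketch assumes the overrides live in a plain dictionary that is copied whenever a job is enqueued.

```python
import os
from typing import Dict, Optional


class EnvOverrides:
    """Hypothetical holder for worker environment overrides (not a Toil class)."""

    def __init__(self) -> None:
        self.environment: Dict[str, str] = {}

    def set_env(self, name: str, value: Optional[str] = None) -> None:
        if value is None:
            # No explicit value: fall back to the current process environment.
            try:
                value = os.environ[name]
            except KeyError:
                raise RuntimeError(f"{name} does not exist in current environment")
        self.environment[name] = value

    def snapshot(self) -> Dict[str, str]:
        # Copy, so later set_env calls do not affect already-enqueued jobs.
        return dict(self.environment)
```

Taking a copy at enqueue time is one way to honour the note to implementors: a job issued before a later `set_env` call keeps the values that were current when it was issued.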
- set_message_bus(message_bus)[source]¶
Give the batch system an opportunity to connect directly to the message bus, so that it can send informational messages about the jobs it is running to other Toil components.
- Parameters:
message_bus (toil.bus.MessageBus)
- Return type:
None
- get_batch_logs_dir()[source]¶
Get the directory where the backing batch system should save its logs.
Only really makes sense if the backing batch system actually saves logs to a filesystem; Kubernetes for example does not. Ought to be a directory shared between the leader and the workers, if the backing batch system writes logs onto the worker’s view of the filesystem, like many HPC schedulers do.
- Return type:
- format_std_out_err_path(toil_job_id, cluster_job_id, std)[source]¶
Format path for batch system standard output/error and other files generated by the batch system itself.
Files will be written to the batch logs directory (--batchLogsDir, defaulting to the Toil work directory) with names containing both the Toil and batch system job IDs, for ease of debugging job failures.
- Parameters:
toil_job_id (int) – The unique id that Toil gives a job.
cluster_job_id – What the cluster, for example, GridEngine, uses as its internal job id.
std (str) – The provenance of the stream (for example: ‘err’ for ‘stderr’ or ‘out’ for ‘stdout’)
- Returns:
Formatted filename; however if self.config.noStdOutErr is true, returns ‘/dev/null’ or equivalent.
- Return type:
str
- format_std_out_err_glob(toil_job_id)[source]¶
Get a glob string that will match all file paths generated by format_std_out_err_path for a job.
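To make the relationship between the path and glob methods concrete, here is a minimal sketch. The file-name pattern (`toil_job_<id>.<cluster id>.<std>.log`) and the function names are assumptions made for this example; the actual naming scheme used by Toil is not stated on this page.

```python
import fnmatch
import os


def format_log_path(logs_dir: str, toil_job_id: int, cluster_job_id: str,
                    std: str, no_std_out_err: bool = False) -> str:
    if no_std_out_err:
        # Mirrors the documented '/dev/null or equivalent' behaviour.
        return os.devnull
    # Hypothetical naming scheme: both IDs appear in the file name.
    name = f"toil_job_{toil_job_id}.{cluster_job_id}.{std}.log"
    return os.path.join(logs_dir, name)


def format_log_glob(logs_dir: str, toil_job_id: int) -> str:
    # Wildcards stand in for the parts that vary between files of one job.
    return os.path.join(logs_dir, f"toil_job_{toil_job_id}.*.*.log")


path = format_log_path("/tmp/logs", 7, "12345", "err")
matches = fnmatch.fnmatch(path, format_log_glob("/tmp/logs", 7))
```

Keeping the Toil job ID in a fixed position, terminated by a separator, is what lets the glob for job 7 avoid matching files that belong to job 70.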
- static workerCleanup(info)[source]¶
Cleans up the worker node on batch system shutdown.
Also see supportsWorkerCleanup().
- Parameters:
info (WorkerCleanupInfo) – A named tuple consisting of all the relevant information for cleaning up the worker.
- Return type:
None
- class toil.batchSystems.cleanup_support.WorkerCleanupInfo[source]¶
Bases:
NamedTuple
- class toil.batchSystems.cleanup_support.BatchSystemLocalSupport(config, maxCores, maxMemory, maxDisk)[source]¶
Bases:
toil.batchSystems.abstractBatchSystem.BatchSystemSupport
Adds a local queue for helper jobs, useful for CWL & others.
- Parameters:
config (toil.common.Config)
maxCores (float)
maxMemory (int)
maxDisk (int)
- handleLocalJob(command, jobDesc)[source]¶
To be called by issueBatchJob.
Returns the jobID if the jobDesc has been submitted to the local queue; otherwise returns None.
- Parameters:
command (str)
jobDesc (toil.job.JobDescription)
- Return type:
Optional[int]
- killLocalJobs(jobIDs)[source]¶
Will kill all local jobs that match the provided jobIDs.
To be called by killBatchJobs.
- Parameters:
jobIDs (List[int])
- Return type:
None
- getUpdatedLocalJob(maxWait)[source]¶
To be called by getUpdatedBatchJob().
- Parameters:
maxWait (int)
- Return type:
Optional[toil.batchSystems.abstractBatchSystem.UpdatedBatchJobInfo]
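The delegation pattern implied by the three "to be called by" notes can be sketched with a toy class. Everything here is hypothetical: `LocalQueueSketch`, the `is_local` flag, and the string return values exist only for illustration, and how Toil actually decides that a job is local is not described on this page.

```python
from typing import Dict, List, Optional


class LocalQueueSketch:
    """Toy stand-in for a local helper-job queue (not Toil's implementation)."""

    def __init__(self) -> None:
        self.local_jobs: Dict[int, str] = {}
        self.next_id = 0

    def handle_local_job(self, command: str, is_local: bool) -> Optional[int]:
        # Only jobs flagged as local are accepted; others return None.
        if not is_local:
            return None
        job_id = self.next_id
        self.next_id += 1
        self.local_jobs[job_id] = command
        return job_id

    def kill_local_jobs(self, job_ids: List[int]) -> None:
        # Unknown IDs are ignored; they may belong to the backing scheduler.
        for job_id in job_ids:
            self.local_jobs.pop(job_id, None)

    def issue_batch_job(self, command: str, is_local: bool) -> str:
        local_id = self.handle_local_job(command, is_local)
        if local_id is not None:
            return f"local:{local_id}"
        return "backing-scheduler"
```

Note the explicit `is not None` test: a job ID of 0 is falsy, so a plain truthiness check on an Optional[int] result would wrongly send the first local job to the backing scheduler.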
- class toil.batchSystems.cleanup_support.Config[source]¶
Class to represent configuration operations for a toil workflow run.
- batch_logs_dir: str | None¶
The backing scheduler will be instructed, if possible, to save logs to this directory, where the leader can read them.
- workflowID: str | None¶
This attribute uniquely identifies the job store and therefore the workflow. It is necessary in order to distinguish between two consecutive workflows for which self.jobStore is the same, e.g. when a job store name is reused after a previous run has finished successfully and its job store has been cleaned up.
- defaultAccelerators: List[toil.job.AcceleratorRequirement]¶
- prepare_start()[source]¶
After options are set, prepare for initial start of workflow.
- Return type:
None
- prepare_restart()[source]¶
Before restart options are set, prepare for a restart of a workflow. Set up any execution-specific parameters and clear out any stale ones.
- Return type:
None
- setOptions(options)[source]¶
Creates a config object from the options object.
- Parameters:
options (argparse.Namespace)
- Return type:
None
- class toil.batchSystems.cleanup_support.Toil(options)[source]¶
Bases:
ContextManager[Toil]
A context manager that represents a Toil workflow.
Specifically the batch system, job store, and its configuration.
- Parameters:
options (argparse.Namespace)
- __enter__()[source]¶
Derive configuration from the command line options.
Then load the job store and, on restart, consolidate the derived configuration with the one from the previous invocation of the workflow.
- Return type:
- __exit__(exc_type, exc_val, exc_tb)[source]¶
Clean up after a workflow invocation.
Depending on the configuration, delete the job store.
- Parameters:
exc_type (Optional[Type[BaseException]])
exc_val (Optional[BaseException])
exc_tb (Optional[types.TracebackType])
- Return type:
Literal[False]
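The Literal[False] return type means that exiting the context never suppresses an exception raised in the with body. The sketch below shows that contract with a stand-in class; `WorkflowSketch` is hypothetical and performs no real cleanup.

```python
from typing import Literal


class WorkflowSketch:
    """Stand-in context manager; not the Toil class."""

    def __init__(self) -> None:
        self.cleaned_up = False

    def __enter__(self) -> "WorkflowSketch":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> Literal[False]:
        # Cleanup runs whether or not the body raised.
        self.cleaned_up = True
        # Returning False tells Python to re-raise any in-flight exception.
        return False


workflow = WorkflowSketch()
try:
    with workflow:
        raise ValueError("job failed")
except ValueError:
    propagated = True  # the error reached the caller
else:
    propagated = False
```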
- start(rootJob)[source]¶
Invoke a Toil workflow with the given job as the root for an initial run.
This method must be called in the body of a with Toil(...) as toil: statement. This method should not be called more than once for a workflow that has not finished.
- Parameters:
rootJob (toil.job.Job) – The root job of the workflow
- Returns:
The root job’s return value
- Return type:
Any
- restart()[source]¶
Restarts a workflow that has been interrupted.
- Returns:
The root job’s return value
- Return type:
Any
- classmethod getJobStore(locator)[source]¶
Create an instance of the concrete job store implementation that matches the given locator.
- Parameters:
locator (str) – The location of the job store to be represented by the instance
- Returns:
an instance of a concrete subclass of AbstractJobStore
- Return type:
- static createBatchSystem(config)[source]¶
Create an instance of the batch system specified in the given config.
- Parameters:
config (Config) – the current configuration
- Returns:
an instance of a concrete subclass of AbstractBatchSystem
- Return type:
- importFile(srcUrl: str, sharedFileName: str, symlink: bool = True) None[source]¶
- importFile(srcUrl: str, sharedFileName: None = None, symlink: bool = True) toil.fileStores.FileID
- import_file(src_uri: str, shared_file_name: str, symlink: bool = True, check_existence: bool = True) None[source]¶
- import_file(src_uri: str, shared_file_name: None = None, symlink: bool = True, check_existence: bool = True) toil.fileStores.FileID
Import the file at the given URL into the job store.
By default, returns None if the file does not exist.
- Parameters:
check_existence – If true, raise FileNotFoundError if the file does not exist. If false, return None when the file does not exist.
See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description.
- exportFile(jobStoreFileID, dstUrl)[source]¶
- Parameters:
jobStoreFileID (toil.fileStores.FileID)
dstUrl (str)
- Return type:
None
- export_file(file_id, dst_uri)[source]¶
Export file to destination pointed at by the destination URL.
See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description.
- Parameters:
file_id (toil.fileStores.FileID)
dst_uri (str)
- Return type:
None
- static normalize_uri(uri, check_existence=False)[source]¶
Given a URI, if it has no scheme, prepend “file:”.
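A minimal sketch of this kind of normalization, using only the standard library, might look like the following. The function name `normalize_uri_sketch` is hypothetical, and two details are assumptions of this example and not taken from the description above: the path is made absolute, and the result uses the `file://` form.

```python
import os
from urllib.parse import urlparse


def normalize_uri_sketch(uri: str, check_existence: bool = False) -> str:
    if urlparse(uri).scheme:
        # Already has a scheme (s3:, http:, file:, ...); leave it untouched.
        return uri
    # Assumption: resolve relative paths against the current directory.
    abs_path = os.path.abspath(uri)
    if check_existence and not os.path.exists(abs_path):
        raise FileNotFoundError(abs_path)
    return f"file://{abs_path}"
```

For instance, `normalize_uri_sketch("s3://bucket/key")` returns its input unchanged, while a bare relative path such as `data.txt` comes back as a `file:` URI pointing at the absolute location.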
- static getToilWorkDir(configWorkDir=None)[source]¶
Return a path to a writable directory under which per-workflow directories exist.
This directory is always required to exist on a machine, even if the Toil worker has not run yet. If your workers and leader have different temp directories, you may need to set TOIL_WORKDIR.
- classmethod get_toil_coordination_dir(config_work_dir, config_coordination_dir)[source]¶
Return a path to a writable directory, which will be in memory if convenient. Ought to be used for file locking and coordination.
- Parameters:
- Returns:
Path to the Toil coordination directory. Ought to be on a POSIX filesystem that allows directories containing open files to be deleted.
- Return type:
- static get_workflow_path_component(workflow_id)[source]¶
Get a safe filesystem path component for a workflow.
Will be consistent for all processes on a given machine, and different for all processes on different machines.
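One way to obtain a component with these two properties is to hash the workflow ID inside a namespace derived from a machine identifier. The sketch below assumes the caller supplies such an identifier as `machine_id`; that parameter, the `wf-` prefix, and the function name are all hypothetical, and the scheme Toil actually uses is not described here.

```python
import uuid


def workflow_path_component(machine_id: str, workflow_id: str) -> str:
    # Derive a per-machine namespace, then hash the workflow ID within it.
    machine_namespace = uuid.uuid5(uuid.NAMESPACE_DNS, machine_id)
    # .hex drops the dashes, leaving only characters safe in a path component.
    return "wf-" + uuid.uuid5(machine_namespace, workflow_id).hex


same_a = workflow_path_component("machine-1", "workflow-42")
same_b = workflow_path_component("machine-1", "workflow-42")
other = workflow_path_component("machine-2", "workflow-42")
```

Because `uuid.uuid5` is deterministic, every process on the same machine computes the same value without coordinating, and changing the machine identifier changes the result.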
- classmethod getLocalWorkflowDir(workflowID, configWorkDir=None)[source]¶
Return the directory where worker directories and the cache will be located for this workflow on this machine.
- classmethod get_local_workflow_coordination_dir(workflow_id, config_work_dir, config_coordination_dir)[source]¶
Return the directory where coordination files should be located for this workflow on this machine. These include internal Toil databases and lock files for the machine.
If an in-memory filesystem is available, it is used. Otherwise, the local workflow directory, which may be on a shared network filesystem, is used.
- Parameters:
- Returns:
Path to the local workflow coordination directory on this machine.
- Return type:
- class toil.batchSystems.cleanup_support.LastProcessStandingArena(base_dir, name)[source]¶
Class that lets a bunch of processes detect and elect a last process standing.
Processes enter and leave (sometimes due to sudden existence failure). We guarantee that the last process to leave, if it leaves properly, will get a chance to do some cleanup. If new processes try to enter during the cleanup, they will be delayed until after the cleanup has happened and the previous “last” process has finished leaving.
You are responsible for making sure you always leave if you enter! Consider using a try/finally; this class is not a context manager.
- enter()[source]¶
This process is entering the arena. If cleanup is in progress, blocks until it is finished.
You may not enter the arena again before leaving it.
- Return type:
None
- leave()[source]¶
This process is leaving the arena. If this process happens to be the last process standing, yields something, with other processes blocked from joining the arena until the loop body completes and the process has finished leaving. Otherwise, does not yield anything.
Should be used in a loop:
    for _ in arena.leave():
        # If we get here, we were the last process. Do the cleanup
        pass
- Return type:
Iterator[bool]
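The enter/leave protocol combined with try/finally can be sketched with a single-process stand-in. `ToyArena` below is hypothetical: it only counts occupants in memory and does none of the cross-process locking that the real class provides, but it shows why leave() is written as a generator.

```python
from typing import Iterator, List


class ToyArena:
    """Single-process stand-in for the arena; it does no file locking."""

    def __init__(self) -> None:
        self.occupants = 0

    def enter(self) -> None:
        self.occupants += 1

    def leave(self) -> Iterator[bool]:
        self.occupants -= 1
        if self.occupants == 0:
            # Only the last occupant gets a loop iteration.
            yield True


def leave_and_maybe_clean(arena: ToyArena, name: str, log: List[str]) -> None:
    for _ in arena.leave():
        log.append(f"{name} cleaned up")


arena = ToyArena()
log: List[str] = []
arena.enter()  # participant A enters
arena.enter()  # participant B enters
try:
    pass  # A's work would go here
finally:
    leave_and_maybe_clean(arena, "A", log)  # A is not last: no cleanup
try:
    pass  # B's work would go here
finally:
    leave_and_maybe_clean(arena, "B", log)  # B is last: cleanup runs
```

The loop body runs zero times for every participant except the last, so the cleanup code can be written inline at the call site without a separate "am I last?" check.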
- toil.batchSystems.cleanup_support.logger¶
- class toil.batchSystems.cleanup_support.BatchSystemCleanupSupport(config, maxCores, maxMemory, maxDisk)[source]¶
Bases:
toil.batchSystems.local_support.BatchSystemLocalSupport
Adds cleanup support when the last running job leaves a node, for batch systems that can’t provide it using the backing scheduler.
- Parameters:
config (toil.common.Config)
maxCores (float)
maxMemory (int)
maxDisk (int)
- classmethod supportsWorkerCleanup()[source]¶
Whether this batch system supports worker cleanup.
Indicates whether this batch system invokes BatchSystemSupport.workerCleanup() after the last job for a particular workflow invocation finishes. Note that the term worker refers to an entire node, not just a worker process. A worker process may run more than one job sequentially, and more than one concurrent worker process may exist on a worker node, for the same workflow. The batch system is said to shut down after the last worker process terminates.
- Return type:
- getWorkerContexts()[source]¶
Get a list of picklable context manager objects to wrap worker work in, in order.
Can be used to ask the Toil worker to do things in-process (such as configuring environment variables, hot-deploying user scripts, or cleaning up a node) that would otherwise require a wrapping “executor” process.
- Return type:
List[ContextManager[Any]]
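How a worker might wrap its work in an ordered list of context managers can be sketched with `contextlib.ExitStack`. This is an illustration under assumptions, not Toil's worker code: `named_context` and `run_wrapped` are hypothetical helpers, and the generator-based contexts used here are not picklable, unlike the objects this method is documented to return.

```python
from contextlib import ExitStack, contextmanager
from typing import Callable, ContextManager, Iterator, List


@contextmanager
def named_context(name: str, events: List[str]) -> Iterator[None]:
    events.append(f"enter {name}")
    try:
        yield
    finally:
        events.append(f"exit {name}")


def run_wrapped(contexts: List[ContextManager], work: Callable[[], None]) -> None:
    with ExitStack() as stack:
        # Enter in list order; ExitStack exits in reverse order.
        for context in contexts:
            stack.enter_context(context)
        work()


events: List[str] = []
run_wrapped(
    [named_context("env", events), named_context("cleanup", events)],
    lambda: events.append("work"),
)
```

After this runs, `events` records both contexts being entered in list order, then the work, then the contexts exiting in reverse, which is the same nesting that hand-written nested with statements would give.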
- class toil.batchSystems.cleanup_support.WorkerCleanupContext(workerCleanupInfo)[source]¶
Context manager used by BatchSystemCleanupSupport to implement cleanup on a node after the last worker is done working.
Gets wrapped around the worker’s work.
- Parameters:
workerCleanupInfo (toil.batchSystems.abstractBatchSystem.WorkerCleanupInfo)
- __exit__(type, value, traceback)[source]¶
- Parameters:
type (Optional[Type[BaseException]])
value (Optional[BaseException])
traceback (Optional[types.TracebackType])
- Return type:
None