toil.server.wes.tasks
Attributes
- celery
- logger
- WAIT_FOR_DEATH_TIMEOUT
- run_wes
Classes
- Toil: A context manager that represents a Toil workflow.
- WorkflowStateMachine: Class for managing the WES workflow state machine.
- ToilWorkflowRunner: A class to represent a workflow runner to run the requested workflow.
- TaskRunner: Abstraction over the Celery API. Runs our run_wes task and allows canceling it.
- MultiprocessingTaskRunner: Version of TaskRunner that just runs tasks with Multiprocessing.
Functions
- generate_locator: Generate a random locator for a job store of the given type. Raises a JobStoreUnavailableException if that job store cannot be used.
- connect_to_workflow_state_store: Connect to a place to store state for the given workflow, in the state store defined by the given URL.
- download_file_from_internet: Download a file from the Internet and write it to dest.
- download_file_from_s3: Download a file from Amazon S3 and write it to dest.
- get_file_class: Return the type of the file as a human-readable string.
- Return the current time in ISO 8601 format.
- link_file: Create a link at dest that points to the file at src.
- run_wes_task: Run a requested workflow.
- cancel_run: Send a SIGTERM signal to the process that is running task_id.
Module Contents
- class toil.server.wes.tasks.Toil(options)
Bases: ContextManager[Toil]
A context manager that represents a Toil workflow.
Specifically, it manages the batch system, the job store, and the associated configuration.
- Parameters:
options (argparse.Namespace)
- __enter__()
Derive configuration from the command line options.
Then load the job store and, on restart, consolidate the derived configuration with the one from the previous invocation of the workflow.
- Return type:
Toil
- __exit__(exc_type, exc_val, exc_tb)
Clean up after a workflow invocation.
Depending on the configuration, delete the job store.
- Parameters:
exc_type (Optional[Type[BaseException]])
exc_val (Optional[BaseException])
exc_tb (Optional[types.TracebackType])
- Return type:
Literal[False]
- start(rootJob)
Invoke a Toil workflow with the given job as the root for an initial run.
This method must be called in the body of a "with Toil(...) as toil:" statement. This method should not be called more than once for a workflow that has not finished.
- Parameters:
rootJob (toil.job.Job) – The root job of the workflow
- Returns:
The root job’s return value
- Return type:
Any
- restart()
Restarts a workflow that has been interrupted.
- Returns:
The root job’s return value
- Return type:
Any
- classmethod getJobStore(locator)
Create an instance of the concrete job store implementation that matches the given locator.
- Parameters:
locator (str) – The location of the job store to be represented by the instance
- Returns:
an instance of a concrete subclass of AbstractJobStore
- Return type:
AbstractJobStore
- static createBatchSystem(config)
Create an instance of the batch system specified in the given config.
- Parameters:
config (Config) – the current configuration
- Returns:
an instance of a concrete subclass of AbstractBatchSystem
- Return type:
AbstractBatchSystem
- importFile(srcUrl: str, sharedFileName: str, symlink: bool = True) -> None
- importFile(srcUrl: str, sharedFileName: None = None, symlink: bool = True) -> toil.fileStores.FileID
- import_file(src_uri: str, shared_file_name: str, symlink: bool = True, check_existence: bool = True) -> None
- import_file(src_uri: str, shared_file_name: None = None, symlink: bool = True, check_existence: bool = True) -> toil.fileStores.FileID
Import the file at the given URL into the job store.
Whether a missing file raises FileNotFoundError or returns None is controlled by check_existence.
- Parameters:
check_existence – If true, raise FileNotFoundError if the file does not exist. If false, return None when the file does not exist.
See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description.
- exportFile(jobStoreFileID, dstUrl)
- Parameters:
jobStoreFileID (toil.fileStores.FileID)
dstUrl (str)
- Return type:
None
- export_file(file_id, dst_uri)
Export the file to the destination pointed at by the destination URL (dst_uri).
See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description.
- Parameters:
file_id (toil.fileStores.FileID)
dst_uri (str)
- Return type:
None
- static normalize_uri(uri, check_existence=False)
Given a URI, if it has no scheme, prepend “file:”.
- static getToilWorkDir(configWorkDir=None)
Return a path to a writable directory under which per-workflow directories exist.
This directory is always required to exist on a machine, even if the Toil worker has not run yet. If your workers and leader have different temp directories, you may need to set TOIL_WORKDIR.
- classmethod get_toil_coordination_dir(config_work_dir, config_coordination_dir)
Return a path to a writable directory, which will be in memory if convenient. Ought to be used for file locking and coordination.
- Parameters:
- Returns:
Path to the Toil coordination directory. Ought to be on a POSIX filesystem that allows directories containing open files to be deleted.
- Return type:
- static get_workflow_path_component(workflow_id)
Get a safe filesystem path component for a workflow.
The result is the same for all processes on a given machine and differs between processes on different machines.
- classmethod getLocalWorkflowDir(workflowID, configWorkDir=None)
Return the directory where worker directories and the cache will be located for this workflow on this machine.
- classmethod get_local_workflow_coordination_dir(workflow_id, config_work_dir, config_coordination_dir)
Return the directory where coordination files should be located for this workflow on this machine. These include internal Toil databases and lock files for the machine.
If an in-memory filesystem is available, it is used. Otherwise, the local workflow directory, which may be on a shared network filesystem, is used.
- Parameters:
- Returns:
Path to the local workflow coordination directory on this machine.
- Return type:
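The scheme rule given for normalize_uri can be illustrated with a small stand-alone function. This sketch covers only the documented sentence. add_file_scheme is a hypothetical name, and Toil’s own method may also resolve relative paths or honor check_existence, which this sketch does not attempt.

```python
from urllib.parse import urlparse


def add_file_scheme(uri: str) -> str:
    """Hypothetical helper: prepend "file:" when the URI has no scheme.

    Mirrors only the rule documented for Toil.normalize_uri; it does not
    make paths absolute or check that local files exist.
    """
    # urlparse reports an empty scheme for bare paths such as /data/in.txt
    if urlparse(uri).scheme == "":
        return "file:" + uri
    # Anything that already names a scheme (s3:, http:, file:) is left alone
    return uri
```

A bare path such as /data/input.txt becomes file:/data/input.txt, while s3://bucket/input.txt passes through unchanged. On Windows, a drive letter such as C: may be parsed as a scheme, so a production version would need to special-case it.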
- toil.server.wes.tasks.generate_locator(job_store_type, local_suggestion=None, decoration=None)
Generate a random locator for a job store of the given type. Raises a JobStoreUnavailableException if that job store cannot be used.
- Parameters:
- Returns:
Job store locator for a usable job store.
- Return type:
str
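The following sketch shows what a “random locator” might look like. It assumes the usual Toil locator shapes, file:<path> and aws:<region>:<name>. Several parts are hypothetical and do not describe generate_locator’s actual code: the function make_random_locator, the toil- prefix, the region parameter, the exception class, and the way local_suggestion and decoration are used.

```python
import os
import tempfile
import uuid
from typing import Optional


class JobStoreTypeUnavailable(Exception):
    """Hypothetical stand-in for Toil's JobStoreUnavailableException."""


def make_random_locator(
    job_store_type: str,
    local_suggestion: Optional[str] = None,
    decoration: Optional[str] = None,
    region: str = "us-west-2",
) -> str:
    """Hypothetical sketch of building a fresh job store locator."""
    # A random name makes collisions between concurrent workflows unlikely;
    # the optional decoration only makes the name easier for humans to spot.
    name = "toil-" + uuid.uuid4().hex
    if decoration:
        name += "-" + decoration
    if job_store_type == "file":
        # Use the caller's suggested path as-is, else a fresh temp path.
        path = local_suggestion or os.path.join(tempfile.gettempdir(), name)
        return "file:" + path
    if job_store_type == "aws":
        # Assumed locator shape: aws:<region>:<name>
        return f"aws:{region}:{name}"
    raise JobStoreTypeUnavailable(f"job store type not usable here: {job_store_type}")
```

Two calls with the same arguments return different locators. That property lets concurrent workflow runs each get their own job store.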
- toil.server.wes.tasks.celery
- class toil.server.wes.tasks.WorkflowStateMachine(store)
Class for managing the WES workflow state machine.
This is the authority on the WES “state” of a workflow. You need one to read or change the state.
Guaranteeing that only certain transitions can be observed is possible but not worth it. Instead, we just let updates clobber each other and grab and cache the first terminal state we see forever. If it becomes important that clients never see e.g. CANCELED -> COMPLETE or COMPLETE -> SYSTEM_ERROR, we can implement a real distributed state machine here.
We do handle making sure that tasks don’t get stuck in CANCELING.
State can be one of:
“UNKNOWN”, “QUEUED”, “INITIALIZING”, “RUNNING”, “PAUSED”, “COMPLETE”, “EXECUTOR_ERROR”, “SYSTEM_ERROR”, “CANCELED”, “CANCELING”
Uses the state store’s local cache to prevent needing to read things we’ve seen already.
- Parameters:
store (WorkflowStateStore)
- send_enqueue()
Send an enqueue message that would move from UNKNOWN to QUEUED.
- Return type:
None
- send_initialize()
Send an initialize message that would move from QUEUED to INITIALIZING.
- Return type:
None
- send_run()
Send a run message that would move from INITIALIZING to RUNNING.
- Return type:
None
- send_cancel()
Send a cancel message that would move to CANCELING from any non-terminal state.
- Return type:
None
- send_canceled()
Send a canceled message that would move to CANCELED from CANCELING.
- Return type:
None
- send_complete()
Send a complete message that would move from RUNNING to COMPLETE.
- Return type:
None
- send_executor_error()
Send an executor_error message that would move from QUEUED, INITIALIZING, or RUNNING to EXECUTOR_ERROR.
- Return type:
None
- send_system_error()
Send a system_error message that would move from QUEUED, INITIALIZING, or RUNNING to SYSTEM_ERROR.
- Return type:
None
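The policy described for WorkflowStateMachine can be sketched in a few lines: updates clobber each other, but a reader caches the first terminal state it sees. This illustration makes several assumptions. The store is a plain dictionary, only two of the send_* messages are shown, and no transition checks are made. DictStateStore, SketchStateMachine, and get_current_state are hypothetical names, not Toil’s API.

```python
from typing import Dict, Optional

# States after which a workflow should not change, per the WES state list above.
TERMINAL_STATES = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}


class DictStateStore:
    """Hypothetical in-memory stand-in for a shared workflow state store."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)

    def set(self, key: str, value: str) -> None:
        self._data[key] = value


class SketchStateMachine:
    """Last write wins in the store, but each reader caches its first terminal state."""

    def __init__(self, store: DictStateStore) -> None:
        self._store = store
        self._terminal: Optional[str] = None

    def send_complete(self) -> None:
        self._store.set("state", "COMPLETE")

    def send_system_error(self) -> None:
        self._store.set("state", "SYSTEM_ERROR")

    def get_current_state(self) -> str:
        if self._terminal is not None:
            # Once a terminal state has been observed, never report another one.
            return self._terminal
        state = self._store.get("state") or "UNKNOWN"
        if state in TERMINAL_STATES:
            self._terminal = state
        return state
```

A late SYSTEM_ERROR does not change what an existing reader reports after it has seen COMPLETE. A fresh reader, however, sees SYSTEM_ERROR. This is the kind of COMPLETE -> SYSTEM_ERROR inconsistency between clients that the class description accepts.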
- toil.server.wes.tasks.connect_to_workflow_state_store(url, workflow_id)
Connect to a place to store state for the given workflow, in the state store defined by the given URL.
- Parameters:
- Return type:
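One way to picture a per-workflow state store is a directory with one small file per key. The sketch below accepts a plain path or a file: URL. DirectoryWorkflowStateStore, connect_directory_state_store, and the get/set methods are hypothetical names. Real deployments may instead use a shared remote backend, which this sketch does not cover.

```python
import os
from typing import Optional
from urllib.parse import urlparse


class DirectoryWorkflowStateStore:
    """Hypothetical state store: one small file per key, per workflow."""

    def __init__(self, root: str, workflow_id: str) -> None:
        # Each workflow gets its own subdirectory, so keys never collide.
        self._dir = os.path.join(root, workflow_id)
        os.makedirs(self._dir, exist_ok=True)

    def get(self, key: str) -> Optional[str]:
        try:
            with open(os.path.join(self._dir, key), encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            return None

    def set(self, key: str, value: str) -> None:
        # Write to a temporary name, then rename, so readers never see a partial value.
        path = os.path.join(self._dir, key)
        tmp = path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            f.write(value)
        os.replace(tmp, path)


def connect_directory_state_store(url: str, workflow_id: str) -> DirectoryWorkflowStateStore:
    """Hypothetical connector: accept a bare path or a file: URL."""
    parsed = urlparse(url)
    if parsed.scheme in ("", "file"):
        return DirectoryWorkflowStateStore(parsed.path, workflow_id)
    raise ValueError(f"unsupported state store URL: {url}")
```

Because the server and the task both resolve the same URL and workflow ID to the same directory, a value written by one is visible to the other.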
- toil.server.wes.tasks.download_file_from_internet(src, dest, content_type=None)
Download a file from the Internet and write it to dest.
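Below is a standard-library-only sketch of a download that writes to dest. It streams the response with urllib.request.urlopen and treats content_type as an expected MIME type that must match the response’s Content-Type header. That interpretation of content_type, and the name fetch_to_file, are assumptions rather than Toil’s documented behavior.

```python
import shutil
from typing import Optional
from urllib.request import urlopen


def fetch_to_file(src: str, dest: str, content_type: Optional[str] = None) -> None:
    """Hypothetical sketch: stream src to dest, optionally checking Content-Type."""
    with urlopen(src) as response:
        if content_type is not None:
            # get_content_type() returns the bare MIME type, without parameters
            actual = response.headers.get_content_type()
            if actual != content_type:
                raise ValueError(f"expected {content_type}, got {actual}")
        with open(dest, "wb") as out:
            # Copy in chunks so large files are not held in memory
            shutil.copyfileobj(response, out)
```

Because urlopen also handles file:// URLs, the sketch can be exercised offline against a local file.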
- toil.server.wes.tasks.download_file_from_s3(src, dest, content_type=None)
Download a file from Amazon S3 and write it to dest.
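Before any download from S3, an s3://bucket/key source has to be split into a bucket name and an object key. The sketch below covers only that step. split_s3_url is a hypothetical name, and the actual transfer is deliberately left out.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_url(url: str) -> Tuple[str, str]:
    """Hypothetical helper: split "s3://bucket/key" into (bucket, key)."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URL: {url}")
    # The bucket is the network location; the key is the path minus its leading slash
    return parsed.netloc, parsed.path.lstrip("/")
```

With boto3, for example, the resulting pair could be passed to an S3 client’s download_file(bucket, key, dest). Keys that contain ? or # would need extra care, because urlparse treats those characters as the start of a query or fragment.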
- toil.server.wes.tasks.get_file_class(path)
Return the type of the file as a human-readable string.
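A plausible way to produce such a label is to test the path with os.path in a fixed order. In this sketch, the name describe_path and the labels "Link", "File", "Directory", and "Unknown" are assumptions, not strings this documentation guarantees.

```python
import os


def describe_path(path: str) -> str:
    """Hypothetical sketch: classify a path with a human-readable label."""
    # Check for a link first: os.path.isfile and os.path.isdir follow symlinks
    if os.path.islink(path):
        return "Link"
    if os.path.isfile(path):
        return "File"
    if os.path.isdir(path):
        return "Directory"
    # Missing paths (and exotic types) fall through to a catch-all label
    return "Unknown"
```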
- toil.server.wes.tasks.link_file(src, dest)
Create a link at dest that points to the file at src.
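A common linking strategy is to try a hard link first and fall back to a symbolic link when hard links are not possible, for example across filesystems. The sketch below assumes that strategy. The name make_link is hypothetical, and the entry above only states that a link is created.

```python
import os


def make_link(src: str, dest: str) -> None:
    """Hypothetical sketch: create dest as a link to the existing file src.

    Tries a hard link first and falls back to a symbolic link. If dest
    already exists, both attempts fail and the error propagates.
    """
    try:
        os.link(src, dest)
    except OSError:
        # Use an absolute target so the symlink works regardless of dest's directory
        os.symlink(os.path.abspath(src), dest)
```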
- toil.server.wes.tasks.logger
- toil.server.wes.tasks.WAIT_FOR_DEATH_TIMEOUT = 20
- class toil.server.wes.tasks.ToilWorkflowRunner(base_scratch_dir, state_store_url, workflow_id, request, engine_options)
A class to represent a workflow runner to run the requested workflow.
Responsible for parsing the user request into a shell command, executing that command, and collecting the outputs of the resulting workflow run.
- Parameters:
- write_scratch_file(filename, contents)
Write a file to the scratch directory.
- write_workflow(src_url)
Fetch the workflow file from its source and write it to a destination file.
- sort_options(workflow_engine_parameters=None)
Sort the command line arguments into an order that the workflow execution engine can recognize.
- initialize_run()
Write the workflow and input files, construct the list of shell commands needed to complete this workflow run, and return that list.
- Return type:
List[str]
- call_cmd(cmd, cwd)
Call a command with Popen, writing stdout, stderr, and the command to separate files.
- Parameters:
- Return type:
- run()
Construct a command to run the requested workflow with the options, run it, and deposit the outputs in the output directory.
- Return type:
None
- write_output_files()
Fetch all the files that this workflow generated and output information about them to outputs.json.
- Return type:
None
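call_cmd is described as starting a command with Popen and writing stdout, stderr, and the command to separate files. A minimal sketch of that pattern follows. call_and_record and the file names cmd, stdout, and stderr are assumptions, not the names ToilWorkflowRunner actually uses.

```python
import os
import subprocess
from typing import List


def call_and_record(cmd: List[str], cwd: str) -> "subprocess.Popen[bytes]":
    """Hypothetical sketch: start cmd in cwd, recording it and its output there."""
    # Record the command line itself for later debugging
    with open(os.path.join(cwd, "cmd"), "w", encoding="utf-8") as f:
        f.write(" ".join(cmd) + "\n")
    # The child inherits these file descriptors, so the parent can close
    # its own handles as soon as Popen returns
    with open(os.path.join(cwd, "stdout"), "wb") as out, open(
        os.path.join(cwd, "stderr"), "wb"
    ) as err:
        return subprocess.Popen(cmd, cwd=cwd, stdout=out, stderr=err)
```

Popen returns immediately, so the caller decides when to call wait(). Closing the parent’s handles early does not cut off the child’s output.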
- toil.server.wes.tasks.run_wes_task(base_scratch_dir, state_store_url, workflow_id, request, engine_options)
Run a requested workflow.
- Parameters:
base_scratch_dir (str) – Directory where the workflow’s scratch dir will live, under the workflow’s ID.
state_store_url (str) – URL/path at which the server and Celery task communicate about workflow state.
workflow_id (str) – ID of the workflow run.
request (Dict[str, Any])
engine_options (List[str])
- Returns:
the state of the workflow run.
- Return type:
- toil.server.wes.tasks.run_wes
- toil.server.wes.tasks.cancel_run(task_id)
Send a SIGTERM signal to the process that is running task_id.
- Parameters:
task_id (str)
- Return type:
None
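The entry above only says that SIGTERM is sent. A common pattern pairs SIGTERM with a bounded wait and a forced kill, sketched below under assumptions. WAIT_FOR_DEATH_TIMEOUT = 20 is reused as the wait, assuming it is measured in seconds. terminate_then_kill is a hypothetical helper that works on a subprocess.Popen object rather than on a Celery task ID. The signal semantics described are those of POSIX systems.

```python
import subprocess

WAIT_FOR_DEATH_TIMEOUT = 20  # value from this module; assumed to be seconds


def terminate_then_kill(proc: subprocess.Popen, timeout: float = WAIT_FOR_DEATH_TIMEOUT) -> int:
    """Hypothetical sketch: ask a process to stop, and force it if it lingers."""
    proc.terminate()  # sends SIGTERM on POSIX
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # sends SIGKILL on POSIX, which cannot be caught or ignored
        return proc.wait()
```

On POSIX, a negative return code such as -15 (SIGTERM) or -9 (SIGKILL) identifies the signal that ended the process.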
- class toil.server.wes.tasks.TaskRunner
Abstraction over the Celery API. Runs our run_wes task and allows canceling it.
We can swap this out in the server to allow testing without Celery.
- static run(args, task_id)
Run the given task args with the given ID on Celery.
- class toil.server.wes.tasks.MultiprocessingTaskRunner
Bases: TaskRunner
Version of TaskRunner that just runs tasks with Multiprocessing.
Threading can’t be used because there’s no way to send a cancel signal or exception to a Python thread unless loops in the task (i.e., ToilWorkflowRunner) poll for it.
- static set_up_and_run_task(output_path, args)
Set up logging for the process into the given file and then call run_wes_task with the given arguments.
If the process finishes successfully, it will clean up the log, but if the process crashes, the caller must clean up the log.
- classmethod run(args, task_id)
Run the given task args with the given ID.
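The log-handling contract of set_up_and_run_task can be sketched in-process with the logging module: delete the log on success, and leave it for the caller on a crash. run_with_log_file is a hypothetical name. To keep the sketch self-contained, it runs func in the current process rather than in a separate multiprocessing worker.

```python
import logging
import os
from typing import Any, Callable


def run_with_log_file(output_path: str, func: Callable[..., Any], *args: Any) -> Any:
    """Hypothetical sketch: log to output_path while func runs.

    The log file is deleted only if func returns normally; if func raises,
    the file is left in place for the caller to inspect and clean up.
    """
    handler = logging.FileHandler(output_path)
    root = logging.getLogger()
    root.addHandler(handler)
    try:
        result = func(*args)
    finally:
        # Always detach and close the handler so the file can be removed or read
        root.removeHandler(handler)
        handler.close()
    os.remove(output_path)  # only reached when func returned normally
    return result
```

In a real worker process, a hard crash such as a SIGKILL skips the finally block entirely. That is one more reason the caller must be prepared to clean up the log.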