toil.server.wes.tasks

Attributes

celery

logger

WAIT_FOR_DEATH_TIMEOUT

run_wes

Classes

Toil

A context manager that represents a Toil workflow.

WorkflowStateMachine

Class for managing the WES workflow state machine.

ToilWorkflowRunner

A class to represent a workflow runner to run the requested workflow.

TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

MultiprocessingTaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Functions

generate_locator(job_store_type[, local_suggestion, ...])

Generate a random locator for a job store of the given type. Raises a JobStoreUnavailableException if that job store cannot be used.

connect_to_workflow_state_store(url, workflow_id)

Connect to a place to store state for the given workflow, in the state store defined by the given URL.

download_file_from_internet(src, dest[, content_type])

Download a file from the Internet and write it to dest.

download_file_from_s3(src, dest[, content_type])

Download a file from Amazon S3 and write it to dest.

get_file_class(path)

Return the type of the file as a human readable string.

get_iso_time()

Return the current time in ISO 8601 format.

link_file(src, dest)

Create a link to a file from src to dest.

run_wes_task(base_scratch_dir, state_store_url, ...)

Run a requested workflow.

cancel_run(task_id)

Send a SIGTERM signal to the process that is running task_id.

Module Contents

class toil.server.wes.tasks.Toil(options)

Bases: ContextManager[Toil]

A context manager that represents a Toil workflow.

Specifically the batch system, job store, and its configuration.

Parameters:

options (argparse.Namespace)

config: Config

__enter__()

Derive configuration from the command line options.

Then load the job store and, on restart, consolidate the derived configuration with the one from the previous invocation of the workflow.

Return type:

Toil

__exit__(exc_type, exc_val, exc_tb)

Clean up after a workflow invocation.

Depending on the configuration, delete the job store.

Parameters:
  • exc_type

  • exc_val

  • exc_tb

Return type:

Literal[False]

start(rootJob)

Invoke a Toil workflow with the given job as the root for an initial run.

This method must be called in the body of a with Toil(...) as toil: statement. This method should not be called more than once for a workflow that has not finished.

Parameters:

rootJob (toil.job.Job) – The root job of the workflow

Returns:

The root job’s return value

Return type:

Any

restart()

Restarts a workflow that has been interrupted.

Returns:

The root job’s return value

Return type:

Any

classmethod getJobStore(locator)

Create an instance of the concrete job store implementation that matches the given locator.

Parameters:

locator (str) – The location of the job store to be represented by the instance

Returns:

an instance of a concrete subclass of AbstractJobStore

Return type:

toil.jobStores.abstractJobStore.AbstractJobStore

static parseLocator(locator)
Parameters:

locator (str)

Return type:

Tuple[str, str]

static buildLocator(name, rest)
Parameters:
  • name

  • rest

Return type:

str
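parseLocator and buildLocator carry no descriptions here. The sketch below shows a plausible round trip, assuming the name:rest convention seen in locators such as aws:us-west-2:my-store, and assuming that a bare path (starting with / or ., or containing no colon) means a file job store. parse_locator and build_locator are hypothetical stand-ins, not the Toil methods.

```python
from typing import Tuple


def parse_locator(locator: str) -> Tuple[str, str]:
    """Split a locator into (job store type, rest); hypothetical stand-in for parseLocator."""
    # Assumption: bare local paths default to the "file" job store.
    if locator.startswith(("/", ".")) or ":" not in locator:
        return "file", locator
    # Split only on the first colon so the rest may itself contain colons (e.g. a region).
    name, rest = locator.split(":", 1)
    return name, rest


def build_locator(name: str, rest: str) -> str:
    """Join a job store type and its remainder; hypothetical stand-in for buildLocator."""
    if ":" in name:
        # A colon in the type would make the locator ambiguous to parse back.
        raise ValueError(f"Job store type may not contain ':': {name!r}")
    return f"{name}:{rest}"


print(parse_locator("aws:us-west-2:my-store"))  # ('aws', 'us-west-2:my-store')
print(parse_locator("/tmp/my-store"))           # ('file', '/tmp/my-store')
```

Splitting on the first colon only is what lets build_locator(*parse_locator(x)) reproduce x for multi-part locators.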

classmethod resumeJobStore(locator)
Parameters:

locator (str)

Return type:

toil.jobStores.abstractJobStore.AbstractJobStore

static createBatchSystem(config)

Create an instance of the batch system specified in the given config.

Parameters:

config (Config) – the current configuration

Returns:

an instance of a concrete subclass of AbstractBatchSystem

Return type:

toil.batchSystems.abstractBatchSystem.AbstractBatchSystem

importFile(srcUrl: str, sharedFileName: str, symlink: bool = True) → None
importFile(srcUrl: str, sharedFileName: None = None, symlink: bool = True) → toil.fileStores.FileID
import_file(src_uri: str, shared_file_name: str, symlink: bool = True, check_existence: bool = True) → None
import_file(src_uri: str, shared_file_name: None = None, symlink: bool = True, check_existence: bool = True) → toil.fileStores.FileID

Import the file at the given URL into the job store.

With the default check_existence=True, a missing file raises FileNotFoundError; pass check_existence=False to get None instead.

Parameters:

check_existence – If true, raise FileNotFoundError if the file does not exist. If false, return None when the file does not exist.

See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description.

exportFile(jobStoreFileID, dstUrl)
Parameters:
  • jobStoreFileID

  • dstUrl

Return type:

None

export_file(file_id, dst_uri)

Export the file to the destination pointed at by the destination URL.

See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description.

Parameters:
  • file_id

  • dst_uri

Return type:

None

static normalize_uri(uri, check_existence=False)

Given a URI, if it has no scheme, prepend “file:”.

Parameters:
  • check_existence (bool) – If set, raise FileNotFoundError if a URI points to a local file that does not exist.

  • uri (str)

Return type:

str
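As a rough illustration of this rule, the sketch below uses urllib.parse.urlparse to detect a missing scheme. normalize_uri_sketch is a hypothetical name; the real method may also absolutize or quote paths, which this sketch does not attempt. Windows drive letters such as C:\ parse as a scheme, so this simple check is POSIX-oriented.

```python
import os
from urllib.parse import unquote, urlparse


def normalize_uri_sketch(uri: str, check_existence: bool = False) -> str:
    """Prepend "file:" to a scheme-less URI, optionally checking that local files exist."""
    parsed = urlparse(uri)
    if parsed.scheme == "":
        # No scheme: treat the input as a local path.
        if check_existence and not os.path.exists(uri):
            raise FileNotFoundError(f"Could not find local file {uri!r}")
        return "file:" + uri
    if parsed.scheme == "file" and check_existence:
        # file: URIs point at local files too; decode %-escapes before checking.
        local_path = unquote(parsed.path)
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"Could not find local file {local_path!r}")
    # Other schemes (s3:, http:, ...) are returned unchanged.
    return uri
```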

static getToilWorkDir(configWorkDir=None)

Return a path to a writable directory under which per-workflow directories exist.

This directory is always required to exist on a machine, even if the Toil worker has not run yet. If your workers and leader have different temp directories, you may need to set TOIL_WORKDIR.

Parameters:

configWorkDir (Optional[str]) – Value passed to the program using the --workDir flag

Returns:

Path to the Toil work directory, constant across all machines

Return type:

str
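A minimal sketch of how such a directory might be chosen, assuming the precedence: the --workDir value, then the TOIL_WORKDIR environment variable, then the system temporary directory. The precedence and the error raised for a missing directory are assumptions, and get_toil_work_dir_sketch is a hypothetical name.

```python
import os
import tempfile
from typing import Optional


def get_toil_work_dir_sketch(config_work_dir: Optional[str] = None) -> str:
    """Pick the Toil work directory; the precedence order here is an assumption."""
    # Assumed order: explicit --workDir value, then TOIL_WORKDIR, then the system temp dir.
    work_dir = config_work_dir or os.environ.get("TOIL_WORKDIR") or tempfile.gettempdir()
    if not os.path.isdir(work_dir):
        # The directory is expected to already exist on every machine.
        raise RuntimeError(f"Work directory {work_dir!r} does not exist")
    return work_dir
```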

classmethod get_toil_coordination_dir(config_work_dir, config_coordination_dir)

Return a path to a writable directory, which will be in memory if convenient. This directory should be used for file locking and coordination.

Parameters:
  • config_work_dir (Optional[str]) – Value passed to the program using the --workDir flag

  • config_coordination_dir (Optional[str]) – Value passed to the program using the --coordinationDir flag

  • workflow_id – Used if a tmpdir_prefix exists to create full directory paths unique per workflow

Returns:

Path to the Toil coordination directory. It should be on a POSIX filesystem that allows directories containing open files to be deleted.

Return type:

str

static get_workflow_path_component(workflow_id)

Get a safe filesystem path component for a workflow.

The result is the same for every process on a given machine and differs between machines.

Parameters:

workflow_id (str) – The ID of the current Toil workflow.

Return type:

str
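One way to meet both requirements is a deterministic hash of a machine identifier and the workflow ID. The sketch below uses uuid.uuid5 over the hostname, assuming hostnames are stable on a machine and distinct across machines; the function name, the optional machine_id override, and the wf- prefix are all hypothetical.

```python
import socket
import uuid
from typing import Optional


def workflow_path_component_sketch(workflow_id: str, machine_id: Optional[str] = None) -> str:
    """Derive a filesystem-safe, per-machine name for a workflow (hypothetical helper)."""
    # Assumption: the hostname is stable on one machine and differs between machines.
    machine = machine_id if machine_id is not None else socket.gethostname()
    # uuid5 is deterministic: the same (machine, workflow) pair always yields the same UUID.
    namespace = uuid.uuid5(uuid.NAMESPACE_DNS, machine)
    digest = uuid.uuid5(namespace, workflow_id)
    # Hex digits only, so the result is safe as a single path component.
    return "wf-" + digest.hex
```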

classmethod getLocalWorkflowDir(workflowID, configWorkDir=None)

Return the directory where worker directories and the cache will be located for this workflow on this machine.

Parameters:
  • configWorkDir (Optional[str]) – Value passed to the program using the --workDir flag

  • workflowID (str)

Returns:

Path to the local workflow directory on this machine

Return type:

str

classmethod get_local_workflow_coordination_dir(workflow_id, config_work_dir, config_coordination_dir)

Return the directory where coordination files should be located for this workflow on this machine. These include internal Toil databases and lock files for the machine.

If an in-memory filesystem is available, it is used. Otherwise, the local workflow directory, which may be on a shared network filesystem, is used.

Parameters:
  • workflow_id (str) – Unique ID of the current workflow.

  • config_work_dir (Optional[str]) – Value used for the work directory in the current Toil Config.

  • config_coordination_dir (Optional[str]) – Value used for the coordination directory in the current Toil Config.

Returns:

Path to the local workflow coordination directory on this machine.

Return type:

str
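The in-memory-first fallback described above can be sketched as follows, assuming /dev/shm is the in-memory filesystem (typical on Linux). coordination_dir_sketch, its parameters, and the layout under /dev/shm are hypothetical; the shm_path argument exists only so the logic can be exercised without touching /dev/shm.

```python
import os
from typing import Optional


def coordination_dir_sketch(workflow_component: str, local_workflow_dir: str,
                            shm_path: Optional[str] = "/dev/shm") -> str:
    """Prefer an in-memory filesystem for lock files; fall back to the workflow dir."""
    # Assumption: shm_path (default /dev/shm) is an in-memory filesystem when present.
    if shm_path and os.path.isdir(shm_path) and os.access(shm_path, os.W_OK | os.X_OK):
        # Namespace by workflow so concurrent workflows do not share lock files.
        path = os.path.join(shm_path, workflow_component)
    else:
        # May be on a shared network filesystem, which is slower for locking.
        path = local_workflow_dir
    os.makedirs(path, exist_ok=True)
    return path
```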

toil.server.wes.tasks.generate_locator(job_store_type, local_suggestion=None, decoration=None)

Generate a random locator for a job store of the given type. Raises a JobStoreUnavailableException if that job store cannot be used.

Parameters:
  • job_store_type (str) – Registry name of the job store to use.

  • local_suggestion (Optional[str]) – Path to a nonexistent local directory suitable for use as a file job store.

  • decoration (Optional[str]) – Extra string to add to the job store locator, if convenient.

Returns:

Job store locator for a usable job store.

Return type:

str
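A minimal sketch that handles only the file job store type, assuming a random directory under the system temporary directory is acceptable when no local suggestion is given. Remote types such as aws would need credential and region checks, which are not shown; generate_locator_sketch and JobStoreUnavailableError are hypothetical stand-ins.

```python
import os
import tempfile
import uuid
from typing import Optional


class JobStoreUnavailableError(RuntimeError):
    """Hypothetical stand-in for Toil's JobStoreUnavailableException."""


def generate_locator_sketch(job_store_type: str, local_suggestion: Optional[str] = None,
                            decoration: Optional[str] = None) -> str:
    # Optional decoration is appended to generated names only.
    suffix = f"-{decoration}" if decoration else ""
    if job_store_type == "file":
        # Use the caller's suggested (nonexistent) directory if one was given.
        if local_suggestion:
            return local_suggestion
        # Otherwise pick a random, not-yet-created directory under the temp dir.
        return os.path.join(tempfile.gettempdir(), f"toil-{uuid.uuid4()}{suffix}")
    # Other types are not handled by this sketch.
    raise JobStoreUnavailableError(f"Job store type {job_store_type!r} is not usable here")
```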

toil.server.wes.tasks.celery

class toil.server.wes.tasks.WorkflowStateMachine(store)

Class for managing the WES workflow state machine.

This is the authority on the WES “state” of a workflow. You need one to read or change the state.

Guaranteeing that only certain transitions can be observed is possible but not worth it. Instead, we just let updates clobber each other and grab and cache the first terminal state we see forever. If it becomes important that clients never see e.g. CANCELED -> COMPLETE or COMPLETE -> SYSTEM_ERROR, we can implement a real distributed state machine here.

We do handle making sure that tasks don’t get stuck in CANCELING.

State can be one of:

“UNKNOWN”, “QUEUED”, “INITIALIZING”, “RUNNING”, “PAUSED”, “COMPLETE”, “EXECUTOR_ERROR”, “SYSTEM_ERROR”, “CANCELED”, or “CANCELING”.

Uses the state store’s local cache to prevent needing to read things we’ve seen already.

Parameters:

store (WorkflowStateStore)
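The clobbering-with-a-terminal-cache behavior described above can be sketched with a plain dict standing in for the shared WorkflowStateStore. Which states count as terminal (COMPLETE, EXECUTOR_ERROR, SYSTEM_ERROR, CANCELED) is an assumption drawn from the transitions documented for the send_* methods, and CachingStateReader is a hypothetical class.

```python
from typing import Dict, Optional

# Assumed terminal states: once observed, they are never re-read.
TERMINAL_STATES = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}


class CachingStateReader:
    """Hypothetical sketch: last-writer-wins store, first terminal state cached forever."""

    def __init__(self, store: Dict[str, str]) -> None:
        self.store = store  # stands in for a shared WorkflowStateStore
        self._terminal: Optional[str] = None

    def set_state(self, state: str) -> None:
        # Updates simply clobber each other; no transition checks are enforced.
        self.store["state"] = state

    def get_current_state(self) -> str:
        if self._terminal is not None:
            return self._terminal  # cached terminal state wins over later writes
        state = self.store.get("state", "UNKNOWN")
        if state in TERMINAL_STATES:
            self._terminal = state
        return state
```

Because the cache lives in each reader, a later clobbering write (say, SYSTEM_ERROR after COMPLETE) is invisible to a reader that has already seen the first terminal state.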

send_enqueue()

Send an enqueue message that would move from UNKNOWN to QUEUED.

Return type:

None

send_initialize()

Send an initialize message that would move from QUEUED to INITIALIZING.

Return type:

None

send_run()

Send a run message that would move from INITIALIZING to RUNNING.

Return type:

None

send_cancel()

Send a cancel message that would move to CANCELING from any non-terminal state.

Return type:

None

send_canceled()

Send a canceled message that would move to CANCELED from CANCELING.

Return type:

None

send_complete()

Send a complete message that would move from RUNNING to COMPLETE.

Return type:

None

send_executor_error()

Send an executor_error message that would move from QUEUED, INITIALIZING, or RUNNING to EXECUTOR_ERROR.

Return type:

None

send_system_error()

Send a system_error message that would move from QUEUED, INITIALIZING, or RUNNING to SYSTEM_ERROR.

Return type:

None

get_current_state()

Get the current state of the workflow.

Return type:

str
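The send_* methods document the transition each message is intended to cause. The table below restates them as data; it is a hypothetical helper for reasoning about the state machine, since the class description says updates are allowed to clobber each other rather than being rejected. Treating CANCELING and PAUSED as non-terminal (so cancel is allowed from them) is an assumption.

```python
from typing import Dict, FrozenSet, Tuple

NON_TERMINAL = frozenset({"UNKNOWN", "QUEUED", "INITIALIZING", "RUNNING", "PAUSED", "CANCELING"})

# message -> (states it is documented to move from, state it moves to)
TRANSITIONS: Dict[str, Tuple[FrozenSet[str], str]] = {
    "enqueue": (frozenset({"UNKNOWN"}), "QUEUED"),
    "initialize": (frozenset({"QUEUED"}), "INITIALIZING"),
    "run": (frozenset({"INITIALIZING"}), "RUNNING"),
    "cancel": (NON_TERMINAL, "CANCELING"),
    "canceled": (frozenset({"CANCELING"}), "CANCELED"),
    "complete": (frozenset({"RUNNING"}), "COMPLETE"),
    "executor_error": (frozenset({"QUEUED", "INITIALIZING", "RUNNING"}), "EXECUTOR_ERROR"),
    "system_error": (frozenset({"QUEUED", "INITIALIZING", "RUNNING"}), "SYSTEM_ERROR"),
}


def expected_next_state(current: str, message: str) -> str:
    """Return the documented target state, or raise if no such move is documented."""
    sources, target = TRANSITIONS[message]
    if current not in sources:
        raise ValueError(f"{message!r} is not documented as a move from {current}")
    return target
```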

toil.server.wes.tasks.connect_to_workflow_state_store(url, workflow_id)

Connect to a place to store state for the given workflow, in the state store defined by the given URL.

Parameters:
  • url (str) – A URL that can be used for connect_to_state_store()

  • workflow_id (str)

Return type:

WorkflowStateStore

toil.server.wes.tasks.download_file_from_internet(src, dest, content_type=None)

Download a file from the Internet and write it to dest.

Parameters:
  • src (str)

  • dest (str)

  • content_type (Optional[str])

Return type:

None
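A standard-library sketch, assuming content_type is meant as an expected prefix of the response's Content-Type header (the parameter is not described here). download_sketch is a hypothetical name. urllib.request.urlopen already raises urllib.error.HTTPError for HTTP error statuses, so only the content type needs an explicit check.

```python
import shutil
import urllib.request
from typing import Optional


def download_sketch(src: str, dest: str, content_type: Optional[str] = None) -> None:
    """Fetch src with the standard library and write it to dest (hypothetical helper)."""
    with urllib.request.urlopen(src) as response:  # HTTP error statuses raise HTTPError
        actual = response.headers.get("Content-Type", "")
        # Assumption: content_type is an expected prefix of the Content-Type header.
        if content_type and not actual.startswith(content_type):
            raise RuntimeError(f"Expected content type {content_type!r}, got {actual!r}")
        with open(dest, "wb") as out:
            # Stream the body to disk instead of reading it all into memory.
            shutil.copyfileobj(response, out)
```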

toil.server.wes.tasks.download_file_from_s3(src, dest, content_type=None)

Download a file from Amazon S3 and write it to dest.

Parameters:
  • src (str)

  • dest (str)

  • content_type (Optional[str])

Return type:

None

toil.server.wes.tasks.get_file_class(path)

Return the type of the file as a human readable string.

Parameters:

path (str)

Return type:

str
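A minimal sketch of such a classifier. The labels Link, File, Directory, and Unknown are assumptions, not necessarily the strings Toil returns. Symbolic links are checked first because os.path.isfile and os.path.isdir follow them.

```python
import os


def file_class_sketch(path: str) -> str:
    """Classify a path as a human-readable label (labels are assumptions)."""
    # Check links first: isfile/isdir would otherwise report the link's target.
    if os.path.islink(path):
        return "Link"
    if os.path.isfile(path):
        return "File"
    if os.path.isdir(path):
        return "Directory"
    return "Unknown"
```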

toil.server.wes.tasks.get_iso_time()

Return the current time in ISO 8601 format.

Return type:

str
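A one-line sketch using datetime.isoformat. Whether the real function uses local or UTC time, and whether it keeps fractional seconds, is not stated, so both choices here (naive local time, truncated to seconds) are assumptions. datetime.now(timezone.utc) would instead give an unambiguous timestamp with an offset.

```python
from datetime import datetime


def iso_time_sketch() -> str:
    """Current local time in ISO 8601, e.g. 2024-01-31T12:34:56 (format is an assumption)."""
    # timespec="seconds" drops microseconds; a naive datetime carries no UTC offset.
    return datetime.now().isoformat(timespec="seconds")
```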

toil.server.wes.tasks.link_file(src, dest)

Create a link to a file from src to dest.

Parameters:
  • src

  • dest

Return type:

None
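A sketch of one common strategy, assuming a hard link is attempted first and a symbolic link is used as a fallback (for example when src and dest are on different filesystems). link_file_sketch is a hypothetical name, and the real function may behave differently.

```python
import os


def link_file_sketch(src: str, dest: str) -> None:
    """Link dest to src: try a hard link, fall back to a symlink (assumed strategy)."""
    try:
        # Hard links are cheap but fail across filesystems or where unsupported.
        os.link(src, dest)
    except OSError:
        # Symlinks work across filesystems but dangle if src is later removed.
        os.symlink(os.path.abspath(src), dest)
```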

toil.server.wes.tasks.logger

toil.server.wes.tasks.WAIT_FOR_DEATH_TIMEOUT = 20

class toil.server.wes.tasks.ToilWorkflowRunner(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

A class to represent a workflow runner to run the requested workflow.

Responsible for parsing the user request into a shell command, executing that command, and collecting the outputs of the resulting workflow run.

Parameters:
  • base_scratch_dir (str)

  • state_store_url (str)

  • workflow_id (str)

  • request (Dict[str, Any])

  • engine_options (List[str])

write_scratch_file(filename, contents)

Write a file to the scratch directory.

Parameters:
  • filename (str)

  • contents (str)

Return type:

None

get_state()
Return type:

str

write_workflow(src_url)

Fetch the workflow file from its source and write it to a destination file.

Parameters:

src_url (str)

Return type:

str

sort_options(workflow_engine_parameters=None)

Sort the command line arguments in the order that can be recognized by the workflow execution engine.

Parameters:

workflow_engine_parameters (Optional[Dict[str, Optional[str]]]) – User-specified parameters for this particular workflow. Keys are command-line options, and values are option arguments, or None for options that are flags.

Return type:

List[str]
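The parameter convention above (option keys, with None marking a flag) can be turned into an argument list as sketched below. The --key=value rendering, and placing options before the positional workflow and input files, are assumptions about what the engine accepts; build_engine_args is hypothetical.

```python
from typing import Dict, List, Optional


def build_engine_args(params: Optional[Dict[str, Optional[str]]],
                      workflow_file: str, input_file: str) -> List[str]:
    """Hypothetical helper: options first, then the positional workflow and input files."""
    options: List[str] = []
    for key, value in (params or {}).items():
        # None marks a flag; otherwise render as --key=value (assumed style).
        options.append(key if value is None else f"{key}={value}")
    return options + [workflow_file, input_file]
```

For example, {"--logLevel": "INFO", "--restart": None} with wf.cwl and inputs.json yields ["--logLevel=INFO", "--restart", "wf.cwl", "inputs.json"].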

initialize_run()

Write the workflow and input files, and return the list of shell commands that must be executed, in order, to complete this workflow run.

Return type:

List[str]

call_cmd(cmd, cwd)

Call a command with Popen, writing stdout, stderr, and the command itself to separate files.

Parameters:
  • cmd

  • cwd

Return type:

subprocess.Popen[bytes]
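A sketch of the Popen pattern described above, assuming cmd is an argument list and that cmd.txt, stdout.txt, and stderr.txt (hypothetical names) live in cwd. The parent closes its copies of the output files right after starting the child, so its own descriptors do not leak.

```python
import os
import shlex
import subprocess
from typing import List


def call_cmd_sketch(cmd: List[str], cwd: str) -> "subprocess.Popen[bytes]":
    """Start cmd in cwd, recording the command, stdout, and stderr in separate files."""
    # File names are hypothetical; the real runner may use different ones.
    with open(os.path.join(cwd, "cmd.txt"), "w") as f:
        f.write(shlex.join(cmd) + "\n")
    stdout = open(os.path.join(cwd, "stdout.txt"), "wb")
    stderr = open(os.path.join(cwd, "stderr.txt"), "wb")
    try:
        return subprocess.Popen(cmd, cwd=cwd, stdout=stdout, stderr=stderr)
    finally:
        # The child has its own copies of these descriptors, so the parent can close them.
        stdout.close()
        stderr.close()
```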

run()

Construct a command to run the requested workflow with the given options, run it, and deposit the outputs in the output directory.

Return type:

None

write_output_files()

Fetch all the files that this workflow generated and output information about them to outputs.json.

Return type:

None

toil.server.wes.tasks.run_wes_task(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

Run a requested workflow.

Parameters:
  • base_scratch_dir (str) – Directory where the workflow’s scratch dir will live, under the workflow’s ID.

  • state_store_url (str) – URL/path at which the server and Celery task communicate about workflow state.

  • workflow_id (str) – ID of the workflow run.

  • request (Dict[str, Any])

  • engine_options (List[str])

Returns:

the state of the workflow run.

Return type:

str

toil.server.wes.tasks.run_wes

toil.server.wes.tasks.cancel_run(task_id)

Send a SIGTERM signal to the process that is running task_id.

Parameters:

task_id (str)

Return type:

None
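A POSIX-only sketch of the signaling step, assuming the runner keeps a mapping from task IDs to process IDs (TASK_PIDS and cancel_run_sketch are hypothetical). ProcessLookupError is ignored because the process may already have exited.

```python
import os
import signal
from typing import Dict

# Hypothetical registry mapping task IDs to the PIDs running them.
TASK_PIDS: Dict[str, int] = {}


def cancel_run_sketch(task_id: str) -> None:
    """Send SIGTERM to the process registered for task_id (POSIX only)."""
    pid = TASK_PIDS.get(task_id)
    if pid is None:
        return  # nothing known about this task; treat it as already gone
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        pass  # the process already exited
```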

class toil.server.wes.tasks.TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

We can swap this out in the server to allow testing without Celery.

static run(args, task_id)

Run the given task args with the given ID on Celery.

Parameters:
  • args

  • task_id

Return type:

None

static cancel(task_id)

Cancel the task with the given ID on Celery.

Parameters:

task_id (str)

Return type:

None

static is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters:

task_id (str)

Return type:

bool

class toil.server.wes.tasks.MultiprocessingTaskRunner

Bases: TaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Threading cannot be used because there is no way to send a cancel signal or exception to a Python thread unless loops in the task (i.e. ToilWorkflowRunner) poll for it.
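The rationale above can be seen in a small sketch: a worker that never polls for cancellation is still stopped by Process.terminate(), which has no thread equivalent. ProcessRunnerSketch, its is_ok rule, and the use of the fork start method (POSIX only, chosen so the example needs no __main__ guard) are assumptions, not Toil's implementation.

```python
import multiprocessing
import time
from multiprocessing.process import BaseProcess
from typing import Dict

# Assumption: the POSIX "fork" start method, so the target needs no pickling or import guard.
_ctx = multiprocessing.get_context("fork")


def _long_task(seconds: float) -> None:
    # A loop that never polls for cancellation, like a long-running workflow.
    time.sleep(seconds)


class ProcessRunnerSketch:
    """Hypothetical runner: one process per task, canceled with terminate()."""

    def __init__(self) -> None:
        self.processes: Dict[str, BaseProcess] = {}

    def run(self, task_id: str, seconds: float) -> None:
        proc = _ctx.Process(target=_long_task, args=(seconds,))
        proc.start()
        self.processes[task_id] = proc

    def cancel(self, task_id: str) -> None:
        proc = self.processes[task_id]
        proc.terminate()  # sends SIGTERM on POSIX; a thread has no equivalent
        proc.join()

    def is_ok(self, task_id: str) -> bool:
        # Hypothetical rule: not OK once the process has exited with a nonzero code.
        proc = self.processes[task_id]
        return proc.is_alive() or proc.exitcode == 0
```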

static set_up_and_run_task(output_path, args)

Set up logging for the process into the given file and then call run_wes_task with the given arguments.

If the process finishes successfully, it will clean up the log, but if the process crashes, the caller must clean up the log.

Parameters:
  • output_path

  • args

Return type:

None
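The log-handling contract above (delete the log on success, keep it after a crash) can be sketched with a logging.FileHandler on the root logger. run_with_log_file is a hypothetical helper that takes a callable in place of run_wes_task's arguments.

```python
import logging
import os
from typing import Any, Callable


def run_with_log_file(output_path: str, func: Callable[..., Any], *args: Any) -> Any:
    """Log to output_path while func runs; delete the log only if func succeeds."""
    handler = logging.FileHandler(output_path)
    root = logging.getLogger()
    root.addHandler(handler)
    try:
        result = func(*args)
    finally:
        # Always detach and close, so the log is flushed even when func raises.
        root.removeHandler(handler)
        handler.close()
    # Only reached on success; after a crash the caller keeps the log and cleans it up.
    os.remove(output_path)
    return result
```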

classmethod run(args, task_id)

Run the given task args with the given ID.

Parameters:
  • args

  • task_id

Return type:

None

classmethod cancel(task_id)

Cancel the task with the given ID.

Parameters:

task_id (str)

Return type:

None

classmethod is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters:

task_id (str)

Return type:

bool