toil.server.wes.toil_backend

Attributes

TaskLog

baseVersion

logger

Exceptions

OperationForbidden

Raised when the request is forbidden.

VersionNotImplementedException

Raised when the requested workflow version is not implemented.

WorkflowConflictException

Raised when the requested workflow is not in the expected state.

WorkflowExecutionException

Raised when an internal error occurred during the execution of the workflow.

WorkflowNotFoundException

Raised when the requested run ID is not found.

Classes

JobStatus

Records the status of a job.

WorkflowStateMachine

Class for managing the WES workflow state machine.

WESBackend

A class to represent a GA4GH Workflow Execution Service (WES) API backend.

MultiprocessingTaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

ToilWorkflow

ToilBackend

WES backend implemented for Toil to run CWL, WDL, or Toil workflows.

Functions

replay_message_bus(path)

Replay all the messages and work out what they mean for jobs.

AtomicFileCreate(final_path[, keep])

Context manager to create a temporary file.

global_mutex(base_dir, mutex)

Context manager that locks a mutex.

connect_to_workflow_state_store(url, workflow_id)

Connect to a place to store state for the given workflow, in the state store defined by the given URL.

handle_errors(func)

This decorator catches errors from the wrapped function and returns a JSON formatted error message.

Module Contents

class toil.server.wes.toil_backend.JobStatus

Records the status of a job.

job_store_id: str
name: str
exit_code: int
annotations: Dict[str, str]
toil_batch_id: int
external_batch_id: str
batch_system: str
__repr__()

Return repr(self).

Return type:

str

toil.server.wes.toil_backend.replay_message_bus(path)

Replay all the messages and work out what they mean for jobs.

We track the state and name of jobs here, by ID. We would use a list of two items but MyPy can’t understand a list of items of multiple types, so we need to define a new class.

Returns a dictionary from the job_id to a dataclass, JobStatus. A JobStatus contains information about a job which we have gathered from the message bus, including the job store ID, the name of the job, the exit code, any associated annotations, the Toil batch ID, the external batch ID, and the batch system on which the job is running.

Parameters:

path (str)

Return type:

Dict[str, JobStatus]
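
The replay idea can be sketched as follows. This is a minimal illustration, not Toil's implementation: the tuple-based message shape and the `replay` helper are hypothetical stand-ins for the real message bus file, and only a few of the `JobStatus` fields are modeled.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Tuple


@dataclass
class JobStatus:
    """Simplified stand-in for the JobStatus record described above."""
    job_store_id: str
    name: str
    exit_code: int
    annotations: Dict[str, str] = field(default_factory=dict)


# Hypothetical message shape: (kind, job_id, payload).
Message = Tuple[str, str, str]


def replay(messages: Iterable[Message]) -> Dict[str, JobStatus]:
    """Fold messages, in order, into one JobStatus per job ID."""
    statuses: Dict[str, JobStatus] = {}
    for kind, job_id, payload in messages:
        # Create a record the first time a job ID is seen.
        if job_id not in statuses:
            statuses[job_id] = JobStatus(job_store_id=job_id, name="", exit_code=-1)
        status = statuses[job_id]
        if kind == "issued":
            status.name = payload
        elif kind == "completed":
            status.exit_code = int(payload)
        elif kind == "annotation":
            key, _, value = payload.partition("=")
            status.annotations[key] = value
    return statuses


log = [
    ("issued", "job-1", "align"),
    ("annotation", "job-1", "queue=short"),
    ("completed", "job-1", "0"),
    ("issued", "job-2", "sort"),
]
result = replay(log)
```

Using a dataclass rather than a two-item list gives each field a name and a single static type, which is the motivation the description above gives for defining a new class.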

toil.server.wes.toil_backend.AtomicFileCreate(final_path, keep=False)

Context manager to create a temporary file. Entering returns the path to a temporary file in the same directory as final_path. If the code in the context succeeds, the file is renamed to its actual name. If an error occurs, the file is not installed and is removed unless keep is specified.

Parameters:
  • final_path

  • keep

Return type:

Iterator[str]
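
The write-then-rename pattern described above can be sketched with the standard library alone. The helper name `atomic_file_create` and the `.tmp-` prefix are illustrative assumptions; this is not the code behind `AtomicFileCreate`.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def atomic_file_create(final_path: str, keep: bool = False) -> Iterator[str]:
    """Yield a temporary path next to final_path; rename it into place on success."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Create the temporary file in the same directory so the rename stays
    # on one filesystem.
    fd, temp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    os.close(fd)
    try:
        yield temp_path
    except BaseException:
        # On failure, never install the file; delete it unless keep is set.
        if not keep and os.path.exists(temp_path):
            os.remove(temp_path)
        raise
    else:
        os.replace(temp_path, final_path)


with tempfile.TemporaryDirectory() as work_dir:
    target = os.path.join(work_dir, "result.txt")
    with atomic_file_create(target) as temp_path:
        with open(temp_path, "w") as handle:
            handle.write("done")
    with open(target) as handle:
        installed = handle.read()

    failed_target = os.path.join(work_dir, "failed.txt")
    try:
        with atomic_file_create(failed_target) as temp_path:
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    failed_exists = os.path.exists(failed_target)
    leftovers = os.listdir(work_dir)
```

Readers of `final_path` see either the old content or the complete new content, never a partially written file, because the rename happens only after the body of the `with` block finishes without error.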

toil.server.wes.toil_backend.global_mutex(base_dir, mutex)

Context manager that locks a mutex. The mutex is identified by the given name, and scoped to the given directory. Works across all containers that have access to the given directory. Mutexes held by dead processes are automatically released.

Only works between processes, NOT between threads.

Parameters:
  • base_dir (str) – Base directory to work in. Defines the shared namespace.

  • mutex (str) – Mutex to lock. Must be a permissible path component.

Return type:

Iterator[None]
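
One way to get a directory-scoped, process-level mutex with these properties is a POSIX advisory file lock. The sketch below assumes a POSIX system and uses `fcntl.lockf`; the helper name `directory_mutex` and the `toy-mutex-` file prefix are hypothetical, and the sketch skips the lock-file cleanup and race handling a production implementation would need.

```python
import fcntl
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def directory_mutex(base_dir: str, mutex: str) -> Iterator[None]:
    """Hold an exclusive advisory lock on a file named after the mutex."""
    lock_path = os.path.join(base_dir, "toy-mutex-" + mutex)
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o600)
    try:
        # Blocks until no other process holds the lock. The kernel drops the
        # lock when the holding process exits, so a dead process cannot keep it.
        fcntl.lockf(fd, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.lockf(fd, fcntl.LOCK_UN)
    finally:
        os.close(fd)


with tempfile.TemporaryDirectory() as base_dir:
    entered = False
    with directory_mutex(base_dir, "state-update"):
        entered = True
    lock_files = os.listdir(base_dir)
```

POSIX record locks of this kind are owned by the process, not by the thread, which is consistent with the note above that the mutex only works between processes.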

class toil.server.wes.toil_backend.WorkflowStateMachine(store)

Class for managing the WES workflow state machine.

This is the authority on the WES “state” of a workflow. You need one to read or change the state.

Guaranteeing that only certain transitions can be observed is possible but not worth it. Instead, we just let updates clobber each other and grab and cache the first terminal state we see forever. If it becomes important that clients never see e.g. CANCELED -> COMPLETE or COMPLETE -> SYSTEM_ERROR, we can implement a real distributed state machine here.

We do handle making sure that tasks don’t get stuck in CANCELING.

State can be:

“UNKNOWN” “QUEUED” “INITIALIZING” “RUNNING” “PAUSED” “COMPLETE” “EXECUTOR_ERROR” “SYSTEM_ERROR” “CANCELED” “CANCELING”

Uses the state store’s local cache to prevent needing to read things we’ve seen already.

Parameters:

store (WorkflowStateStore)

send_enqueue()

Send an enqueue message that would move from UNKNOWN to QUEUED.

Return type:

None

send_initialize()

Send an initialize message that would move from QUEUED to INITIALIZING.

Return type:

None

send_run()

Send a run message that would move from INITIALIZING to RUNNING.

Return type:

None

send_cancel()

Send a cancel message that would move to CANCELING from any non-terminal state.

Return type:

None

send_canceled()

Send a canceled message that would move to CANCELED from CANCELING.

Return type:

None

send_complete()

Send a complete message that would move from RUNNING to COMPLETE.

Return type:

None

send_executor_error()

Send an executor_error message that would move from QUEUED, INITIALIZING, or RUNNING to EXECUTOR_ERROR.

Return type:

None

send_system_error()

Send a system_error message that would move from QUEUED, INITIALIZING, or RUNNING to SYSTEM_ERROR.

Return type:

None

get_current_state()

Get the current state of the workflow.

Return type:

str
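
The "let updates clobber each other, but cache the first terminal state seen" behavior described for this class can be illustrated with a toy model. Everything here is an assumption made for illustration: a plain dict stands in for the WorkflowStateStore, the set of terminal states is a guess based on the state list above, only a few of the send methods are modeled, and the handling of stuck CANCELING states is left out.

```python
from typing import Dict, Optional

# Assumed set of terminal states for this sketch.
TERMINAL_STATES = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}


class ToyStateMachine:
    """Toy model: writes clobber each other; reads pin the first terminal state."""

    def __init__(self, store: Dict[str, str]) -> None:
        # A plain dict stands in for the WorkflowStateStore.
        self._store = store
        self._terminal: Optional[str] = None

    def _write(self, state: str) -> None:
        # No transition checking: the last writer wins.
        self._store["state"] = state

    def send_enqueue(self) -> None:
        self._write("QUEUED")

    def send_run(self) -> None:
        self._write("RUNNING")

    def send_complete(self) -> None:
        self._write("COMPLETE")

    def send_system_error(self) -> None:
        self._write("SYSTEM_ERROR")

    def get_current_state(self) -> str:
        # Once a terminal state has been observed, keep reporting it.
        if self._terminal is not None:
            return self._terminal
        state = self._store.get("state", "UNKNOWN")
        if state in TERMINAL_STATES:
            self._terminal = state
        return state


store: Dict[str, str] = {}
machine = ToyStateMachine(store)
initial = machine.get_current_state()
machine.send_enqueue()
machine.send_run()
machine.send_complete()
first_terminal = machine.get_current_state()
machine.send_system_error()  # a late update clobbers the stored value
after_clobber = machine.get_current_state()
raw_stored = store["state"]
```

In this model the stored value can still change after completion, but an observer that has already seen a terminal state keeps reporting it, so that observer never sees COMPLETE turn into SYSTEM_ERROR.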

toil.server.wes.toil_backend.connect_to_workflow_state_store(url, workflow_id)

Connect to a place to store state for the given workflow, in the state store defined by the given URL.

Parameters:
  • url (str) – A URL that can be used for connect_to_state_store()

  • workflow_id (str)

Return type:

WorkflowStateStore

exception toil.server.wes.toil_backend.OperationForbidden(message)

Bases: Exception

Raised when the request is forbidden.

Parameters:

message (str)

toil.server.wes.toil_backend.TaskLog
exception toil.server.wes.toil_backend.VersionNotImplementedException(wf_type, version=None, supported_versions=None)

Bases: Exception

Raised when the requested workflow version is not implemented.

Parameters:
  • wf_type (str)

  • version (Optional[str])

  • supported_versions (Optional[List[str]])

class toil.server.wes.toil_backend.WESBackend(options)

A class to represent a GA4GH Workflow Execution Service (WES) API backend. Intended to be inherited. Subclasses should implement all abstract methods to handle user requests when they hit different endpoints.

Parameters:

options (List[str])

resolve_operation_id(operation_id)

Map an operationId defined in the OpenAPI or swagger yaml file to a function.

Parameters:

operation_id (str) – The operation ID defined in the specification.

Returns:

A function that should be called when the given endpoint is reached.

Return type:

Any
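
A minimal sketch of how an operationId could be mapped to a bound method. It assumes the last dotted component of the ID names a method on the backend; the `ToyBackend` class, the dotted ID, and the returned payload are all hypothetical.

```python
from typing import Any, Callable, Dict


class ToyBackend:
    """Hypothetical backend used only to illustrate the lookup."""

    def resolve_operation_id(self, operation_id: str) -> Callable[..., Any]:
        # Assumption: the last dotted component is the method name.
        return getattr(self, operation_id.split(".")[-1])

    def get_service_info(self) -> Dict[str, Any]:
        return {"status": "ok"}  # placeholder payload


backend = ToyBackend()
handler = backend.resolve_operation_id("some.module.get_service_info")
info = handler()
```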

abstract get_service_info()

Get information about the Workflow Execution Service.

GET /service-info

Return type:

Dict[str, Any]

abstract list_runs(page_size=None, page_token=None)

List the workflow runs.

GET /runs

Parameters:
  • page_size (Optional[int])

  • page_token (Optional[str])

Return type:

Dict[str, Any]

abstract run_workflow()

Run a workflow. This endpoint creates a new workflow run and returns a RunId to monitor its progress.

POST /runs

Return type:

Dict[str, str]

abstract get_run_log(run_id)

Get detailed info about a workflow run.

GET /runs/{run_id}

Parameters:

run_id (str)

Return type:

Dict[str, Any]

abstract cancel_run(run_id)

Cancel a running workflow.

POST /runs/{run_id}/cancel

Parameters:

run_id (str)

Return type:

Dict[str, str]

abstract get_run_status(run_id)

Get quick status info about a workflow run, returning a simple result with the overall state of the workflow run.

GET /runs/{run_id}/status

Parameters:

run_id (str)

Return type:

Dict[str, str]

static log_for_run(run_id, message)
Parameters:
  • run_id (Optional[str])

  • message (str)

Return type:

None

static secure_path(path)
Parameters:

path (str)

Return type:

str

collect_attachments(run_id, temp_dir)

Collect attachments from the current request by staging uploaded files to temp_dir, and return the temp_dir and parsed body of the request.

Parameters:
  • run_id (Optional[str]) – The run ID for logging.

  • temp_dir (Optional[str]) – The directory where uploaded files should be staged. If None, a temporary directory is created.

Return type:

Tuple[str, Dict[str, Any]]

exception toil.server.wes.toil_backend.WorkflowConflictException(run_id)

Bases: Exception

Raised when the requested workflow is not in the expected state.

Parameters:

run_id (str)

exception toil.server.wes.toil_backend.WorkflowExecutionException(message)

Bases: Exception

Raised when an internal error occurred during the execution of the workflow.

Parameters:

message (str)

exception toil.server.wes.toil_backend.WorkflowNotFoundException

Bases: Exception

Raised when the requested run ID is not found.

toil.server.wes.toil_backend.handle_errors(func)

This decorator catches errors from the wrapped function and returns a JSON formatted error message with the appropriate status code defined by the GA4GH WES spec.

Parameters:

func (Callable[Ellipsis, Any])

Return type:

Callable[Ellipsis, Any]
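
A decorator of this kind can be sketched as below. The exception classes are local stand-ins for the ones documented above, the status codes chosen for each exception are assumptions made for illustration, and `handle_errors_sketch` is not the real decorator. The `msg` and `status_code` keys follow the error response shape in the GA4GH WES schema.

```python
import functools
from typing import Any, Callable, Dict, Tuple


class WorkflowNotFoundException(Exception):
    """Local stand-in for the exception documented above."""


class OperationForbidden(Exception):
    """Local stand-in for the exception documented above."""


def handle_errors_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Convert exceptions from func into (JSON body, status code) pairs."""

    def error(message: str, code: int) -> Tuple[Dict[str, Any], int]:
        return {"msg": message, "status_code": code}, code

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except OperationForbidden as e:
            return error(str(e), 403)  # assumed mapping
        except WorkflowNotFoundException as e:
            return error(str(e), 404)  # assumed mapping
        except Exception as e:
            return error(str(e), 500)  # anything unexpected

    return wrapper


@handle_errors_sketch
def get_run_status(run_id: str) -> Dict[str, str]:
    if run_id != "known":
        raise WorkflowNotFoundException(f"run {run_id} not found")
    return {"run_id": run_id, "state": "RUNNING"}


ok = get_run_status("known")
missing = get_run_status("nope")
```

Keeping the mapping in one decorator lets each endpoint method raise ordinary exceptions and leaves response formatting in a single place.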

class toil.server.wes.toil_backend.MultiprocessingTaskRunner

Bases: TaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Threading cannot be used because there is no way to send a cancel signal or exception to a Python thread unless the loops in the task (i.e. ToilWorkflowRunner) poll for it.

static set_up_and_run_task(output_path, args)

Set up logging for the process into the given file and then call run_wes_task with the given arguments.

If the process finishes successfully, it will clean up the log, but if the process crashes, the caller must clean up the log.

Parameters:
  • output_path

  • args

Return type:

None

classmethod run(args, task_id)

Run the given task args with the given ID.

Parameters:
  • args

  • task_id

Return type:

None

classmethod cancel(task_id)

Cancel the task with the given ID.

Parameters:

task_id (str)

Return type:

None

classmethod is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters:

task_id (str)

Return type:

bool

class toil.server.wes.toil_backend.TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

We can swap this out in the server to allow testing without Celery.

static run(args, task_id)

Run the given task args with the given ID on Celery.

Parameters:
  • args

  • task_id

Return type:

None

static cancel(task_id)

Cancel the task with the given ID on Celery.

Parameters:

task_id (str)

Return type:

None

static is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters:

task_id (str)

Return type:

bool

toil.server.wes.toil_backend.baseVersion = '7.0.0'
toil.server.wes.toil_backend.logger
class toil.server.wes.toil_backend.ToilWorkflow(base_work_dir, state_store_url, run_id)
Parameters:
  • base_work_dir (str)

  • state_store_url (str)

  • run_id (str)

fetch_state(key: str, default: str) -> str
fetch_state(key: str, default: None = None) -> str | None

Return the contents of the given key in the workflow’s state store. If the key does not exist, the default value is returned.
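
The two signatures above are typing overloads: passing a string default guarantees a string result, while omitting it allows None. A minimal sketch of that pattern, with a hypothetical `ToyWorkflow` class and a dict standing in for the state store:

```python
from typing import Dict, Optional, overload


class ToyWorkflow:
    """Hypothetical class; a dict stands in for the workflow's state store."""

    def __init__(self, state: Dict[str, str]) -> None:
        self._state = state

    @overload
    def fetch_state(self, key: str, default: str) -> str: ...

    @overload
    def fetch_state(self, key: str, default: None = None) -> Optional[str]: ...

    def fetch_state(self, key: str, default: Optional[str] = None) -> Optional[str]:
        # Return the stored value, or the default if the key is missing.
        return self._state.get(key, default)


workflow = ToyWorkflow({"state": "RUNNING"})
present = workflow.fetch_state("state", "UNKNOWN")
fallback = workflow.fetch_state("pid", "none")
absent = workflow.fetch_state("pid")
```

With the overloads in place, a type checker treats `present` and `fallback` as `str` and only `absent` as `Optional[str]`.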

fetch_scratch(filename)

Get a context manager for either a stream for the given file from the workflow’s scratch directory, or None if it isn’t there.

Parameters:

filename (str)

Return type:

Generator[Optional[TextIO], None, None]

exists()

Return True if the workflow run exists.

Return type:

bool

get_state()

Return the state of the current run.

Return type:

str

check_on_run(task_runner)

Check to make sure nothing has gone wrong in the task runner for this workflow. If something has, log, and fail the workflow with an error.

Parameters:

task_runner (Type[toil.server.wes.tasks.TaskRunner])

Return type:

None

set_up_run()

Set up necessary directories for the run.

Return type:

None

clean_up()

Clean directory and files related to the run.

Return type:

None

queue_run(task_runner, request, options)

This workflow should be ready to run. Hand this to the task system.

Parameters:
  • task_runner

  • request

  • options

Return type:

None

get_output_files()

Return a collection of output files that this workflow generated.

Return type:

Any

get_stdout_path()

Return the path to the standard output log, relative to the run’s scratch_dir, or None if it doesn’t exist.

Return type:

Optional[str]

get_stderr_path()

Return the path to the standard error log, relative to the run’s scratch_dir, or None if it doesn’t exist.

Return type:

Optional[str]

get_messages_path()

Return the path to the bus message log, relative to the run’s scratch_dir, or None if it doesn’t exist.

Return type:

Optional[str]

get_task_logs(filter_function=None)

Return all the task log objects for the individual tasks in the workflow.

Task names will be the job_type values from issued/completed/failed messages, with annotations from JobAnnotationMessage messages if available.

Parameters:

filter_function (Optional[Callable[[toil.server.wes.abstract_backend.TaskLog, toil.bus.JobStatus], Optional[toil.server.wes.abstract_backend.TaskLog]]]) – If set, will be called with each task log and its job annotations. Returns a modified copy of the task log to actually report, or None if the task log should be omitted.

Return type:

List[Dict[str, Union[str, int, None]]]
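
A sketch of what a filter_function might look like. The `JobStatus` class here is a simplified stand-in, `apply_filter` is a hypothetical helper that only shows how a caller could apply the filter, and the `queue` annotation key is invented for the example.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple, Union

TaskLog = Dict[str, Union[str, int, None]]


@dataclass
class JobStatus:
    """Simplified stand-in with a subset of the documented fields."""
    name: str
    exit_code: int
    annotations: Dict[str, str] = field(default_factory=dict)


FilterFunction = Callable[[TaskLog, JobStatus], Optional[TaskLog]]


def failed_tasks_only(task: TaskLog, status: JobStatus) -> Optional[TaskLog]:
    """Omit successful tasks; report failed ones with their queue annotation."""
    if status.exit_code == 0:
        return None
    modified = dict(task)  # copy so the original log is left untouched
    queue = status.annotations.get("queue", "?")
    modified["name"] = f"{task['name']} (queue={queue})"
    return modified


def apply_filter(
    pairs: List[Tuple[TaskLog, JobStatus]],
    filter_function: Optional[FilterFunction] = None,
) -> List[TaskLog]:
    """Hypothetical helper: keep every log, or only those the filter returns."""
    if filter_function is None:
        return [task for task, _ in pairs]
    reported: List[TaskLog] = []
    for task, status in pairs:
        result = filter_function(task, status)
        if result is not None:
            reported.append(result)
    return reported


pairs: List[Tuple[TaskLog, JobStatus]] = [
    ({"name": "align", "exit_code": 0}, JobStatus("align", 0)),
    ({"name": "sort", "exit_code": 1}, JobStatus("sort", 1, {"queue": "short"})),
]
reported = apply_filter(pairs, failed_tasks_only)
```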

class toil.server.wes.toil_backend.ToilBackend(work_dir, state_store, options, dest_bucket_base, bypass_celery=False, wes_dialect='standard')

Bases: toil.server.wes.abstract_backend.WESBackend

WES backend implemented for Toil to run CWL, WDL, or Toil workflows. This class is responsible for validating and executing submitted workflows.

Parameters:
  • work_dir (str)

  • state_store (Optional[str])

  • options (List[str])

  • dest_bucket_base (Optional[str])

  • bypass_celery (bool)

  • wes_dialect (str)

get_runs()

A generator that yields each run ID together with its state.

Return type:

Generator[Tuple[str, str], None, None]

get_state(run_id)

Return the state of the workflow run with the given run ID. May raise an error if the workflow does not exist.

Parameters:

run_id (str)

Return type:

str

get_service_info()

Get information about the Workflow Execution Service.

Return type:

Dict[str, Any]

list_runs(page_size=None, page_token=None)

List the workflow runs.

Parameters:
  • page_size (Optional[int])

  • page_token (Optional[str])

Return type:

Dict[str, Any]

run_workflow()

Run a workflow.

Return type:

Dict[str, str]

get_run_log(run_id)

Get detailed info about a workflow run.

Parameters:

run_id (str)

Return type:

Dict[str, Any]

cancel_run(run_id)

Cancel a running workflow.

Parameters:

run_id (str)

Return type:

Dict[str, str]

get_run_status(run_id)

Get quick status info about a workflow run, returning a simple result with the overall state of the workflow run.

Parameters:

run_id (str)

Return type:

Dict[str, str]

get_stdout(run_id)

Get the stdout of a workflow run as a static file.

Parameters:

run_id (str)

Return type:

Any

get_stderr(run_id)

Get the stderr of a workflow run as a static file.

Parameters:

run_id (str)

Return type:

Any

get_health()

Return successfully if the server is healthy.

Return type:

werkzeug.wrappers.response.Response

get_homepage()

Provide a sensible result for / other than 404.

Return type:

werkzeug.wrappers.response.Response