toil.server.wes.toil_backend¶
Attributes¶
Exceptions¶
OperationForbidden | Raised when the request is forbidden.
VersionNotImplementedException | Raised when the requested workflow version is not implemented.
WorkflowConflictException | Raised when the requested workflow is not in the expected state.
WorkflowExecutionException | Raised when an internal error occurred during the execution of the workflow.
WorkflowNotFoundException | Raised when the requested run ID is not found.
Classes¶
JobStatus | Records the status of a job.
WorkflowStateMachine | Class for managing the WES workflow state machine.
WESBackend | A class to represent a GA4GH Workflow Execution Service (WES) API backend.
MultiprocessingTaskRunner | Version of TaskRunner that just runs tasks with Multiprocessing.
TaskRunner | Abstraction over the Celery API. Runs our run_wes task and allows canceling it.
ToilBackend | WES backend implemented for Toil to run CWL, WDL, or Toil workflows.
Functions¶
replay_message_bus | Replay all the messages and work out what they mean for jobs.
AtomicFileCreate | Context manager to create a temporary file.
global_mutex | Context manager that locks a mutex.
connect_to_workflow_state_store | Connect to a place to store state for the given workflow, in the state store defined by the given URL.
handle_errors | This decorator catches errors from the wrapped function and returns a JSON formatted error message with the appropriate status code defined by the GA4GH WES spec.
Module Contents¶
- class toil.server.wes.toil_backend.JobStatus¶
Records the status of a job.
- toil.server.wes.toil_backend.replay_message_bus(path)¶
Replay all the messages and work out what they mean for jobs.
We track the state and name of jobs here, by ID. We would use a list of two items but MyPy can’t understand a list of items of multiple types, so we need to define a new class.
Returns a dictionary from the job_id to a dataclass, JobStatus. A JobStatus contains information about a job which we have gathered from the message bus, including the job store ID, the name of the job, the exit code, any associated annotations, the Toil batch ID, the external batch ID, and the batch system on which the job is running.
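The reason for using a dataclass rather than a mixed-type list can be illustrated with a minimal sketch. The class name JobStatusSketch, its field names and defaults, and the (job_id, field, value) event format are all assumptions made for illustration; they are not the module's actual definitions or the message bus format.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Tuple


@dataclass
class JobStatusSketch:
    """Illustrative stand-in for JobStatus; field names are assumptions."""
    job_store_id: str
    name: str = ""
    exit_code: int = -1
    annotations: Dict[str, str] = field(default_factory=dict)
    toil_batch_id: int = -1
    external_batch_id: str = ""
    batch_system: str = ""


def apply_events(events: Iterable[Tuple[str, str, Any]]) -> Dict[str, JobStatusSketch]:
    """Fold hypothetical (job_id, field, value) events into per-job records."""
    statuses: Dict[str, JobStatusSketch] = {}
    for job_id, field_name, value in events:
        # Create the record the first time a job ID is seen.
        status = statuses.setdefault(job_id, JobStatusSketch(job_store_id=job_id))
        setattr(status, field_name, value)
    return statuses


events = [("job1", "name", "align"), ("job1", "exit_code", 0), ("job2", "name", "sort")]
statuses = apply_events(events)
```

Each field keeps its own type annotation, which a type checker can verify, whereas a two-item list of mixed types would have to be typed loosely.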
- toil.server.wes.toil_backend.AtomicFileCreate(final_path, keep=False)¶
Context manager to create a temporary file. Entering returns the path to a temporary file in the same directory as final_path. If the code in the context succeeds, the file is renamed to its actual name. If an error occurs, the file is not installed and is removed unless keep is specified.
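The behavior described above can be approximated with the standard library. This is a minimal sketch, not the module's implementation: the name atomic_file_create and the use of tempfile.mkstemp and os.replace are assumptions chosen to illustrate the write-then-rename pattern.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def atomic_file_create(final_path: str, keep: bool = False) -> Iterator[str]:
    """Yield a temporary path next to final_path; rename it into place on success."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Create the temporary file in the same directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    os.close(fd)
    try:
        yield tmp_path
    except BaseException:
        # On failure, discard the temporary file unless the caller asked to keep it.
        if not keep and os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    else:
        os.replace(tmp_path, final_path)


with tempfile.TemporaryDirectory() as work_dir:
    target = os.path.join(work_dir, "state.json")
    with atomic_file_create(target) as tmp:
        with open(tmp, "w") as handle:
            handle.write("{}")
    created = os.path.exists(target)

    try:
        with atomic_file_create(os.path.join(work_dir, "other.json")) as tmp:
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    leftover = os.listdir(work_dir)
```

Readers of final_path see either no file or the complete file, never a partially written one.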
- toil.server.wes.toil_backend.global_mutex(base_dir, mutex)¶
Context manager that locks a mutex. The mutex is identified by the given name, and scoped to the given directory. Works across all containers that have access to the given directory. Mutexes held by dead processes are automatically released.
Only works between processes, NOT between threads.
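One way to get these properties on a Unix-like system is a POSIX record lock on a file in the shared directory. The sketch below is an assumption about how such a mutex could be built, not the module's implementation; the name directory_mutex and the lock file naming are hypothetical. POSIX record locks belong to the process and are dropped by the kernel when it exits, which is consistent with the dead-process and thread caveats above.

```python
import fcntl
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def directory_mutex(base_dir: str, mutex: str) -> Iterator[None]:
    """Hold an exclusive lock on a lock file named after the mutex (Unix only)."""
    lock_path = os.path.join(base_dir, f".{mutex}.lock")
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)
    try:
        # Blocks until no other process holds the lock.
        fcntl.lockf(fd, fcntl.LOCK_EX)
        yield
    finally:
        fcntl.lockf(fd, fcntl.LOCK_UN)
        os.close(fd)


with tempfile.TemporaryDirectory() as base_dir:
    with directory_mutex(base_dir, "state-update"):
        held = os.path.exists(os.path.join(base_dir, ".state-update.lock"))
```

Whether such a lock is honored across containers depends on the filesystem backing the shared directory.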
- class toil.server.wes.toil_backend.WorkflowStateMachine(store)¶
Class for managing the WES workflow state machine.
This is the authority on the WES “state” of a workflow. You need one to read or change the state.
Guaranteeing that only certain transitions can be observed is possible but not worth it. Instead, we just let updates clobber each other and grab and cache the first terminal state we see forever. If it becomes important that clients never see e.g. CANCELED -> COMPLETE or COMPLETE -> SYSTEM_ERROR, we can implement a real distributed state machine here.
We do handle making sure that tasks don’t get stuck in CANCELING.
State can be:
“UNKNOWN”, “QUEUED”, “INITIALIZING”, “RUNNING”, “PAUSED”, “COMPLETE”, “EXECUTOR_ERROR”, “SYSTEM_ERROR”, “CANCELED”, “CANCELING”
Uses the state store’s local cache to prevent needing to read things we’ve seen already.
- Parameters:
store (WorkflowStateStore)
- send_enqueue()¶
Send an enqueue message that would move from UNKNOWN to QUEUED.
- Return type:
None
- send_initialize()¶
Send an initialize message that would move from QUEUED to INITIALIZING.
- Return type:
None
- send_run()¶
Send a run message that would move from INITIALIZING to RUNNING.
- Return type:
None
- send_cancel()¶
Send a cancel message that would move to CANCELING from any non-terminal state.
- Return type:
None
- send_canceled()¶
Send a canceled message that would move to CANCELED from CANCELING.
- Return type:
None
- send_complete()¶
Send a complete message that would move from RUNNING to COMPLETE.
- Return type:
None
- send_executor_error()¶
Send an executor_error message that would move from QUEUED, INITIALIZING, or RUNNING to EXECUTOR_ERROR.
- Return type:
None
- send_system_error()¶
Send a system_error message that would move from QUEUED, INITIALIZING, or RUNNING to SYSTEM_ERROR.
- Return type:
None
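The "clobber and cache the first terminal state" policy described for WorkflowStateMachine can be sketched with a plain dictionary standing in for the state store. Everything here is an illustrative assumption: the class name, the current_state method, the choice of terminal states, and the dictionary store are not taken from the module.

```python
from typing import Dict, Optional

# Assumption: these four states are treated as terminal.
TERMINAL_STATES = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}


class StateMachineSketch:
    """Toy state machine: writes clobber each other, readers pin the first terminal state."""

    def __init__(self, store: Dict[str, str]) -> None:
        self._store = store
        self._terminal: Optional[str] = None

    def _write(self, state: str) -> None:
        # No transition checking: the last writer wins.
        self._store["state"] = state

    def send_enqueue(self) -> None:
        self._write("QUEUED")

    def send_run(self) -> None:
        self._write("RUNNING")

    def send_complete(self) -> None:
        self._write("COMPLETE")

    def send_system_error(self) -> None:
        self._write("SYSTEM_ERROR")

    def current_state(self) -> str:
        # Once a terminal state has been observed, keep reporting it forever.
        if self._terminal is not None:
            return self._terminal
        state = self._store.get("state", "UNKNOWN")
        if state in TERMINAL_STATES:
            self._terminal = state
        return state


shared_store: Dict[str, str] = {}
reader = StateMachineSketch(shared_store)
writer = StateMachineSketch(shared_store)

initial = reader.current_state()
writer.send_enqueue()
writer.send_run()
writer.send_complete()
first_seen = reader.current_state()
writer.send_system_error()  # a late update clobbers the stored state
still_seen = reader.current_state()
```

The reader keeps reporting COMPLETE even after the stored value changes, so a single client does not observe COMPLETE -> SYSTEM_ERROR, although two different readers could still disagree.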
- toil.server.wes.toil_backend.connect_to_workflow_state_store(url, workflow_id)¶
Connect to a place to store state for the given workflow, in the state store defined by the given URL.
- Parameters:
- Return type:
- exception toil.server.wes.toil_backend.OperationForbidden(message)¶
Bases: Exception
Raised when the request is forbidden.
- Parameters:
message (str)
- toil.server.wes.toil_backend.TaskLog¶
- exception toil.server.wes.toil_backend.VersionNotImplementedException(wf_type, version=None, supported_versions=None)¶
Bases: Exception
Raised when the requested workflow version is not implemented.
- class toil.server.wes.toil_backend.WESBackend(options)¶
A class to represent a GA4GH Workflow Execution Service (WES) API backend. Intended to be inherited. Subclasses should implement all abstract methods to handle user requests when they hit different endpoints.
- Parameters:
options (List[str])
- resolve_operation_id(operation_id)¶
Map an operationId defined in the OpenAPI or swagger yaml file to a function.
- Parameters:
operation_id (str) – The operation ID defined in the specification.
- Returns:
A function that should be called when the given endpoint is reached.
- Return type:
Any
- abstract get_service_info()¶
Get information about the Workflow Execution Service.
GET /service-info
- Return type:
Dict[str, Any]
- abstract list_runs(page_size=None, page_token=None)¶
List the workflow runs.
GET /runs
- abstract run_workflow()¶
Run a workflow. This endpoint creates a new workflow run and returns a RunId to monitor its progress.
POST /runs
- abstract get_run_log(run_id)¶
Get detailed info about a workflow run.
GET /runs/{run_id}
- abstract cancel_run(run_id)¶
Cancel a running workflow.
POST /runs/{run_id}/cancel
- abstract get_run_status(run_id)¶
Get quick status info about a workflow run, returning a simple result with the overall state of the workflow run.
GET /runs/{run_id}/status
- static log_for_run(run_id, message)¶
- collect_attachments(run_id, temp_dir)¶
Collect attachments from the current request by staging uploaded files to temp_dir, and return the temp_dir and parsed body of the request.
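As an illustration of how resolve_operation_id could map an operationId to a handler method, here is a minimal sketch. It assumes that operation IDs are CamelCase (for example GetServiceInfo) and that handlers are the snake_case methods of the same name; the class BackendSketch and the returned dictionaries are hypothetical.

```python
import re
from typing import Any, Callable, Dict


class BackendSketch:
    """Toy backend showing operationId-to-method resolution."""

    def resolve_operation_id(self, operation_id: str) -> Callable[..., Any]:
        # Assumption: "GetRunStatus" should resolve to the method "get_run_status".
        method_name = re.sub(r"(?<!^)(?=[A-Z])", "_", operation_id).lower()
        return getattr(self, method_name)

    def get_service_info(self) -> Dict[str, Any]:
        return {"workflow_type_versions": {}}

    def get_run_status(self, run_id: str) -> Dict[str, Any]:
        return {"run_id": run_id, "state": "UNKNOWN"}


backend = BackendSketch()
service_info = backend.resolve_operation_id("GetServiceInfo")()
run_status = backend.resolve_operation_id("GetRunStatus")("run-123")
```

An operation ID with no matching method raises AttributeError in this sketch; a real backend would need to decide how to report that.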
- exception toil.server.wes.toil_backend.WorkflowConflictException(run_id)¶
Bases: Exception
Raised when the requested workflow is not in the expected state.
- Parameters:
run_id (str)
- exception toil.server.wes.toil_backend.WorkflowExecutionException(message)¶
Bases: Exception
Raised when an internal error occurred during the execution of the workflow.
- Parameters:
message (str)
- exception toil.server.wes.toil_backend.WorkflowNotFoundException¶
Bases: Exception
Raised when the requested run ID is not found.
- toil.server.wes.toil_backend.handle_errors(func)¶
This decorator catches errors from the wrapped function and returns a JSON formatted error message with the appropriate status code defined by the GA4GH WES spec.
- Parameters:
func (Callable[Ellipsis, Any])
- Return type:
Callable[Ellipsis, Any]
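A decorator of this kind can be sketched as follows. The exception classes, the mapping from exception to status code, and the (body, status) return shape are assumptions for illustration only; the msg and status_code keys follow the error response shape used by the GA4GH WES specification.

```python
import functools
import json
from typing import Any, Callable, Tuple


class RunNotFound(Exception):
    """Hypothetical stand-in for WorkflowNotFoundException."""


class Forbidden(Exception):
    """Hypothetical stand-in for OperationForbidden."""


def error_response(message: str, code: int) -> Tuple[str, int]:
    """Build a JSON error body together with its HTTP status code."""
    return json.dumps({"msg": message, "status_code": code}), code


def handle_errors_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Convert exceptions from func into JSON error responses."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except RunNotFound:
            return error_response("The requested workflow run wasn't found.", 404)
        except Forbidden as err:
            return error_response(str(err), 403)
        except Exception as err:
            # Anything unexpected is reported as an internal server error.
            return error_response(f"Internal server error: {err}", 500)
    return wrapper


@handle_errors_sketch
def get_run_status(run_id: str) -> Any:
    if run_id != "known-run":
        raise RunNotFound()
    return {"run_id": run_id, "state": "RUNNING"}


body, status = get_run_status("missing-run")
```

Using functools.wraps keeps the wrapped function's name and docstring, which matters when handlers are looked up by name.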
- class toil.server.wes.toil_backend.MultiprocessingTaskRunner¶
Bases: TaskRunner
Version of TaskRunner that just runs tasks with Multiprocessing.
Threading can’t be used because there’s no way to send a cancel signal or exception to a Python thread if the loops in the task (i.e. ToilWorkflowRunner) don’t poll for it.
- static set_up_and_run_task(output_path, args)¶
Set up logging for the process into the given file and then call run_wes_task with the given arguments.
If the process finishes successfully, it will clean up the log, but if the process crashes, the caller must clean up the log.
- classmethod run(args, task_id)¶
Run the given task args with the given ID.
- class toil.server.wes.toil_backend.TaskRunner¶
Abstraction over the Celery API. Runs our run_wes task and allows canceling it.
We can swap this out in the server to allow testing without Celery.
- static run(args, task_id)¶
Run the given task args with the given ID on Celery.
- toil.server.wes.toil_backend.baseVersion = '7.0.0'¶
- toil.server.wes.toil_backend.logger¶
- class toil.server.wes.toil_backend.ToilWorkflow(base_work_dir, state_store_url, run_id)¶
- fetch_state(key: str, default: str) -> str¶
fetch_state(key: str, default: None = None) -> str | None
Return the contents of the given key in the workflow’s state store. If the key does not exist, the default value is returned.
- fetch_scratch(filename)¶
Get a context manager for either a stream for the given file from the workflow’s scratch directory, or None if it isn’t there.
- Parameters:
filename (str)
- Return type:
Generator[Optional[TextIO], None, None]
- check_on_run(task_runner)¶
Check to make sure nothing has gone wrong in the task runner for this workflow. If something has, log, and fail the workflow with an error.
- Parameters:
task_runner (Type[toil.server.wes.tasks.TaskRunner])
- Return type:
None
- set_up_run()¶
Set up necessary directories for the run.
- Return type:
None
- clean_up()¶
Clean directory and files related to the run.
- Return type:
None
- queue_run(task_runner, request, options)¶
This workflow should be ready to run. Hand this to the task system.
- Parameters:
task_runner (Type[toil.server.wes.tasks.TaskRunner])
request (Dict[str, Any])
options (List[str])
- Return type:
None
- get_output_files()¶
Return a collection of output files that this workflow generated.
- Return type:
Any
- get_stdout_path()¶
Return the path to the standard output log, relative to the run’s scratch_dir, or None if it doesn’t exist.
- Return type:
Optional[str]
- get_stderr_path()¶
Return the path to the standard error log, relative to the run’s scratch_dir, or None if it doesn’t exist.
- Return type:
Optional[str]
- get_messages_path()¶
Return the path to the bus message log, relative to the run’s scratch_dir, or None if it doesn’t exist.
- Return type:
Optional[str]
- get_task_logs(filter_function=None)¶
Return all the task log objects for the individual tasks in the workflow.
Task names will be the job_type values from issued/completed/failed messages, with annotations from JobAnnotationMessage messages if available.
- Parameters:
filter_function (Optional[Callable[[toil.server.wes.abstract_backend.TaskLog, toil.bus.JobStatus], Optional[toil.server.wes.abstract_backend.TaskLog]]]) – If set, will be called with each task log and its job annotations. Returns a modified copy of the task log to actually report, or None if the task log should be omitted.
- Return type:
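The filter_function contract for get_task_logs (receive a task log and its job status, return a modified copy or None) can be illustrated with a small sketch. The dictionary-based task log, the JobStatusSketch fields, and the only_failed_tasks function are hypothetical stand-ins, not the types used by the module.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Assumption: a task log is represented here as a plain dictionary.
TaskLogSketch = Dict[str, Any]


@dataclass
class JobStatusSketch:
    """Illustrative stand-in for the job status passed to the filter."""
    name: str = ""
    exit_code: int = -1
    annotations: Dict[str, str] = field(default_factory=dict)


def only_failed_tasks(task_log: TaskLogSketch, job_status: JobStatusSketch) -> Optional[TaskLogSketch]:
    """Omit successful tasks; tag the rest with their exit code."""
    if job_status.exit_code == 0:
        return None
    updated = dict(task_log)  # return a modified copy, leaving the input untouched
    updated["name"] = f"{task_log['name']} (exit {job_status.exit_code})"
    return updated


pairs = [
    ({"name": "align"}, JobStatusSketch(name="align", exit_code=0)),
    ({"name": "sort"}, JobStatusSketch(name="sort", exit_code=1)),
]
reported: List[TaskLogSketch] = []
for log, job_status in pairs:
    result = only_failed_tasks(log, job_status)
    if result is not None:
        reported.append(result)
```

Returning a copy rather than mutating the input keeps the original task log available to other consumers.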
- class toil.server.wes.toil_backend.ToilBackend(work_dir, state_store, options, dest_bucket_base, bypass_celery=False, wes_dialect='standard')¶
Bases: toil.server.wes.abstract_backend.WESBackend
WES backend implemented for Toil to run CWL, WDL, or Toil workflows. This class is responsible for validating and executing submitted workflows.
- Parameters:
- get_runs()¶
A generator of run IDs and their states.
- get_state(run_id)¶
Return the state of the workflow run with the given run ID. May raise an error if the workflow does not exist.
- get_service_info()¶
Get information about the Workflow Execution Service.
- Return type:
Dict[str, Any]
- list_runs(page_size=None, page_token=None)¶
List the workflow runs.
- get_run_log(run_id)¶
Get detailed info about a workflow run.
- get_run_status(run_id)¶
Get quick status info about a workflow run, returning a simple result with the overall state of the workflow run.
- get_stdout(run_id)¶
Get the stdout of a workflow run as a static file.
- Parameters:
run_id (str)
- Return type:
Any
- get_stderr(run_id)¶
Get the stderr of a workflow run as a static file.
- Parameters:
run_id (str)
- Return type:
Any
- get_health()¶
Return successfully if the server is healthy.
- Return type:
werkzeug.wrappers.response.Response
- get_homepage()¶
Provide a sensible result for / other than 404.
- Return type:
werkzeug.wrappers.response.Response