toil.server.wes.tasks

Attributes

logger

WAIT_FOR_DEATH_TIMEOUT

run_wes

Classes

ToilWorkflowRunner

A class to represent a workflow runner to run the requested workflow.

TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

MultiprocessingTaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Functions

run_wes_task(base_scratch_dir, state_store_url, ...)

Run a requested workflow.

cancel_run(task_id)

Send a signal to the process that is running Celery task task_id.

Module Contents

toil.server.wes.tasks.logger
toil.server.wes.tasks.WAIT_FOR_DEATH_TIMEOUT = 20
class toil.server.wes.tasks.ToilWorkflowRunner(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

A class to represent a workflow runner to run the requested workflow.

Responsible for parsing the user request into a shell command, executing that command, and collecting the outputs of the resulting workflow run.

Parameters:
  • base_scratch_dir (str)

  • state_store_url (str)

  • workflow_id (str)

  • request (dict[str, Any])

  • engine_options (list[str])

scratch_dir
store
state_machine
request
engine_options
wf_type: str
version: str
exec_dir
out_dir
default_job_store
job_store
write_scratch_file(filename, contents)

Write a file to the scratch directory.

Parameters:
  • filename (str)

  • contents (str)

Return type:

None

get_state()
Return type:

str

write_workflow(src_url)

Fetch the workflow file from its source and write it to a destination file.

Parameters:

src_url (str)

Return type:

str
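
A minimal sketch of the fetch-and-write idea, not Toil's implementation. The helper name fetch_workflow, the dest_dir parameter, and the choice to name the destination after the last URL path component are assumptions made for illustration; which URL schemes the real method supports is not stated here.

```python
import os
import tempfile
import urllib.request
from pathlib import Path
from urllib.parse import urlparse


def fetch_workflow(src_url: str, dest_dir: str) -> str:
    """Copy the file at src_url into dest_dir and return the destination path."""
    # Hypothetical naming rule: reuse the last path component of the URL.
    filename = os.path.basename(urlparse(src_url).path) or "workflow"
    dest = os.path.join(dest_dir, filename)
    # urllib.request.urlopen understands file://, http:// and https:// URLs.
    with urllib.request.urlopen(src_url) as response:
        with open(dest, "wb") as out:
            out.write(response.read())
    return dest


# Demonstrate with a local file:// URL so no network access is needed.
with tempfile.TemporaryDirectory() as src_dir:
    with tempfile.TemporaryDirectory() as dest_dir:
        source = Path(src_dir) / "hello.cwl"
        source.write_text("cwlVersion: v1.2\n")
        fetched = fetch_workflow(source.as_uri(), dest_dir)
        fetched_name = os.path.basename(fetched)
        fetched_text = Path(fetched).read_text()
```

The returned string is the path of the local copy, which matches the str return type documented above.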

sort_options(workflow_engine_parameters=None)

Sort the command-line arguments into an order that the workflow execution engine can recognize.

Parameters:

workflow_engine_parameters (dict[str, str | None] | None) – User-specified parameters for this particular workflow. Keys are command-line options, and values are option arguments, or None for options that are flags.

Return type:

list[str]
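
The key/value convention described for workflow_engine_parameters can be illustrated with a small sketch. It only shows how a mapping of options to arguments (with None marking a flag) flattens into an argument list; it does not reproduce whatever ordering logic sort_options applies. The function name flatten_options and the option names are hypothetical.

```python
from typing import Optional


def flatten_options(params: dict[str, Optional[str]]) -> list[str]:
    """Turn {option: argument-or-None} into a flat list of arguments."""
    args: list[str] = []
    for option, value in params.items():
        args.append(option)
        # None marks a flag, which takes no argument.
        if value is not None:
            args.append(value)
    return args


# Hypothetical option names, used only to show the convention.
example = flatten_options({"--example-option": "value", "--example-flag": None})
```

Here example is ["--example-option", "value", "--example-flag"]: the flag contributes one element and the option with an argument contributes two.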

initialize_run()

Write the workflow and input files, then construct and return the list of shell commands that must be executed in order to complete this workflow run.

Return type:

list[str]

call_cmd(cmd, cwd)

Calls a command with Popen. Writes stdout, stderr, and the command to separate files.

Parameters:
  • cmd

  • cwd

Return type:

subprocess.Popen[bytes]
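
A minimal sketch of the pattern this method describes: start a command with subprocess.Popen and record the command, stdout, and stderr in separate files. The helper name call_and_log and the file names are assumptions made for illustration, not the names Toil uses.

```python
import os
import shlex
import subprocess
import sys
import tempfile


def call_and_log(cmd: list[str], cwd: str) -> "subprocess.Popen[bytes]":
    """Start cmd in cwd, recording the command, stdout and stderr in files."""
    # Hypothetical file names; the real ones are not given in this reference.
    with open(os.path.join(cwd, "command.txt"), "w") as f:
        f.write(shlex.join(cmd))
    with open(os.path.join(cwd, "stdout.txt"), "wb") as out:
        with open(os.path.join(cwd, "stderr.txt"), "wb") as err:
            # The child gets its own copies of these file descriptors, so
            # closing our handles when the with blocks exit is safe.
            return subprocess.Popen(cmd, cwd=cwd, stdout=out, stderr=err)


with tempfile.TemporaryDirectory() as work_dir:
    process = call_and_log([sys.executable, "-c", "print('hello')"], work_dir)
    exit_code = process.wait()
    with open(os.path.join(work_dir, "stdout.txt")) as f:
        captured = f.read().strip()
```

Returning the Popen object rather than waiting inside the helper leaves the caller free to poll, wait, or signal the process later.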

run()

Construct a command to run the requested workflow with the options, run it, and deposit the outputs in the output directory.

Return type:

None

write_output_files()

Fetch all the files that this workflow generated and output information about them to outputs.json.

Return type:

None

toil.server.wes.tasks.run_wes_task(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

Run a requested workflow.

Parameters:
  • base_scratch_dir (str) – Directory where the workflow’s scratch dir will live, under the workflow’s ID.

  • state_store_url (str) – URL/path at which the server and Celery task communicate about workflow state.

  • workflow_id (str) – ID of the workflow run.

  • request (dict[str, Any])

  • engine_options (list[str])

Returns:

the state of the workflow run.

Return type:

str
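
The description of base_scratch_dir suggests one scratch directory per run, named after the run ID. The sketch below assumes that layout is simply base_scratch_dir/workflow_id. The helper names and the run ID are hypothetical; write_scratch_file here is a standalone function that only mirrors the idea of the method documented above.

```python
import os
import tempfile


def scratch_dir_for(base_scratch_dir: str, workflow_id: str) -> str:
    """Create and return a per-run scratch directory named after the run ID."""
    path = os.path.join(base_scratch_dir, workflow_id)
    os.makedirs(path, exist_ok=True)
    return path


def write_scratch_file(scratch_dir: str, filename: str, contents: str) -> None:
    """Write contents to filename inside the scratch directory."""
    with open(os.path.join(scratch_dir, filename), "w") as f:
        f.write(contents)


with tempfile.TemporaryDirectory() as base:
    run_dir = scratch_dir_for(base, "run-1234")  # hypothetical run ID
    write_scratch_file(run_dir, "note.txt", "hello")
    relative = os.path.relpath(run_dir, base)
    with open(os.path.join(run_dir, "note.txt")) as f:
        note = f.read()
```

Keeping each run under its own ID means concurrent runs sharing one base directory do not overwrite each other's files.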

toil.server.wes.tasks.run_wes
toil.server.wes.tasks.cancel_run(task_id)

Send a signal to the process that is running Celery task task_id.

Parameters:

task_id (str)

Return type:

None
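
A sketch of the general signal-then-wait pattern for stopping a process. It does not show how cancel_run locates the process behind a Celery task, which this reference does not describe. The helper stop_process is hypothetical, and using WAIT_FOR_DEATH_TIMEOUT as the grace period before a hard kill is an assumption about that constant's purpose.

```python
import signal
import subprocess
import sys

# Same value as the module attribute documented above; its role as a
# grace period here is an assumption.
WAIT_FOR_DEATH_TIMEOUT = 20


def stop_process(process: subprocess.Popen,
                 timeout: float = WAIT_FOR_DEATH_TIMEOUT) -> int:
    """Ask process to stop, escalating to a hard kill if it does not exit."""
    process.send_signal(signal.SIGTERM)
    try:
        return process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The process ignored the polite request; force it to stop.
        process.kill()
        return process.wait()


# Stand-in for a long-running workflow process.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
return_code = stop_process(child)
```

The sleeping child exits as soon as it is signaled, so stop_process returns long before the timeout and return_code is nonzero.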

class toil.server.wes.tasks.TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

We can swap this out in the server to allow testing without Celery.

Note that this is not responsible for acting on or having events for things like failed Celery tasks.

static run(args, task_id)

Run the given task args with the given ID on Celery.

Parameters:
  • args

  • task_id (str)

Return type:

None

static cancel(task_id)

Cancel the task with the given ID on Celery.

Parameters:

task_id (str)

Return type:

None

static is_ok(task_id)

Returns True if the task has not yet failed, and False otherwise.

A task that was successfully canceled counts as not failed, so this returns True for it.

If this returns False, the task is also not live.

Parameters:

task_id (str)

Return type:

bool

static is_live(task_id)

Returns True if the task has not yet stopped, and False otherwise.

Parameters:

task_id (str)

Return type:

bool
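
Because the server only depends on the run, cancel, is_ok, and is_live methods, a test double can stand in for the Celery-backed runner. The sketch below is a hypothetical in-memory replacement, not part of Toil. The state names and the fail helper are invented for illustration; the is_ok and is_live semantics follow the descriptions above.

```python
class InMemoryTaskRunner:
    """Hypothetical test double that records tasks instead of running them."""

    # Maps task_id to one of "running", "failed", "canceled".
    _states: dict[str, str] = {}

    @classmethod
    def run(cls, args: tuple, task_id: str) -> None:
        # A real runner would start the task; this one only records it.
        cls._states[task_id] = "running"

    @classmethod
    def cancel(cls, task_id: str) -> None:
        if cls._states.get(task_id) == "running":
            cls._states[task_id] = "canceled"

    @classmethod
    def fail(cls, task_id: str) -> None:
        """Test-only helper that simulates a crashed task."""
        cls._states[task_id] = "failed"

    @classmethod
    def is_ok(cls, task_id: str) -> bool:
        # Canceled tasks count as OK; only failed tasks do not.
        return cls._states.get(task_id) != "failed"

    @classmethod
    def is_live(cls, task_id: str) -> bool:
        return cls._states.get(task_id) == "running"


InMemoryTaskRunner.run((), "task-a")
live_while_running = InMemoryTaskRunner.is_live("task-a")
InMemoryTaskRunner.cancel("task-a")
ok_after_cancel = InMemoryTaskRunner.is_ok("task-a")
live_after_cancel = InMemoryTaskRunner.is_live("task-a")

InMemoryTaskRunner.run((), "task-b")
InMemoryTaskRunner.fail("task-b")
ok_after_failure = InMemoryTaskRunner.is_ok("task-b")
live_after_failure = InMemoryTaskRunner.is_live("task-b")
```

A canceled task reports is_ok as True and is_live as False, while a failed task reports False for both.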

class toil.server.wes.tasks.MultiprocessingTaskRunner

Bases: TaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Can’t use threading because there’s no way to send a cancel signal or exception to a Python thread if the loops in the task (i.e. ToilWorkflowRunner) don’t poll for it.

setup_delay = 0
static set_up_and_run_task(output_path, args, setup_delay)

Set up logging for the process into the given file and then call run_wes_task with the given arguments.

If the process finishes successfully, it will clean up the log, but if the process crashes, the caller must clean up the log.

Parameters:
  • output_path

  • args

  • setup_delay

Return type:

None
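
A sketch of the log handling described above: log to a file while the task runs and remove the file only on success. The helper set_up_and_run, the task parameter, and the reading of setup_delay as a sleep before setup are all assumptions made for illustration. A raised exception stands in for a process crash.

```python
import logging
import os
import tempfile
import time
from typing import Callable


def set_up_and_run(output_path: str, task: Callable[..., object],
                   args: tuple, setup_delay: float = 0) -> None:
    """Log to output_path while running task(*args); delete the log on success."""
    time.sleep(setup_delay)  # assumed meaning of setup_delay
    handler = logging.FileHandler(output_path)
    root = logging.getLogger()
    root.addHandler(handler)
    try:
        task(*args)
    finally:
        root.removeHandler(handler)
        handler.close()
    # Only reached when the task did not raise: clean up the log.
    os.remove(output_path)


def succeed() -> None:
    logging.getLogger("demo").warning("working")


def crash() -> None:
    raise RuntimeError("boom")


with tempfile.TemporaryDirectory() as tmp:
    ok_log = os.path.join(tmp, "ok.log")
    set_up_and_run(ok_log, succeed, ())
    ok_log_exists = os.path.exists(ok_log)

    bad_log = os.path.join(tmp, "bad.log")
    try:
        set_up_and_run(bad_log, crash, ())
    except RuntimeError:
        pass
    bad_log_exists = os.path.exists(bad_log)
```

After the successful call the log is gone, while the failed call leaves its log behind for the caller to inspect and clean up.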

classmethod run(args, task_id)

Run the given task args with the given ID.

Parameters:
  • args

  • task_id (str)

Return type:

None

classmethod cancel(task_id)

Cancel the task with the given ID.

Parameters:

task_id (str)

Return type:

None

classmethod is_ok(task_id)

Returns True if the task has not yet failed, and False otherwise.

A task that was successfully canceled counts as not failed, so this returns True for it.

If this returns False, the task is also not live.

Parameters:

task_id (str)

Return type:

bool

classmethod is_live(task_id)

Returns True if the task has not yet stopped, and False otherwise.

Parameters:

task_id (str)

Return type:

bool