toil.server.wes.tasks

Module Contents

Classes

ToilWorkflowRunner

A class to represent a workflow runner to run the requested workflow.

TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

MultiprocessingTaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Functions

run_wes_task(base_scratch_dir, state_store_url, ...)

Run a requested workflow.

cancel_run(task_id)

Send a SIGTERM signal to the process that is running task_id.

Attributes

logger

WAIT_FOR_DEATH_TIMEOUT

run_wes

toil.server.wes.tasks.logger
toil.server.wes.tasks.WAIT_FOR_DEATH_TIMEOUT = 20
class toil.server.wes.tasks.ToilWorkflowRunner(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

A class to represent a workflow runner to run the requested workflow.

Responsible for parsing the user request into a shell command, executing that command, and collecting the outputs of the resulting workflow run.

Parameters
  • base_scratch_dir (str) –

  • state_store_url (str) –

  • workflow_id (str) –

  • request (Dict[str, Any]) –

  • engine_options (List[str]) –

write_scratch_file(filename, contents)

Write a file to the scratch directory.

Parameters
  • filename (str) –

  • contents (str) –

Return type

None

get_state()
Return type

str

write_workflow(src_url)

Fetch the workflow file from its source and write it to a destination file.

Parameters

src_url (str) –

Return type

str

sort_options(workflow_engine_parameters=None)

Sort the command line arguments in the order that can be recognized by the workflow execution engine.

Parameters

workflow_engine_parameters (Optional[Dict[str, Optional[str]]]) – User-specified parameters for this

Return type

List[str]

particular workflow. Keys are command-line options, and values are option arguments, or None for options that are flags.

initialize_run()

Write workflow and input files and construct a list of shell commands to be executed. Return that list of shell commands that should be executed in order to complete this workflow run.

Return type

List[str]

call_cmd(cmd, cwd)

Calls a command with Popen. Writes stdout, stderr, and the command to separate files.

Parameters
  • cmd (Union[List[str], str]) –

  • cwd (str) –

Return type

subprocess.Popen[bytes]

run()

Construct a command to run a the requested workflow with the options, run it, and deposit the outputs in the output directory.

Return type

None

write_output_files()

Fetch all the files that this workflow generated and output information about them to outputs.json.

Return type

None

toil.server.wes.tasks.run_wes_task(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

Run a requested workflow.

Parameters
  • base_scratch_dir (str) – Directory where the workflow’s scratch dir will live, under the workflow’s ID.

  • state_store_url (str) – URL/path at which the server and Celery task communicate about workflow state.

  • workflow_id (str) – ID of the workflow run.

  • request (Dict[str, Any]) –

  • engine_options (List[str]) –

Returns

the state of the workflow run.

Return type

str

toil.server.wes.tasks.run_wes
toil.server.wes.tasks.cancel_run(task_id)

Send a SIGTERM signal to the process that is running task_id.

Parameters

task_id (str) –

Return type

None

class toil.server.wes.tasks.TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

We can swap this out in the server to allow testing without Celery.

static run(args, task_id)

Run the given task args with the given ID on Celery.

Parameters
Return type

None

static cancel(task_id)

Cancel the task with the given ID on Celery.

Parameters

task_id (str) –

Return type

None

static is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters

task_id (str) –

Return type

bool

class toil.server.wes.tasks.MultiprocessingTaskRunner

Bases: TaskRunner

digraph inheritance47fd0023c3 { bgcolor=transparent; rankdir=LR; size="8.0, 12.0"; "MultiprocessingTaskRunner" [URL="#toil.server.wes.tasks.MultiprocessingTaskRunner",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="Version of TaskRunner that just runs tasks with Multiprocessing."]; "TaskRunner" -> "MultiprocessingTaskRunner" [arrowsize=0.5,style="setlinewidth(0.5)"]; "TaskRunner" [URL="#toil.server.wes.tasks.TaskRunner",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="Abstraction over the Celery API. Runs our run_wes task and allows canceling it."]; }

Version of TaskRunner that just runs tasks with Multiprocessing.

Can’t use threading because there’s no way to send a cancel signal or exception to a Python thread, if loops in the task (i.e. ToilWorkflowRunner) don’t poll for it.

static set_up_and_run_task(output_path, args)

Set up logging for the process into the given file and then call run_wes_task with the given arguments.

If the process finishes successfully, it will clean up the log, but if the process crashes, the caller must clean up the log.

Parameters
Return type

None

classmethod run(args, task_id)

Run the given task args with the given ID.

Parameters
Return type

None

classmethod cancel(task_id)

Cancel the task with the given ID.

Parameters

task_id (str) –

Return type

None

classmethod is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters

task_id (str) –

Return type

bool