toil.server.wes.tasks

Module Contents

Classes

ToilWorkflowRunner

A class to represent a workflow runner to run the requested workflow.

TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

MultiprocessingTaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Functions

run_wes_task(base_scratch_dir, state_store_url, ...)

Run a requested workflow.

cancel_run(task_id)

Send a SIGTERM signal to the process that is running task_id.

Attributes

logger

WAIT_FOR_DEATH_TIMEOUT

run_wes

toil.server.wes.tasks.logger
toil.server.wes.tasks.WAIT_FOR_DEATH_TIMEOUT = 20
class toil.server.wes.tasks.ToilWorkflowRunner(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

A class to represent a workflow runner to run the requested workflow.

Responsible for parsing the user request into a shell command, executing that command, and collecting the outputs of the resulting workflow run.

Parameters:
  • base_scratch_dir (str) –

  • state_store_url (str) –

  • workflow_id (str) –

  • request (Dict[str, Any]) –

  • engine_options (List[str]) –

write_scratch_file(filename, contents)

Write a file to the scratch directory.

Parameters:
  • filename (str) –

  • contents (str) –

Return type:

None

get_state()
Return type:

str

write_workflow(src_url)

Fetch the workflow file from its source and write it to a destination file.

Parameters:

src_url (str) –

Return type:

str

sort_options(workflow_engine_parameters=None)

Sort the command line arguments in the order that can be recognized by the workflow execution engine.

Parameters:

workflow_engine_parameters (Optional[Dict[str, Optional[str]]]) – User-specified parameters for this particular workflow. Keys are command-line options, and values are option arguments, or None for options that are flags.

Return type:

List[str]

initialize_run()

Write workflow and input files and construct a list of shell commands to be executed. Return that list of shell commands that should be executed in order to complete this workflow run.

Return type:

List[str]

call_cmd(cmd, cwd)

Calls a command with Popen. Writes stdout, stderr, and the command to separate files.

Parameters:
  • cmd (Union[List[str], str]) –

  • cwd (str) –

Return type:

subprocess.Popen[bytes]

run()

Construct a command to run a the requested workflow with the options, run it, and deposit the outputs in the output directory.

Return type:

None

write_output_files()

Fetch all the files that this workflow generated and output information about them to outputs.json.

Return type:

None

toil.server.wes.tasks.run_wes_task(base_scratch_dir, state_store_url, workflow_id, request, engine_options)

Run a requested workflow.

Parameters:
  • base_scratch_dir (str) – Directory where the workflow’s scratch dir will live, under the workflow’s ID.

  • state_store_url (str) – URL/path at which the server and Celery task communicate about workflow state.

  • workflow_id (str) – ID of the workflow run.

  • request (Dict[str, Any]) –

  • engine_options (List[str]) –

Returns:

the state of the workflow run.

Return type:

str

toil.server.wes.tasks.run_wes
toil.server.wes.tasks.cancel_run(task_id)

Send a SIGTERM signal to the process that is running task_id.

Parameters:

task_id (str) –

Return type:

None

class toil.server.wes.tasks.TaskRunner

Abstraction over the Celery API. Runs our run_wes task and allows canceling it.

We can swap this out in the server to allow testing without Celery.

static run(args, task_id)

Run the given task args with the given ID on Celery.

Parameters:
Return type:

None

static cancel(task_id)

Cancel the task with the given ID on Celery.

Parameters:

task_id (str) –

Return type:

None

static is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters:

task_id (str) –

Return type:

bool

class toil.server.wes.tasks.MultiprocessingTaskRunner

Bases: TaskRunner

Version of TaskRunner that just runs tasks with Multiprocessing.

Can’t use threading because there’s no way to send a cancel signal or exception to a Python thread, if loops in the task (i.e. ToilWorkflowRunner) don’t poll for it.

static set_up_and_run_task(output_path, args)

Set up logging for the process into the given file and then call run_wes_task with the given arguments.

If the process finishes successfully, it will clean up the log, but if the process crashes, the caller must clean up the log.

Parameters:
Return type:

None

classmethod run(args, task_id)

Run the given task args with the given ID.

Parameters:
Return type:

None

classmethod cancel(task_id)

Cancel the task with the given ID.

Parameters:

task_id (str) –

Return type:

None

classmethod is_ok(task_id)

Make sure that the task running system is working for the given task. If the task system has detected an internal failure, return False.

Parameters:

task_id (str) –

Return type:

bool