toil.server.utils

Attributes

HAVE_S3

logger

state_store_cache

TERMINAL_STATES

MAX_CANCELING_SECONDS

Classes

MemoryStateCache

An in-memory place to store workflow state.

AbstractStateStore

A place for the WES server to keep its state: the set of workflows that exist and whether they are done or not.

MemoryStateStore

An in-memory place to store workflow state, for testing.

FileStateStore

A place to store workflow state that uses a POSIX-compatible file system.

S3StateStore

A place to store workflow state that uses an S3-compatible object store.

WorkflowStateStore

Slice of a state store for the state of a particular workflow.

WorkflowStateMachine

Class for managing the WES workflow state machine.

Functions

get_iso_time()

Return the current time in ISO 8601 format.

link_file(src, dest)

Create a link to a file from src to dest.

download_file_from_internet(src, dest[, content_type])

Download a file from the Internet and write it to dest.

download_file_from_s3(src, dest[, content_type])

Download a file from Amazon S3 and write it to dest.

get_file_class(path)

Return the type of the file as a human readable string.

safe_read_file(file)

Safely read a file by acquiring a shared lock to prevent other processes from writing to it while reading.

safe_write_file(file, s)

Safely write to a file by acquiring an exclusive lock to prevent other processes from reading and writing to it while writing.

connect_to_state_store(url)

Connect to a place to store state for workflows, defined by a URL.

connect_to_workflow_state_store(url, workflow_id)

Connect to a place to store state for the given workflow, in the state store defined by the given URL.

Module Contents

toil.server.utils.HAVE_S3 = True
toil.server.utils.logger
toil.server.utils.get_iso_time()[source]

Return the current time in ISO 8601 format.

Return type:

str
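
A minimal sketch of how such a helper can be written with the standard library (an illustration of the documented behavior, not necessarily the exact implementation):

```python
from datetime import datetime, timezone

def get_iso_time() -> str:
    # Current UTC time as an ISO 8601 string, e.g. "2024-01-01T12:00:00+00:00"
    return datetime.now(timezone.utc).isoformat()
```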

toil.server.utils.link_file(src, dest)[source]

Create a link to a file from src to dest.

Parameters:
  • src (str)

  • dest (str)

Return type:

None
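
One plausible sketch of this helper: try a hard link first and fall back to a plain copy. The copy fallback is an assumption for illustration; the real implementation may behave differently.

```python
import os
import shutil

def link_file(src: str, dest: str) -> None:
    # Prefer a hard link; fall back to copying if linking fails
    # (e.g. when src and dest are on different filesystems).
    try:
        os.link(src, dest)
    except OSError:
        shutil.copyfile(src, dest)
```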

toil.server.utils.download_file_from_internet(src, dest, content_type=None)[source]

Download a file from the Internet and write it to dest.

Parameters:
  • src (str)

  • dest (str)

  • content_type (Optional[str])

Return type:

None
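
A sketch of the download behavior using the standard library; the content-type check shown is an assumption about how the optional parameter might be used:

```python
import shutil
import urllib.request
from typing import Optional

def download_file_from_internet(src: str, dest: str,
                                content_type: Optional[str] = None) -> None:
    # Stream the response body to dest; optionally verify the Content-Type
    # header against the expected prefix (this check is illustrative).
    with urllib.request.urlopen(src) as response:
        if content_type is not None:
            actual = response.headers.get("Content-Type", "")
            if not actual.startswith(content_type):
                raise RuntimeError(
                    f"Expected content type {content_type}, got {actual}")
        with open(dest, "wb") as f:
            shutil.copyfileobj(response, f)
```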

toil.server.utils.download_file_from_s3(src, dest, content_type=None)[source]

Download a file from Amazon S3 and write it to dest.

Parameters:
  • src (str)

  • dest (str)

  • content_type (Optional[str])

Return type:

None

toil.server.utils.get_file_class(path)[source]

Return the type of the file as a human readable string.

Parameters:

path (str)

Return type:

str
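
A sketch of such a classifier; the exact strings returned are assumptions for illustration:

```python
import os
import stat

def get_file_class(path: str) -> str:
    # Classify a path as a human-readable kind using its mode bits.
    mode = os.lstat(path).st_mode
    if stat.S_ISLNK(mode):
        return "Link"
    if stat.S_ISDIR(mode):
        return "Directory"
    if stat.S_ISREG(mode):
        return "File"
    return "Unknown"
```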

toil.server.utils.safe_read_file(file)[source]

Safely read a file by acquiring a shared lock to prevent other processes from writing to it while reading.

Parameters:

file (str)

Return type:

Optional[str]

toil.server.utils.safe_write_file(file, s)[source]

Safely write to a file by acquiring an exclusive lock to prevent other processes from reading and writing to it while writing.

Parameters:
  • file (str)

  • s (str)

Return type:

None
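
On POSIX systems, the shared/exclusive locking described above can be sketched with fcntl.flock. This is an illustration of the locking discipline, not necessarily the exact implementation:

```python
import fcntl
from typing import Optional

def safe_write_file(file: str, s: str) -> None:
    # Exclusive lock: blocks other flock-aware readers and writers.
    # Open in append mode so the file is not truncated before the lock is held.
    with open(file, "a") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        f.seek(0)
        f.truncate()
        f.write(s)
        fcntl.flock(f, fcntl.LOCK_UN)

def safe_read_file(file: str) -> Optional[str]:
    # Shared lock: concurrent readers are allowed, writers are blocked.
    try:
        with open(file, "r") as f:
            fcntl.flock(f, fcntl.LOCK_SH)
            contents = f.read()
            fcntl.flock(f, fcntl.LOCK_UN)
        return contents
    except FileNotFoundError:
        return None
```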

class toil.server.utils.MemoryStateCache[source]

An in-memory place to store workflow state.

get(workflow_id, key)[source]

Get a key value from memory.

Parameters:
  • workflow_id (str)

  • key (str)

Return type:

Optional[str]

set(workflow_id, key, value)[source]

Set or clear a key value in memory.

Parameters:
  • workflow_id (str)

  • key (str)

  • value (Optional[str])

Return type:

None
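
The get/set semantics above (None means "not set", and setting None clears a key) can be pictured with a dict keyed by (workflow_id, key). A sketch, not the exact implementation:

```python
from typing import Dict, Optional, Tuple

class MemoryStateCache:
    """In-memory key-value state, namespaced by workflow ID (a sketch)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], str] = {}

    def get(self, workflow_id: str, key: str) -> Optional[str]:
        # None means the key is not set; nonexistent workflows have no keys.
        return self._data.get((workflow_id, key))

    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None:
        # Setting None clears the key.
        if value is None:
            self._data.pop((workflow_id, key), None)
        else:
            self._data[(workflow_id, key)] = value
```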

class toil.server.utils.AbstractStateStore[source]

A place for the WES server to keep its state: the set of workflows that exist and whether they are done or not.

This is a key-value store, with keys namespaced by workflow ID. Concurrent access from multiple threads or processes is safe and globally consistent.

Keys and workflow IDs are restricted to [-a-zA-Z0-9_], because backends may use them as path or URL components.

Key values are either a string, or None if the key is not set.

Workflow existence isn’t a thing; nonexistent workflows just have None for all keys.

Note that we don’t yet have a cleanup operation: things are stored permanently. Even clearing all the keys may leave data behind.

Also handles storage for a local cache, with a separate key namespace (not a read/write-through cache).

TODO: Can we replace this with just using a JobStore eventually, when AWSJobStore no longer needs SimpleDB?

abstract get(workflow_id, key)[source]

Get the value of the given key for the given workflow, or None if the key is not set for the workflow.

Parameters:
  • workflow_id (str)

  • key (str)

Return type:

Optional[str]

abstract set(workflow_id, key, value)[source]

Set the value of the given key for the given workflow. If the value is None, clear the key.

Parameters:
  • workflow_id (str)

  • key (str)

  • value (Optional[str])

Return type:

None

read_cache(workflow_id, key)[source]

Read a value from a local cache, without checking the actual backend.

Parameters:
  • workflow_id (str)

  • key (str)

Return type:

Optional[str]

write_cache(workflow_id, key, value)[source]

Write a value to a local cache, without modifying the actual backend.

Parameters:
  • workflow_id (str)

  • key (str)

  • value (Optional[str])

Return type:

None
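
The interface can be sketched as an abstract base class: backends implement get/set, while the local cache (a separate key namespace, not read/write-through) is provided for all subclasses. The in-memory cache shown here is an assumption about how the shared behavior might be factored:

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional, Tuple

class AbstractStateStore(ABC):
    """Sketch: backend get/set plus a local cache in a separate namespace."""

    def __init__(self) -> None:
        # Cache entries never touch the backend.
        self._cache: Dict[Tuple[str, str], str] = {}

    @abstractmethod
    def get(self, workflow_id: str, key: str) -> Optional[str]: ...

    @abstractmethod
    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None: ...

    def read_cache(self, workflow_id: str, key: str) -> Optional[str]:
        return self._cache.get((workflow_id, key))

    def write_cache(self, workflow_id: str, key: str,
                    value: Optional[str]) -> None:
        if value is None:
            self._cache.pop((workflow_id, key), None)
        else:
            self._cache[(workflow_id, key)] = value
```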

class toil.server.utils.MemoryStateStore[source]

Bases: MemoryStateCache, AbstractStateStore

An in-memory place to store workflow state, for testing.

Inherits from MemoryStateCache first to provide implementations for AbstractStateStore.

class toil.server.utils.FileStateStore(url)[source]

Bases: AbstractStateStore

A place to store workflow state that uses a POSIX-compatible file system.

Parameters:

url (str)

get(workflow_id, key)[source]

Get a key value from the filesystem.

Parameters:
  • workflow_id (str)

  • key (str)

Return type:

Optional[str]

set(workflow_id, key, value)[source]

Set or clear a key value on the filesystem.

Parameters:
  • workflow_id (str)

  • key (str)

  • value (Optional[str])

Return type:

None
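
A sketch of a filesystem-backed store. The one-file-per-key layout under base/workflow_id/key is an assumption for illustration; the real class may lay files out differently and use locking for concurrent access:

```python
import os
from typing import Optional

class FileStateStore:
    """Sketch: one file per key, under <base>/<workflow_id>/<key>."""

    def __init__(self, url: str) -> None:
        self._base = url  # a local directory path

    def _path(self, workflow_id: str, key: str) -> str:
        return os.path.join(self._base, workflow_id, key)

    def get(self, workflow_id: str, key: str) -> Optional[str]:
        try:
            with open(self._path(workflow_id, key)) as f:
                return f.read()
        except FileNotFoundError:
            return None

    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None:
        path = self._path(workflow_id, key)
        if value is None:
            # Clearing an unset key is a no-op.
            try:
                os.unlink(path)
            except FileNotFoundError:
                pass
        else:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "w") as f:
                f.write(value)
```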

class toil.server.utils.S3StateStore(url)[source]

Bases: AbstractStateStore

A place to store workflow state that uses an S3-compatible object store.

Parameters:

url (str)

get(workflow_id, key)[source]

Get a key value from S3.

Parameters:
  • workflow_id (str)

  • key (str)

Return type:

Optional[str]

set(workflow_id, key, value)[source]

Set or clear a key value on S3.

Parameters:
  • workflow_id (str)

  • key (str)

  • value (Optional[str])

Return type:

None

toil.server.utils.state_store_cache: Dict[str, AbstractStateStore]
toil.server.utils.connect_to_state_store(url)[source]

Connect to a place to store state for workflows, defined by a URL.

URL may be a local file path or URL or an S3 URL.

Parameters:

url (str)

Return type:

AbstractStateStore
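
The dispatch can be pictured as choosing a backend class from the URL scheme and memoizing the connection in the module-level state_store_cache. The classes below are placeholders standing in for the real backends, and the exact dispatch logic is an assumption:

```python
from urllib.parse import urlparse

class FileStateStore:  # placeholder for the real class
    def __init__(self, url: str) -> None:
        self.url = url

class S3StateStore:  # placeholder for the real class
    def __init__(self, url: str) -> None:
        self.url = url

# One store instance per URL, as the module attribute suggests.
state_store_cache = {}

def connect_to_state_store(url: str):
    if url not in state_store_cache:
        scheme = urlparse(url).scheme
        if scheme == "s3":
            state_store_cache[url] = S3StateStore(url)
        else:
            # Plain paths and file:// URLs go to the filesystem backend.
            state_store_cache[url] = FileStateStore(url)
    return state_store_cache[url]
```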

class toil.server.utils.WorkflowStateStore(state_store, workflow_id)[source]

Slice of a state store for the state of a particular workflow.

Parameters:
  • state_store (AbstractStateStore)

  • workflow_id (str)

get(key)[source]

Get the given item of workflow state.

Parameters:

key (str)

Return type:

Optional[str]

set(key, value)[source]

Set the given item of workflow state.

Parameters:
  • key (str)

  • value (Optional[str])

Return type:

None

read_cache(key)[source]

Read a value from a local cache, without checking the actual backend.

Parameters:

key (str)

Return type:

Optional[str]

write_cache(key, value)[source]

Write a value to a local cache, without modifying the actual backend.

Parameters:
  • key (str)

  • value (Optional[str])

Return type:

None

toil.server.utils.connect_to_workflow_state_store(url, workflow_id)[source]

Connect to a place to store state for the given workflow, in the state store defined by the given URL.

Parameters:
  • url (str) – A URL that can be used for connect_to_state_store()

  • workflow_id (str)

Return type:

WorkflowStateStore

toil.server.utils.TERMINAL_STATES
toil.server.utils.MAX_CANCELING_SECONDS = 30
class toil.server.utils.WorkflowStateMachine(store)[source]

Class for managing the WES workflow state machine.

This is the authority on the WES “state” of a workflow. You need one to read or change the state.

Guaranteeing that only certain transitions can be observed is possible but not worth it. Instead, we just let updates clobber each other and grab and cache the first terminal state we see forever. If it becomes important that clients never see e.g. CANCELED -> COMPLETE or COMPLETE -> SYSTEM_ERROR, we can implement a real distributed state machine here.

We do handle making sure that tasks don’t get stuck in CANCELING.

State can be:

"UNKNOWN", "QUEUED", "INITIALIZING", "RUNNING", "PAUSED", "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED", "CANCELING"

Uses the state store’s local cache to prevent needing to read things we’ve seen already.

Parameters:

store (WorkflowStateStore)

send_enqueue()[source]

Send an enqueue message that would move from UNKNOWN to QUEUED.

Return type:

None

send_initialize()[source]

Send an initialize message that would move from QUEUED to INITIALIZING.

Return type:

None

send_run()[source]

Send a run message that would move from INITIALIZING to RUNNING.

Return type:

None

send_cancel()[source]

Send a cancel message that would move to CANCELING from any non-terminal state.

Return type:

None

send_canceled()[source]

Send a canceled message that would move to CANCELED from CANCELING.

Return type:

None

send_complete()[source]

Send a complete message that would move from RUNNING to COMPLETE.

Return type:

None

send_executor_error()[source]

Send an executor_error message that would move from QUEUED, INITIALIZING, or RUNNING to EXECUTOR_ERROR.

Return type:

None

send_system_error()[source]

Send a system_error message that would move from QUEUED, INITIALIZING, or RUNNING to SYSTEM_ERROR.

Return type:

None

get_current_state()[source]

Get the current state of the workflow.

Return type:

str