toil.server.utils
Attributes
- HAVE_S3
- logger
- state_store_cache
- TERMINAL_STATES
- MAX_CANCELING_SECONDS
Classes
- MemoryStateCache: An in-memory place to store workflow state.
- AbstractStateStore: A place for the WES server to keep its state: the set of workflows that exist and whether they are done or not.
- MemoryStateStore: An in-memory place to store workflow state, for testing.
- FileStateStore: A place to store workflow state that uses a POSIX-compatible file system.
- S3StateStore: A place to store workflow state that uses an S3-compatible object store.
- WorkflowStateStore: Slice of a state store for the state of a particular workflow.
- WorkflowStateMachine: Class for managing the WES workflow state machine.
Functions
- Return the current time in ISO 8601 format.
- Create a link to a file from src to dest.
- download_file_from_internet: Download a file from the Internet and write it to dest.
- download_file_from_s3: Download a file from Amazon S3 and write it to dest.
- get_file_class: Return the type of the file as a human-readable string.
- safe_read_file: Safely read a file by acquiring a shared lock to prevent other processes from writing to it while reading.
- safe_write_file: Safely write to a file by acquiring an exclusive lock to prevent other processes from reading and writing to it while writing.
- connect_to_state_store: Connect to a place to store state for workflows, defined by a URL.
- connect_to_workflow_state_store: Connect to a place to store state for the given workflow, in the state store defined by the given URL.
Module Contents
- toil.server.utils.HAVE_S3 = True
- toil.server.utils.logger
- toil.server.utils.download_file_from_internet(src, dest, content_type=None)
Download a file from the Internet and write it to dest.
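A minimal sketch of streaming a URL to a local file with only the standard library, assuming `urllib.request` is an acceptable transport. The name `fetch_to_path` is hypothetical, and the sketch ignores `content_type`, whose role is not described here.

```python
import shutil
import urllib.request


def fetch_to_path(src: str, dest: str) -> None:
    """Hypothetical stand-in for download_file_from_internet: copy src into dest."""
    # urlopen handles http(s):// URLs and also file:// URLs.
    with urllib.request.urlopen(src) as response, open(dest, "wb") as out:
        # Copy in chunks so a large download is never held fully in memory.
        shutil.copyfileobj(response, out)
```

For example, `fetch_to_path("https://example.com/inputs.json", "inputs.json")` would save the response body to `inputs.json`.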
- toil.server.utils.download_file_from_s3(src, dest, content_type=None)
Download a file from Amazon S3 and write it to dest.
- toil.server.utils.get_file_class(path)
Return the type of the file as a human-readable string.
- toil.server.utils.safe_read_file(file)
Safely read a file by acquiring a shared lock to prevent other processes from writing to it while reading.
- toil.server.utils.safe_write_file(file, s)
Safely write to a file by acquiring an exclusive lock to prevent other processes from reading and writing to it while writing.
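The reader/writer pairing above can be sketched on POSIX systems with advisory `fcntl.flock` locks. This is an assumption: the locking primitive these functions use is not stated here, and `locked_read` and `locked_write` are hypothetical names. Advisory locks only coordinate with other processes that also take the lock.

```python
import fcntl


def locked_read(path: str) -> str:
    """Read a whole file while holding a shared (reader) lock."""
    with open(path, "r") as f:
        # LOCK_SH: many readers may hold it at once, but it excludes writers.
        fcntl.flock(f, fcntl.LOCK_SH)
        try:
            return f.read()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)


def locked_write(path: str, s: str) -> None:
    """Replace a file's contents while holding an exclusive (writer) lock."""
    # Open in append mode so the file is not truncated before the lock is held.
    with open(path, "a") as f:
        # LOCK_EX: waits until no other process holds any lock on the file.
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            # In append mode, the next write lands at the new end (offset 0).
            f.truncate(0)
            f.write(s)
            # Flush before unlocking so readers never see a partial write.
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
```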
- class toil.server.utils.MemoryStateCache
An in-memory place to store workflow state.
- class toil.server.utils.AbstractStateStore
A place for the WES server to keep its state: the set of workflows that exist and whether they are done or not.
This is a key-value store, with keys namespaced by workflow ID. Concurrent access from multiple threads or processes is safe and globally consistent.
Keys and workflow IDs are restricted to the characters [-a-zA-Z0-9_], because backends may use them as path or URL components. Key values are either a string, or None if the key is not set.
There is no separate notion of workflow existence: a nonexistent workflow just has None for all keys.
Note that we don’t yet have a cleanup operation: things are stored permanently. Even clearing all the keys may leave data behind.
Also handles storage for a local cache, with a separate key namespace (not a read/write-through cache).
TODO: Can we replace this with just using a JobStore eventually, when AWSJobStore no longer needs SimpleDB?
- abstract get(workflow_id, key)
Get the value of the given key for the given workflow, or None if the key is not set for the workflow.
- abstract set(workflow_id, key, value)
Set the value of the given key for the given workflow. If the value is None, clear the key.
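To make the get/set contract concrete, here is a minimal dictionary-backed sketch. It assumes the [-a-zA-Z0-9_] restriction is enforced with a regular expression and that setting None deletes the key; `KeyValueStateStore`, `SketchStateStore`, and `_check` are hypothetical names, and the lock only covers threads within one process, not the multi-process consistency the real backends promise.

```python
import re
import threading
from abc import ABC, abstractmethod
from typing import Optional

# Keys and workflow IDs may only use these characters.
_SAFE_NAME = re.compile(r"[-a-zA-Z0-9_]+")


class KeyValueStateStore(ABC):
    """Hypothetical mirror of the abstract get/set contract."""

    @abstractmethod
    def get(self, workflow_id: str, key: str) -> Optional[str]: ...

    @abstractmethod
    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None: ...


class SketchStateStore(KeyValueStateStore):
    """Dictionary-backed store; a lock keeps concurrent threads consistent."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], str] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _check(*names: str) -> None:
        for name in names:
            if not _SAFE_NAME.fullmatch(name):
                raise ValueError(f"Unsafe name: {name!r}")

    def get(self, workflow_id: str, key: str) -> Optional[str]:
        self._check(workflow_id, key)
        with self._lock:
            # A workflow that was never written to simply yields None.
            return self._data.get((workflow_id, key))

    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None:
        self._check(workflow_id, key)
        with self._lock:
            if value is None:
                # Setting None clears the key.
                self._data.pop((workflow_id, key), None)
            else:
                self._data[(workflow_id, key)] = value
```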
- class toil.server.utils.MemoryStateStore
Bases: MemoryStateCache, AbstractStateStore
An in-memory place to store workflow state, for testing.
Inherits from MemoryStateCache first, so that MemoryStateCache's methods provide the implementations of AbstractStateStore's abstract methods.
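Base-class order matters because of Python's method resolution order: a concrete get/set found on an earlier base satisfies the abstract methods declared by a later ABC base. A minimal sketch with hypothetical stand-ins `DictCache`, `Store`, and `DictStore`:

```python
from abc import ABC, abstractmethod
from typing import Optional


class DictCache:
    """Concrete get/set, like an in-memory cache (hypothetical stand-in)."""

    def __init__(self) -> None:
        self._values: dict[tuple[str, str], str] = {}

    def get(self, workflow_id: str, key: str) -> Optional[str]:
        return self._values.get((workflow_id, key))

    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None:
        if value is None:
            self._values.pop((workflow_id, key), None)
        else:
            self._values[(workflow_id, key)] = value


class Store(ABC):
    """Abstract interface (hypothetical stand-in for the abstract store)."""

    @abstractmethod
    def get(self, workflow_id: str, key: str) -> Optional[str]: ...

    @abstractmethod
    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None: ...


class DictStore(DictCache, Store):
    """No method bodies needed: DictCache's get/set come first in the MRO."""
```

With the bases reversed (`class WrongOrder(Store, DictCache)`), `Store`'s abstract methods would be found first, and instantiating the class would raise `TypeError`.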
- class toil.server.utils.FileStateStore(url)
Bases: AbstractStateStore
A place to store workflow state that uses a POSIX-compatible file system.
- Parameters:
url (str)
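One plausible way for a file-system backend to satisfy the key-value contract is to map each (workflow ID, key) pair to a file path, which is one reason names are restricted to path-safe characters. A minimal sketch, assuming a `<root>/<workflow_id>/<key>` layout and atomic replacement with `os.replace`. The class name and layout are hypothetical, not necessarily what FileStateStore does.

```python
import os
import tempfile
from typing import Optional


class DirectoryStateStore:
    """Hypothetical file-backed store: one file per (workflow_id, key)."""

    def __init__(self, root: str) -> None:
        self.root = root
        os.makedirs(root, exist_ok=True)

    def _path(self, workflow_id: str, key: str) -> str:
        # Safe only because IDs and keys are limited to [-a-zA-Z0-9_].
        return os.path.join(self.root, workflow_id, key)

    def get(self, workflow_id: str, key: str) -> Optional[str]:
        try:
            with open(self._path(workflow_id, key), encoding="utf-8") as f:
                return f.read()
        except FileNotFoundError:
            return None  # Unset key, or a workflow never written to.

    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None:
        path = self._path(workflow_id, key)
        if value is None:
            try:
                os.remove(path)  # The workflow directory itself is left behind.
            except FileNotFoundError:
                pass
            return
        directory = os.path.dirname(path)
        os.makedirs(directory, exist_ok=True)
        # Write a temporary file (the "." prefix cannot collide with a valid
        # key), then rename it over the target so readers never see a partial value.
        fd, tmp = tempfile.mkstemp(dir=directory, prefix=".tmp-")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(value)
        os.replace(tmp, path)
```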
- class toil.server.utils.S3StateStore(url)
Bases: AbstractStateStore
A place to store workflow state that uses an S3-compatible object store.
- Parameters:
url (str)
- get(workflow_id, key)
Get a key value from S3.
- toil.server.utils.state_store_cache: dict[str, AbstractStateStore]
- toil.server.utils.connect_to_state_store(url)
Connect to a place to store state for workflows, defined by a URL.
The URL may be a local file path, a local file URL, or an S3 URL.
- Parameters:
url (str)
- Return type:
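The URL dispatch and the module-level state_store_cache suggest a memoized factory: one store object per URL. A minimal sketch, assuming an `s3://` scheme selects the S3 backend and anything else (a plain path or a `file:` URL) selects the file backend. `FileBackend`, `S3Backend`, `_cache`, and `connect` are hypothetical stand-ins that only record their parsed arguments, and the cache is not thread-safe.

```python
from urllib.parse import urlparse


class FileBackend:
    """Hypothetical stand-in for a file-system state store."""

    def __init__(self, url: str) -> None:
        # Accept both plain paths and file: URLs.
        parsed = urlparse(url)
        self.path = parsed.path if parsed.scheme == "file" else url


class S3Backend:
    """Hypothetical stand-in for an S3 state store."""

    def __init__(self, url: str) -> None:
        parsed = urlparse(url)
        self.bucket = parsed.netloc
        self.prefix = parsed.path.lstrip("/")


# One shared backend object per URL string.
_cache: dict[str, object] = {}


def connect(url: str) -> object:
    """Return the cached backend for url, creating it on first use."""
    if url not in _cache:
        if urlparse(url).scheme == "s3":
            _cache[url] = S3Backend(url)
        else:
            _cache[url] = FileBackend(url)
    return _cache[url]
```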
- class toil.server.utils.WorkflowStateStore(state_store, workflow_id)
Slice of a state store for the state of a particular workflow.
- Parameters:
state_store (AbstractStateStore)
workflow_id (str)
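A "slice" fixes the workflow ID so that callers deal only in keys. A minimal sketch of such a wrapper, assuming it simply forwards to the underlying store's get and set. `DictStore`, `WorkflowSlice`, and the slice's method names are hypothetical.

```python
from typing import Optional


class DictStore:
    """Tiny hypothetical backend with the (workflow_id, key) get/set contract."""

    def __init__(self) -> None:
        self._values: dict[tuple[str, str], str] = {}

    def get(self, workflow_id: str, key: str) -> Optional[str]:
        return self._values.get((workflow_id, key))

    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None:
        if value is None:
            self._values.pop((workflow_id, key), None)
        else:
            self._values[(workflow_id, key)] = value


class WorkflowSlice:
    """Binds one workflow ID so callers only pass keys."""

    def __init__(self, store: DictStore, workflow_id: str) -> None:
        self.store = store
        self.workflow_id = workflow_id

    def get(self, key: str) -> Optional[str]:
        return self.store.get(self.workflow_id, key)

    def set(self, key: str, value: Optional[str]) -> None:
        self.store.set(self.workflow_id, key, value)
```

Two slices over the same backend stay isolated because every call is namespaced by its own workflow ID.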
- toil.server.utils.connect_to_workflow_state_store(url, workflow_id)
Connect to a place to store state for the given workflow, in the state store defined by the given URL.
- Parameters:
url (str)
workflow_id (str)
- Return type:
- toil.server.utils.TERMINAL_STATES
- toil.server.utils.MAX_CANCELING_SECONDS = 30
- class toil.server.utils.WorkflowStateMachine(store)
Class for managing the WES workflow state machine.
This is the authority on the WES “state” of a workflow. You need one to read or change the state.
Guaranteeing that only certain transitions can be observed is possible but not worth it. Instead, we let updates clobber each other, and we grab the first terminal state we see and cache it forever. If it becomes important that clients never see, e.g., CANCELED -> COMPLETE or COMPLETE -> SYSTEM_ERROR, we can implement a real distributed state machine here.
We do, however, make sure that tasks don’t get stuck in CANCELING.
State can be one of: “UNKNOWN”, “QUEUED”, “INITIALIZING”, “RUNNING”, “PAUSED”, “COMPLETE”, “EXECUTOR_ERROR”, “SYSTEM_ERROR”, “CANCELED”, or “CANCELING”.
Uses the state store’s local cache to prevent needing to read things we’ve seen already.
- Parameters:
store (WorkflowStateStore)
- send_enqueue()
Send an enqueue message that would move from UNKNOWN to QUEUED.
- Return type:
None
- send_initialize()
Send an initialize message that would move from QUEUED to INITIALIZING.
- Return type:
None
- send_run()
Send a run message that would move from INITIALIZING to RUNNING.
- Return type:
None
- send_cancel()
Send a cancel message that would move to CANCELING from any non-terminal state.
- Return type:
None
- send_canceled()
Send a canceled message that would move to CANCELED from CANCELING.
- Return type:
None
- send_complete()
Send a complete message that would move from RUNNING to COMPLETE.
- Return type:
None
- send_executor_error()
Send an executor_error message that would move from QUEUED, INITIALIZING, or RUNNING to EXECUTOR_ERROR.
- Return type:
None
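The transitions documented for the send_* methods, together with the "let updates clobber each other, but cache the first terminal state forever" policy, can be sketched as follows. This assumes a plain dict stands in for the workflow's state store and that COMPLETE, EXECUTOR_ERROR, SYSTEM_ERROR, and CANCELED are terminal (the value of TERMINAL_STATES is not shown above). `TERMINAL`, `TARGETS`, and `SketchStateMachine` are hypothetical names, and the CANCELING timeout is not modeled.

```python
from typing import Optional

# Assumed terminal set; the real TERMINAL_STATES value is not shown above.
TERMINAL = frozenset({"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"})

# Message -> target state, transcribed from the send_* descriptions above.
# The intended source states are noted in comments; they are not enforced.
TARGETS = {
    "enqueue": "QUEUED",                 # from UNKNOWN
    "initialize": "INITIALIZING",        # from QUEUED
    "run": "RUNNING",                    # from INITIALIZING
    "cancel": "CANCELING",               # from any non-terminal state
    "canceled": "CANCELED",              # from CANCELING
    "complete": "COMPLETE",              # from RUNNING
    "executor_error": "EXECUTOR_ERROR",  # from QUEUED, INITIALIZING, or RUNNING
}


class SketchStateMachine:
    def __init__(self, store: dict[str, str]) -> None:
        self._store = store  # Shared state; other writers may clobber it.
        self._terminal: Optional[str] = None  # First terminal state observed.

    def send(self, message: str) -> None:
        # Last write wins: no compare-and-swap against the current state.
        self._store["state"] = TARGETS[message]

    def get_current_state(self) -> str:
        if self._terminal is not None:
            return self._terminal  # Never re-read once a terminal state is seen.
        state = self._store.get("state", "UNKNOWN")
        if state in TERMINAL:
            self._terminal = state
        return state
```

Caching the terminal state per instance means a later clobbering write (say, SYSTEM_ERROR after COMPLETE) is never observed by a reader that has already seen COMPLETE.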