toil.server.utils
Attributes
Classes
- MemoryStateCache: An in-memory place to store workflow state.
- AbstractStateStore: A place for the WES server to keep its state: the set of workflows that exist and whether they are done or not.
- MemoryStateStore: An in-memory place to store workflow state, for testing.
- FileStateStore: A place to store workflow state that uses a POSIX-compatible file system.
- S3StateStore: A place to store workflow state that uses an S3-compatible object store.
- WorkflowStateStore: Slice of a state store for the state of a particular workflow.
- WorkflowStateMachine: Class for managing the WES workflow state machine.
Functions
- AtomicFileCreate: Context manager to create a temporary file. Entering returns the path to the temporary file in the same directory as final_path.
- retry: Retry a function if it fails with any Exception defined in "errors".
- Return the current time in ISO 8601 format.
- Create a link to a file from src to dest.
- download_file_from_internet: Download a file from the Internet and write it to dest.
- download_file_from_s3: Download a file from Amazon S3 and write it to dest.
- get_file_class: Return the type of the file as a human-readable string.
- safe_read_file: Safely read a file by acquiring a shared lock to prevent other processes from writing to it while reading.
- safe_write_file: Safely write to a file by acquiring an exclusive lock to prevent other processes from reading and writing to it while writing.
- connect_to_state_store: Connect to a place to store state for workflows, defined by a URL.
- connect_to_workflow_state_store: Connect to a place to store state for the given workflow, in the state store defined by the given URL.
Module Contents
- toil.server.utils.AtomicFileCreate(final_path, keep=False)
Context manager to create a temporary file. Entering returns the path to a temporary file in the same directory as final_path. If the code in the context succeeds, the file is renamed to its actual name. If an error occurs, the file is not installed and is removed unless keep is specified.
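The write-then-rename pattern described above can be sketched as follows. This is an illustrative stand-in, not Toil's implementation: the name atomic_file_create, the temporary-file prefix, and the use of os.replace are assumptions made for the example.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def atomic_file_create(final_path, keep=False):
    """Hypothetical stand-in for AtomicFileCreate (not Toil's code)."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Create the temporary file next to the target so the final rename
    # stays on one file system.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    os.close(fd)
    try:
        yield tmp_path
    except BaseException:
        # On failure, do not install the file; delete it unless keep is set.
        if not keep and os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    else:
        # On success, move the finished file to its final name.
        os.replace(tmp_path, final_path)


# Usage: readers of `target` never observe a half-written file.
target = os.path.join(tempfile.mkdtemp(), "result.txt")
with atomic_file_create(target) as tmp:
    with open(tmp, "w") as f:
        f.write("done")
```

Keeping the temporary file in the same directory as the target matters because a rename is only atomic within a single file system.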
- toil.server.utils.retry(intervals=None, infinite_retries=False, errors=None, log_message=None, prepare=None)
Retry a function if it fails with any Exception defined in “errors”.
Does so every x seconds, where x is defined by a list of numbers (ints or floats) in “intervals”. Also accepts ErrorCondition events for more detailed retry attempts.
- Parameters:
intervals (Optional[List]) – A list of wait times, in seconds, between successive attempts; once the list is exhausted, the call fails. Defaults to the following exponential back-off: 1s, 1s, 2s, 4s, 8s, 16s
infinite_retries (bool) – If this is True, reset the intervals when they run out. Defaults to: False.
errors (Optional[Sequence[Union[ErrorCondition, Type[Exception]]]]) –
A list of exceptions OR ErrorCondition objects to catch and retry on. ErrorCondition objects describe more detailed error event conditions than a plain error. An ErrorCondition specifies:
- Exception (required)
- Error codes that must match to be retried (optional; defaults to not checking)
- A string that must be in the error message to be retried (optional; defaults to not checking)
- A bool that can be set to False to always error on this condition.
If not specified, this will default to a generic Exception.
log_message (Optional[Tuple[Callable, str]]) – Optional tuple of (“log/print function()”, “message string”) that will precede each attempt.
prepare (Optional[List[Callable]]) – Optional list of functions to call, with the function’s arguments, between retries, to reset state.
- Returns:
The result of the wrapped function; if the retries run out, the error is raised.
- Return type:
Callable[[Callable[Ellipsis, RT]], Callable[Ellipsis, RT]]
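To make the interval-driven behavior concrete, here is a deliberately simplified decorator in the same spirit. It is a sketch, not the real retry(): the name simple_retry is hypothetical, and it ignores ErrorCondition, infinite_retries, log_message, and prepare.

```python
import functools
import time


def simple_retry(intervals=(1, 1, 2, 4, 8, 16), errors=(Exception,)):
    """Hypothetical, simplified decorator; not the real toil retry()."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for delay in intervals:
                try:
                    return func(*args, **kwargs)
                except tuple(errors):
                    # Wait for the current interval, then try again.
                    time.sleep(delay)
            # Intervals exhausted: make one final attempt and let any
            # error propagate to the caller.
            return func(*args, **kwargs)
        return wrapper
    return decorator


attempts = []


@simple_retry(intervals=[0, 0], errors=[ConnectionError])
def flaky():
    # Fails twice with a retryable error, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient")
    return "ok"


result = flaky()
```

In this sketch, exceptions that are not listed in errors are never caught, so they surface immediately without any retry.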
- toil.server.utils.HAVE_S3 = True
- toil.server.utils.logger
- toil.server.utils.download_file_from_internet(src, dest, content_type=None)
Download a file from the Internet and write it to dest.
- toil.server.utils.download_file_from_s3(src, dest, content_type=None)
Download a file from Amazon S3 and write it to dest.
- toil.server.utils.get_file_class(path)
Return the type of the file as a human-readable string.
- toil.server.utils.safe_read_file(file)
Safely read a file by acquiring a shared lock to prevent other processes from writing to it while reading.
- toil.server.utils.safe_write_file(file, s)
Safely write to a file by acquiring an exclusive lock to prevent other processes from reading and writing to it while writing.
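The shared and exclusive locking described for these two functions can be illustrated with advisory locks from Python's fcntl module. This is a sketch under assumptions: the helper names locked_read and locked_write are hypothetical, the choice of fcntl.flock is an assumption about the mechanism, and the code only works on POSIX systems.

```python
import fcntl
import os
import tempfile


def locked_write(path, text):
    # Open read/write without truncating, so nothing is lost before the
    # exclusive lock is actually held.
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    with os.fdopen(fd, "r+") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # blocks out readers and writers
        try:
            f.truncate(0)
            f.write(text)
            f.flush()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)


def locked_read(path):
    with open(path, "r") as f:
        fcntl.flock(f, fcntl.LOCK_SH)  # many readers may hold this at once
        try:
            return f.read()
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)


path = os.path.join(tempfile.mkdtemp(), "state.txt")
locked_write(path, "RUNNING")
content = locked_read(path)
```

These locks are advisory: they only protect against other processes that also take the lock before touching the file.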
- class toil.server.utils.MemoryStateCache
An in-memory place to store workflow state.
- class toil.server.utils.AbstractStateStore
A place for the WES server to keep its state: the set of workflows that exist and whether they are done or not.
This is a key-value store, with keys namespaced by workflow ID. Concurrent access from multiple threads or processes is safe and globally consistent.
Keys and workflow IDs are restricted to [-a-zA-Z0-9_], because backends may use them as path or URL components.
Key values are either a string, or None if the key is not set.
Workflow existence is not tracked; nonexistent workflows just have None for all keys.
Note that we don’t yet have a cleanup operation: things are stored permanently. Even clearing all the keys may leave data behind.
Also handles storage for a local cache, with a separate key namespace (not a read/write-through cache).
TODO: Can we replace this with just using a JobStore eventually, when AWSJobStore no longer needs SimpleDB?
- abstract get(workflow_id, key)
Get the value of the given key for the given workflow, or None if the key is not set for the workflow.
- abstract set(workflow_id, key, value)
Set the value of the given key for the given workflow. If the value is None, clear the key.
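The get/set contract above can be illustrated with a small dictionary-backed store. This is a sketch only: the class name DictStateStore is hypothetical, and validating names against the character set is an assumption about how a backend might enforce the restriction. Unlike the real stores, it makes no attempt at thread or process safety.

```python
import re
from typing import Dict, Optional, Tuple

# Character set taken from the restriction described above.
VALID_NAME = re.compile(r"[-a-zA-Z0-9_]+")


class DictStateStore:
    """Hypothetical in-memory store following the get/set contract."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], str] = {}

    @staticmethod
    def _check(name: str) -> None:
        # Assumption: reject names that would be unsafe as path components.
        if not VALID_NAME.fullmatch(name):
            raise ValueError(f"invalid key or workflow ID: {name!r}")

    def get(self, workflow_id: str, key: str) -> Optional[str]:
        self._check(workflow_id)
        self._check(key)
        # Unset keys and unknown workflows both read as None.
        return self._data.get((workflow_id, key))

    def set(self, workflow_id: str, key: str, value: Optional[str]) -> None:
        self._check(workflow_id)
        self._check(key)
        if value is None:
            # Setting None clears the key.
            self._data.pop((workflow_id, key), None)
        else:
            self._data[(workflow_id, key)] = value


store = DictStateStore()
store.set("wf-1", "state", "RUNNING")
```

Because a missing workflow simply returns None for every key, callers need no separate "does this workflow exist" query.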
- class toil.server.utils.MemoryStateStore
Bases: MemoryStateCache, AbstractStateStore
An in-memory place to store workflow state, for testing.
Inherits from MemoryStateCache first to provide implementations for AbstractStateStore.
- class toil.server.utils.FileStateStore(url)
Bases: AbstractStateStore
A place to store workflow state that uses a POSIX-compatible file system.
- Parameters:
url (str)
- class toil.server.utils.S3StateStore(url)
Bases: AbstractStateStore
A place to store workflow state that uses an S3-compatible object store.
- Parameters:
url (str)
- get(workflow_id, key)
Get a key value from S3.
- toil.server.utils.state_store_cache: Dict[str, AbstractStateStore]
- toil.server.utils.connect_to_state_store(url)
Connect to a place to store state for workflows, defined by a URL.
The URL may be a local file path, a local file URL, or an S3 URL.
- Parameters:
url (str)
- Return type:
AbstractStateStore
- class toil.server.utils.WorkflowStateStore(state_store, workflow_id)
Slice of a state store for the state of a particular workflow.
- Parameters:
state_store (AbstractStateStore)
workflow_id (str)
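The idea of a "slice" is that the workflow ID is bound once, so callers only deal with keys. The sketch below shows that binding with two hypothetical classes, TinyStore and WorkflowSlice; the get and set method names on the slice are assumptions for illustration, since the methods of WorkflowStateStore are not listed here.

```python
class TinyStore:
    """Hypothetical minimal backing store keyed by (workflow_id, key)."""

    def __init__(self):
        self._data = {}

    def get(self, workflow_id, key):
        return self._data.get((workflow_id, key))

    def set(self, workflow_id, key, value):
        if value is None:
            self._data.pop((workflow_id, key), None)
        else:
            self._data[(workflow_id, key)] = value


class WorkflowSlice:
    """Hypothetical view of a store restricted to one workflow."""

    def __init__(self, state_store, workflow_id):
        self._state_store = state_store
        self._workflow_id = workflow_id

    def get(self, key):
        # Delegate to the backing store with the bound workflow ID.
        return self._state_store.get(self._workflow_id, key)

    def set(self, key, value):
        self._state_store.set(self._workflow_id, key, value)


store = TinyStore()
wf = WorkflowSlice(store, "wf-1")
wf.set("state", "RUNNING")
```

Two slices over the same backing store stay isolated from each other because every access is namespaced by the bound workflow ID.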
- toil.server.utils.connect_to_workflow_state_store(url, workflow_id)
Connect to a place to store state for the given workflow, in the state store defined by the given URL.
- Parameters:
url (str)
workflow_id (str)
- Return type:
WorkflowStateStore
- toil.server.utils.TERMINAL_STATES
- toil.server.utils.MAX_CANCELING_SECONDS = 30
- class toil.server.utils.WorkflowStateMachine(store)
Class for managing the WES workflow state machine.
This is the authority on the WES “state” of a workflow. You need one to read or change the state.
Guaranteeing that only certain transitions can be observed is possible but not worth it. Instead, we just let updates clobber each other and grab and cache the first terminal state we see forever. If it becomes important that clients never see e.g. CANCELED -> COMPLETE or COMPLETE -> SYSTEM_ERROR, we can implement a real distributed state machine here.
We do handle making sure that tasks don’t get stuck in CANCELING.
State can be:
“UNKNOWN”, “QUEUED”, “INITIALIZING”, “RUNNING”, “PAUSED”, “COMPLETE”, “EXECUTOR_ERROR”, “SYSTEM_ERROR”, “CANCELED”, “CANCELING”
Uses the state store’s local cache to prevent needing to read things we’ve seen already.
- Parameters:
store (WorkflowStateStore)
- send_enqueue()
Send an enqueue message that would move from UNKNOWN to QUEUED.
- Return type:
None
- send_initialize()
Send an initialize message that would move from QUEUED to INITIALIZING.
- Return type:
None
- send_run()
Send a run message that would move from INITIALIZING to RUNNING.
- Return type:
None
- send_cancel()
Send a cancel message that would move to CANCELING from any non-terminal state.
- Return type:
None
- send_canceled()
Send a canceled message that would move to CANCELED from CANCELING.
- Return type:
None
- send_complete()
Send a complete message that would move from RUNNING to COMPLETE.
- Return type:
None
- send_executor_error()
Send an executor_error message that would move from QUEUED, INITIALIZING, or RUNNING to EXECUTOR_ERROR.
- Return type:
None
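The nominal transitions named by the send_* methods above can be collected in a small lookup table. This is a sketch for orientation only: as noted in the class description, the real state machine lets updates clobber each other rather than enforcing transitions. The names TRANSITIONS and nominal_next are hypothetical, and the contents of the terminal set are an assumption, since the value of TERMINAL_STATES is not shown here.

```python
# Assumption: the terminal states are the four "finished" WES states.
TERMINAL = {"COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED"}
ALL_STATES = {
    "UNKNOWN", "QUEUED", "INITIALIZING", "RUNNING", "PAUSED",
    "COMPLETE", "EXECUTOR_ERROR", "SYSTEM_ERROR", "CANCELED", "CANCELING",
}
NON_TERMINAL = ALL_STATES - TERMINAL

# message -> (states the message applies to, resulting state)
TRANSITIONS = {
    "enqueue": ({"UNKNOWN"}, "QUEUED"),
    "initialize": ({"QUEUED"}, "INITIALIZING"),
    "run": ({"INITIALIZING"}, "RUNNING"),
    "cancel": (NON_TERMINAL, "CANCELING"),
    "canceled": ({"CANCELING"}, "CANCELED"),
    "complete": ({"RUNNING"}, "COMPLETE"),
    "executor_error": ({"QUEUED", "INITIALIZING", "RUNNING"}, "EXECUTOR_ERROR"),
}


def nominal_next(state, message):
    """Return the state a message would lead to, or None if it does not apply."""
    sources, target = TRANSITIONS[message]
    return target if state in sources else None


happy_path = ["enqueue", "initialize", "run", "complete"]
state = "UNKNOWN"
for message in happy_path:
    state = nominal_next(state, message)
```

A table like this is mainly useful for documentation and tests; enforcing it across processes would require the distributed state machine that the class description deliberately avoids.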