toil.job¶
Attributes¶
Exceptions¶
Error for a job being asked to promise its return value when it is not available. |
|
Common base class for all non-exit exceptions. |
|
Raised when a job reaches a point at which it has been instructed to stop for debugging. |
|
Raised when a job stops because it was asked to download its files, and the files are downloaded. |
|
General job exception. |
|
An exception raised in the event that a workflow contains an unresolvable dependency, such as a cycle. See |
Classes¶
Placeholder for an unregistered job ID used by a JobDescription. |
|
Requirement for one or more computational accelerators, like a GPU or FPGA. |
|
Typed storage for requirements for a job. |
|
Base class implementing the storage and presentation of requirements. |
|
Reference from a job description to its body. |
|
Stores all the information that the Toil Leader ever needs to know about a Job. |
|
A description of a job that hosts a service. |
|
A description of a job that is a checkpoint. |
|
Represents a unit of work in Toil. |
|
Job used to wrap a function. In its run method the wrapped function is called. |
|
A job function is a function whose first argument is a |
|
Handles dynamic resource allocation using |
|
Handles dynamic resource allocation for job functions. |
|
A convenience Job class used to make a job subgraph appear to be a single job. |
|
Job that runs a service. Used internally by Toil. Users should subclass Service instead of using this. |
|
Metadata for a file. |
|
Combine the outputs of multiple WorkerImportJobs into one promise |
|
Job to do file imports on a worker instead of a leader. Assumes all local and cloud files are accessible. |
|
Job to organize and delegate files to individual WorkerImportJobs. |
|
References a return value from a method as a promise before the method itself is run. |
|
Class for dynamically allocating job function resource requirements. |
|
This should be overwritten by a proper promised value. |
Functions¶
|
Parse an AcceleratorRequirement specified by user code. |
|
Test if candidate partially satisfies the given requirement. |
|
Determine if a set of accelerators satisfy a requirement. |
|
Get potential absolute URIs to check for an imported file. |
|
Resolve relative-URI files in the given environment and turn them into absolute normalized URIs. Returns a dictionary of the string values from the WDL file values |
|
Function for ensuring you actually have a promised value, and not just a promise. |
|
Function for ensuring you actually have a collection of promised values, |
Module Contents¶
- toil.job.logger¶
- exception toil.job.JobPromiseConstraintError(promisingJob, recipientJob=None)[source]¶
Bases: RuntimeError
Error for a job being asked to promise its return value when it is not available.
(This happens when the return value has not yet been reached in the topological order of the job graph.)
- promisingJob¶
- recipientJob = None¶
- exception toil.job.ConflictingPredecessorError(predecessor, successor)[source]¶
Bases: Exception
Common base class for all non-exit exceptions.
- exception toil.job.DebugStoppingPointReached[source]¶
Bases: BaseException
Raised when a job reaches a point at which it has been instructed to stop for debugging.
- exception toil.job.FilesDownloadedStoppingPointReached(message, host_and_job_paths=None)[source]¶
Bases: DebugStoppingPointReached
Raised when a job stops because it was asked to download its files, and the files are downloaded.
- host_and_job_paths = None¶
- class toil.job.TemporaryID[source]¶
Placeholder for an unregistered job ID used by a JobDescription.
- Needs to be held:
By JobDescription objects to record normal relationships.
By Jobs to key their connected-component registries and to record predecessor relationships to facilitate EncapsulatedJob adding itself as a child.
By Services to tie back to their hosting jobs, so the service tree can be built up from Service objects.
- class toil.job.AcceleratorRequirement[source]¶
Bases: TypedDict
Requirement for one or more computational accelerators, like a GPU or FPGA.
- kind: str¶
What kind of accelerator is required. Can be “gpu”. Other kinds defined in the future might be “fpga”, etc.
- model: NotRequired[str]¶
What model of accelerator is needed. The exact set of values available depends on what the backing scheduler calls its accelerators; strings like “nvidia-tesla-k80” might be expected to work. If a specific model of accelerator is not required, this should be absent.
- brand: NotRequired[str]¶
What brand or manufacturer of accelerator is required. The exact set of values available depends on what the backing scheduler calls the brands of its accelerators; strings like “nvidia” or “amd” might be expected to work. If a specific brand of accelerator is not required (for example, because the job can use multiple brands of accelerator that support a given API) this should be absent.
- toil.job.parse_accelerator(spec)[source]¶
Parse an AcceleratorRequirement specified by user code.
Supports formats like:
>>> parse_accelerator(8)
{'count': 8, 'kind': 'gpu'}
>>> parse_accelerator("1")
{'count': 1, 'kind': 'gpu'}
>>> parse_accelerator("nvidia-tesla-k80")
{'count': 1, 'kind': 'gpu', 'model': 'nvidia-tesla-k80', 'brand': 'nvidia'}
>>> parse_accelerator("nvidia-tesla-k80:2")
{'count': 2, 'kind': 'gpu', 'model': 'nvidia-tesla-k80', 'brand': 'nvidia'}
>>> parse_accelerator("gpu")
{'count': 1, 'kind': 'gpu'}
>>> parse_accelerator("cuda:1")
{'count': 1, 'kind': 'gpu', 'api': 'cuda', 'brand': 'nvidia'}
>>> parse_accelerator({"kind": "gpu"})
{'count': 1, 'kind': 'gpu'}
>>> parse_accelerator({"brand": "nvidia", "count": 5})
{'count': 5, 'kind': 'gpu', 'brand': 'nvidia'}
Assumes that if not specified, we are talking about GPUs, and about one of them. Knows that “gpu” is a kind, and “cuda” is an API, and “nvidia” is a brand.
- Raises:
ValueError – if it gets something it can’t parse
TypeError – if it gets something it can’t parse because it’s the wrong type.
- Parameters:
spec (ParseableSingleAcceleratorRequirement)
- Return type:
- toil.job.accelerator_satisfies(candidate, requirement, ignore=[])[source]¶
Test if candidate partially satisfies the given requirement.
- Returns:
True if the given candidate at least partially satisfies the given requirement (i.e. check all fields other than count).
- Parameters:
candidate (AcceleratorRequirement)
requirement (AcceleratorRequirement)
- Return type:
- toil.job.accelerators_fully_satisfy(candidates, requirement, ignore=[])[source]¶
Determine if a set of accelerators satisfy a requirement.
Ignores fields specified in ignore.
- Returns:
True if the requirement AcceleratorRequirement is fully satisfied by the ones in the list, taken together (i.e. check all fields including count).
- Parameters:
candidates (list[AcceleratorRequirement] | None)
requirement (AcceleratorRequirement)
- Return type:
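The difference between partial and full satisfaction can be sketched in plain Python. This is a minimal illustration and not Toil's implementation: the function names `partially_satisfies` and `fully_satisfies`, the `MATCH_FIELDS` tuple, and the choice to sum the counts of matching candidates are all assumptions made for the example.

```python
from typing import Iterable, Optional

# Hypothetical field list for this sketch; 'count' is deliberately excluded.
MATCH_FIELDS = ("kind", "brand", "api", "model")


def partially_satisfies(candidate: dict, requirement: dict,
                        ignore: Iterable[str] = ()) -> bool:
    """Compare every descriptive field except 'count'."""
    skipped = set(ignore)
    for name in MATCH_FIELDS:
        if name in skipped or name not in requirement:
            continue
        # A candidate that lacks a required field does not match.
        if candidate.get(name) != requirement[name]:
            return False
    return True


def fully_satisfies(candidates: Optional[list], requirement: dict,
                    ignore: Iterable[str] = ()) -> bool:
    """Add up the counts of all matching candidates and compare to the need."""
    available = sum(
        c.get("count", 1)
        for c in (candidates or [])
        if partially_satisfies(c, requirement, ignore)
    )
    return available >= requirement.get("count", 1)


k80 = {"count": 2, "kind": "gpu", "brand": "nvidia", "model": "nvidia-tesla-k80"}
need_four_nvidia = {"count": 4, "kind": "gpu", "brand": "nvidia"}
need_one_amd = {"count": 1, "kind": "gpu", "brand": "amd"}
```

In this sketch a single two-GPU node matches `need_four_nvidia` partially but not fully, while two such nodes taken together satisfy it. Passing `ignore=["brand"]` lets the NVIDIA candidate match `need_one_amd`.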
- class toil.job.RequirementsDict[source]¶
Bases: TypedDict
Typed storage for requirements for a job.
Where requirement values are of different types depending on the requirement.
- accelerators: NotRequired[list[AcceleratorRequirement]]¶
- toil.job.REQUIREMENT_NAMES = ['disk', 'memory', 'cores', 'accelerators', 'preemptible']¶
- toil.job.ParsedRequirement¶
- toil.job.ParseableIndivisibleResource¶
- toil.job.ParseableDivisibleResource¶
- toil.job.ParseableFlag¶
- toil.job.ParseableSingleAcceleratorRequirement¶
- toil.job.ParseableAcceleratorRequirement¶
- toil.job.ParseableRequirement¶
- class toil.job.Requirer(requirements)[source]¶
Base class implementing the storage and presentation of requirements.
Has cores, memory, disk, and preemptability as properties.
- Parameters:
requirements (collections.abc.Mapping[str, ParseableRequirement | None])
- assignConfig(config)[source]¶
Assign the given config object to be used to provide default values.
Must be called exactly once on a loaded JobDescription before any requirements are queried.
- Parameters:
config (toil.common.Config) – Config object to query
- Return type:
None
- __deepcopy__(memo)[source]¶
Return a semantically-deep copy of the object, for copy.deepcopy().
- Parameters:
memo (Any)
- Return type:
- property requirements: RequirementsDict¶
Get dict containing all non-None, non-defaulted requirements.
- Return type:
- property preemptible: bool¶
Whether a preemptible node is permitted, or a nonpreemptible one is required.
- Return type:
- property accelerators: list[AcceleratorRequirement]¶
Any accelerators, such as GPUs, that are needed.
- Return type:
- class toil.job.JobBodyReference[source]¶
Bases: NamedTuple
Reference from a job description to its body.
- module_command: collections.abc.Sequence[str]¶
Description of the module needed to load the body.
- class toil.job.JobDescription(requirements, jobName, unitName='', displayName='', local=None, files=None)[source]¶
Bases: Requirer
Stores all the information that the Toil Leader ever needs to know about a Job.
- This includes:
Resource requirements.
Which jobs are children or follow-ons or predecessors of this job.
A reference to the Job object in the job store.
Can be obtained from an actual (i.e. executable) Job object, and can be used to obtain the Job object from the JobStore.
Never contains other Jobs or JobDescriptions: all reference is by ID.
Subclassed into variants for checkpoint jobs and service jobs that have their specific parameters.
- Parameters:
requirements (collections.abc.Mapping[str, ParseableRequirement | None])
jobName (str)
unitName (str | None)
displayName (str | None)
local (bool | None)
files (set[toil.fileStores.FileID] | None)
- jobName¶
- unitName¶
- displayName¶
- jobStoreID: str | TemporaryID¶
- predecessorNumber = 0¶
- childIDs: set[str | TemporaryID]¶
- followOnIDs: set[str | TemporaryID]¶
- successor_phases: list[set[str | TemporaryID]]¶
- serviceTree: dict[str | TemporaryID, list[str | TemporaryID]]¶
- files_to_use¶
- get_chain()[source]¶
Get all the jobs that executed in this job’s chain, in order.
For each job, produces a named tuple with its various names and its original job store ID. The jobs in the chain are in execution order.
If the job hasn’t run yet or it didn’t chain, produces a one-item list.
- Return type:
- serviceHostIDsInBatches()[source]¶
Find all batches of service host job IDs that can be started at the same time.
(in the order they need to start in)
- Return type:
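One way to picture the batching is a level-by-level walk of the service tree. The sketch below is an assumption-laden illustration, not Toil's code: it assumes a mapping shaped like `serviceTree` in which every service ID maps to the list of services that must start after it, and `service_batches` is a hypothetical helper name.

```python
def service_batches(service_tree: dict[str, list[str]]) -> list[list[str]]:
    """Group service IDs into batches that can start together, in start order."""
    # Services that appear as somebody's child cannot be in the first batch.
    children = {c for kids in service_tree.values() for c in kids}
    batch = [s for s in service_tree if s not in children]
    batches = []
    while batch:
        batches.append(batch)
        # The next batch holds the direct children of the batch just started.
        batch = [c for s in batch for c in service_tree.get(s, [])]
    return batches


tree = {"db": ["api"], "cache": [], "api": ["frontend"], "frontend": []}
batches = service_batches(tree)
```

Here `db` and `cache` have no parent service and form the first batch, `api` follows once `db` is up, and `frontend` comes last.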
- successorsAndServiceHosts()[source]¶
Get an iterator over all child, follow-on, and service job IDs.
- Return type:
- allSuccessors()[source]¶
Get an iterator over all child, follow-on, and chained, inherited successor job IDs.
Follow-ons will come before children.
- Return type:
- successors_by_phase()[source]¶
Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase number on the stack.
Phases execute from higher numbers to lower numbers.
- Return type:
- property services: list[str]¶
Get a collection of the IDs of service host jobs for this job, in arbitrary order.
Will be empty if the job has no unfinished services.
- has_body()[source]¶
Returns True if we have a job body associated, and False otherwise.
- Return type:
- attach_body(file_store_id, user_script)[source]¶
Attach a job body to this JobDescription.
Takes the file store ID that the body is stored at, and the required user script module.
The file store ID can also be “firstJob” for the root job, stored as a shared file instead.
- Parameters:
file_store_id (str)
user_script (toil.resource.ModuleDescriptor)
- Return type:
None
- get_body()[source]¶
Get the information needed to load the job body.
- Returns:
a file store ID (or magic shared file name “firstJob”) and a user script module.
- Return type:
Fails if no body is attached; check has_body() first.
- nextSuccessors()[source]¶
Return the collection of job IDs for the successors of this job that are ready to run.
If those jobs have multiple predecessor relationships, they may still be blocked on other jobs.
Returns None when at the final phase (all successors done), and an empty collection if there are more phases but they can’t be entered yet (e.g. because we are waiting for the job itself to run).
- filterSuccessors(predicate)[source]¶
Keep only successor jobs for which the given predicate function approves.
The predicate function is called with the job’s ID.
Treats all other successors as complete and forgets them.
- Parameters:
predicate (collections.abc.Callable[[str], bool])
- Return type:
None
- filterServiceHosts(predicate)[source]¶
Keep only services for which the given predicate approves.
The predicate function is called with the service host job’s ID.
Treats all other services as complete and forgets them.
- Parameters:
predicate (collections.abc.Callable[[str], bool])
- Return type:
None
- clear_nonexistent_dependents(job_store)[source]¶
Remove all references to child, follow-on, and associated service jobs that do not exist.
That is to say, all those that have been completed and removed.
- Parameters:
job_store (toil.jobStores.abstractJobStore.AbstractJobStore)
- Return type:
None
- is_subtree_done()[source]¶
Check if the subtree is done.
- Returns:
True if the job appears to be done, and all related child, follow-on, and service jobs appear to be finished and removed.
- Return type:
- replace(other)[source]¶
Take on the ID of another JobDescription, retaining our own state and type.
When updated in the JobStore, we will save over the other JobDescription.
Useful for chaining jobs: the chained-to job can replace the parent job.
Merges cleanup state and successors other than this job from the job being replaced into this one.
- Parameters:
other (JobDescription) – Job description to replace.
- Return type:
None
- assert_is_not_newer_than(other)[source]¶
Make sure this JobDescription is not newer than a prospective new version of the JobDescription.
- Parameters:
other (JobDescription)
- Return type:
None
- is_updated_by(other)[source]¶
Return True if the passed JobDescription is a distinct, newer version of this one.
- Parameters:
other (JobDescription)
- Return type:
- addChild(childID)[source]¶
Make the job with the given ID a child of the described job.
- Parameters:
childID (str | TemporaryID)
- Return type:
None
- addFollowOn(followOnID)[source]¶
Make the job with the given ID a follow-on of the described job.
- Parameters:
followOnID (str | TemporaryID)
- Return type:
None
- addServiceHostJob(serviceID, parentServiceID=None)[source]¶
Make the ServiceHostJob with the given ID a service of the described job.
If a parent ServiceHostJob ID is given, that parent service will be started first, and must have already been added.
- Parameters:
serviceID (str | TemporaryID)
parentServiceID (str | TemporaryID | None)
- Return type:
None
- hasChild(childID)[source]¶
Return True if the job with the given ID is a child of the described job.
- Parameters:
childID (str | TemporaryID)
- Return type:
- hasFollowOn(followOnID)[source]¶
Test if the job with the given ID is a follow-on of the described job.
- Parameters:
followOnID (str | TemporaryID)
- Return type:
- hasServiceHostJob(serviceID)[source]¶
Test if the ServiceHostJob is a service of the described job.
- Parameters:
serviceID (str | TemporaryID)
- Return type:
- renameReferences(renames)[source]¶
Apply the given dict of ID renames to all references to jobs.
Does not modify our own ID or those of finished predecessors. IDs not present in the renames dict are left as-is.
- Parameters:
renames (dict[TemporaryID, str]) – Rename operations to apply.
- Return type:
None
- addPredecessor()[source]¶
Notify the JobDescription that a predecessor has been added to its Job.
- Return type:
None
- onRegistration(jobStore)[source]¶
Perform setup work that requires the JobStore.
Called by the Job saving logic when this JobDescription meets the JobStore and has its ID assigned.
Overridden to perform setup work (like hooking up flag files for service jobs) that requires the JobStore.
- Parameters:
jobStore (toil.jobStores.abstractJobStore.AbstractJobStore) – The job store we are being placed into
- Return type:
None
- chargeRetry()[source]¶
Charge the job one of its remaining retries.
Manages exponential backoff of retried jobs.
On completion, self.retry_backoff_seconds will be the time to wait before the next retry.
Can only be used once the config has been attached.
- Return type:
None
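Exponential backoff in general means that the wait before each retry grows by a constant factor up to some cap. The sketch below shows the idea only. The function `next_backoff` and its `initial`, `factor`, and `maximum` values are hypothetical and are not taken from Toil's configuration.

```python
def next_backoff(current: float, initial: float = 1.0,
                 factor: float = 2.0, maximum: float = 60.0) -> float:
    """Return the wait before the next retry, given the previous wait."""
    if current <= 0:
        # First retry: start from the initial delay.
        return initial
    # Later retries: grow geometrically, but never past the cap.
    return min(current * factor, maximum)


delays = []
delay = 0.0
for _ in range(8):
    delay = next_backoff(delay)
    delays.append(delay)
```

With these illustrative defaults the delay doubles from 1 second until it reaches the 60 second cap and then stays there.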
- setupJobAfterFailure(exit_status=None, exit_reason=None)[source]¶
Configure job after a failure.
Reduce the remainingTryCount if greater than zero and set the memory to be at least as big as the default memory (in case of exhaustion of memory, which is common).
Requires a configuration to have been assigned (see toil.job.Requirer.assignConfig()).
- Parameters:
exit_status (int | None) – The exit code from the job.
exit_reason (toil.batchSystems.abstractBatchSystem.BatchJobExitReason | None) – The reason the job stopped, if available from the batch system.
- Return type:
None
- getLogFileHandle(jobStore)[source]¶
Create a context manager that yields a file handle to the log file.
Assumes logJobStoreFileID is set.
- Parameters:
- Return type:
Any
- property remainingTryCount: int¶
Get the number of tries remaining.
The try count set on the JobDescription, or the default based on the retry count from the config if none is set.
- Return type:
- property retry_backoff_seconds: float¶
Get the number of seconds to wait before retrying this job.
- Return type:
- resetRetries()[source]¶
Clear retry system values back to the workflow defaults.
- Returns:
True if a modification to the JobDescription was made, and False otherwise.
- Return type:
- class toil.job.ServiceJobDescription(*args, **kwargs)[source]¶
Bases: JobDescription
A description of a job that hosts a service.
- Parameters:
args (Any)
kwargs (Any)
- class toil.job.CheckpointJobDescription(*args, **kwargs)[source]¶
Bases: JobDescription
A description of a job that is a checkpoint.
- Parameters:
args (Any)
kwargs (Any)
- checkpoint: JobBodyReference | None = None¶
- restartCheckpoint(jobStore)[source]¶
Restart a checkpoint after the total failure of jobs in its subtree.
Writes the changes to the jobStore immediately. All the checkpoint’s successors will be deleted, but its try count will not be decreased.
Returns a list with the IDs of any successors deleted.
- Parameters:
- Return type:
- toil.job.JobType¶
- class toil.job.Job(memory=None, cores=None, disk=None, accelerators=None, preemptible=None, preemptable=None, unitName='', checkpoint=False, displayName='', descriptionClass=None, local=None, files=None)[source]¶
Represents a unit of work in Toil.
- Parameters:
memory (ParseableIndivisibleResource | None)
cores (ParseableDivisibleResource | None)
disk (ParseableIndivisibleResource | None)
accelerators (ParseableAcceleratorRequirement | None)
preemptible (ParseableFlag | None)
preemptable (ParseableFlag | None)
unitName (str | None)
checkpoint (bool | None)
displayName (str | None)
descriptionClass (type[JobDescription] | None)
local (bool | None)
files (set[toil.fileStores.FileID] | None)
- userModule: toil.resource.ModuleDescriptor¶
- __str__()[source]¶
Produce a useful logging string to identify this Job and distinguish it from its JobDescription.
- Return type:
- check_initialized()[source]¶
Ensure that Job.__init__() has been called by any subclass __init__().
This uses the fact that the self._description instance variable should always be set after __init__().
If __init__() has not been called, raise an error.
- Return type:
None
- property jobStoreID: str | TemporaryID¶
Get the ID of this Job.
- Return type:
- property description: JobDescription¶
Expose the JobDescription that describes this job.
- Return type:
- property memory: int¶
The maximum number of bytes of memory the job will require to run.
- Return type:
- property accelerators: list[AcceleratorRequirement]¶
Any accelerators, such as GPUs, that are needed.
- Return type:
- property files_to_use: set[toil.fileStores.FileID]¶
- Return type:
- add_to_files_to_use(val)[source]¶
- Parameters:
val (toil.fileStores.FileID)
- Return type:
None
- remove_from_files_to_use(val)[source]¶
- Parameters:
val (toil.fileStores.FileID)
- Return type:
None
- assignConfig(config)[source]¶
Assign the given config object.
It will be used by various actions implemented inside the Job class.
- Parameters:
config (toil.common.Config) – Config object to query
- Return type:
None
- run(fileStore)[source]¶
Override this function to perform work and dynamically create successor jobs.
- Parameters:
fileStore (toil.fileStores.abstractFileStore.AbstractFileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
- Returns:
The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
- Return type:
Any
- addChild(childJob)[source]¶
Add childJob to be run as a child of this job.
Child jobs will be run directly after this job’s toil.job.Job.run() method has completed.
- Returns:
childJob: for call chaining
- Parameters:
childJob (JobType)
- Return type:
JobType
- addFollowOn(followOnJob)[source]¶
Add a follow-on job.
Follow-on jobs will be run after the child jobs and their successors have been run.
- Returns:
followOnJob for call chaining
- Parameters:
followOnJob (JobType)
- Return type:
JobType
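The ordering rule for children and follow-ons can be illustrated with a small serial simulation. This is a sketch under assumptions: `Node` and `execution_order` are hypothetical stand-ins rather than Toil classes, and the simulation runs siblings one after another even though a real workflow may run independent jobs in parallel.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a job; not a Toil class."""
    name: str
    children: list = field(default_factory=list)
    follow_ons: list = field(default_factory=list)


def execution_order(node: Node) -> list:
    """Run the job, then its children and their successors, then its follow-ons."""
    order = [node.name]
    for child in node.children:
        order.extend(execution_order(child))
    for follow_on in node.follow_ons:
        order.extend(execution_order(follow_on))
    return order


root = Node(
    "root",
    children=[Node("a", children=[Node("a1")]), Node("b")],
    follow_ons=[Node("cleanup")],
)
order = execution_order(root)
```

In this example `cleanup` appears last because it is a follow-on of `root`, so it waits for `a`, `a1`, and `b`.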
- addService(service, parentService=None)[source]¶
Add a service.
The toil.job.Job.Service.start() method of the service will be called after the run method has completed but before any successors are run. The service’s toil.job.Job.Service.stop() method will be called once the successors of the job have been run.
Services allow things like databases and servers to be started and accessed by jobs in a workflow.
- Raises:
toil.job.JobException – If service has already been made the child of a job or another service.
- Parameters:
- Returns:
a promise that will be replaced with the return value from toil.job.Job.Service.start() of service in any successor of the job.
- Return type:
- hasService(service)[source]¶
Return True if the given Service is a service of this job, and False otherwise.
- addChildFn(fn, *args, **kwargs)[source]¶
Add a function as a child job.
- Parameters:
fn (collections.abc.Callable[Ellipsis, Any]) – Function to be run as a child job with *args and **kwargs as arguments to this function. See toil.job.FunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
args (Any)
kwargs (Any)
- Returns:
The new child job that wraps fn.
- Return type:
- addFollowOnFn(fn, *args, **kwargs)[source]¶
Add a function as a follow-on job.
- Parameters:
fn (collections.abc.Callable[Ellipsis, Any]) – Function to be run as a follow-on job with *args and **kwargs as arguments to this function. See toil.job.FunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
args (Any)
kwargs (Any)
- Returns:
The new follow-on job that wraps fn.
- Return type:
- addChildJobFn(fn, *args, **kwargs)[source]¶
Add a job function as a child job.
See toil.job.JobFunctionWrappingJob for a definition of a job function.
- Parameters:
fn (collections.abc.Callable[Ellipsis, Any]) – Job function to be run as a child job with *args and **kwargs as arguments to this function. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
args (Any)
kwargs (Any)
- Returns:
The new child job that wraps fn.
- Return type:
- addFollowOnJobFn(fn, *args, **kwargs)[source]¶
Add a follow-on job function.
See toil.job.JobFunctionWrappingJob for a definition of a job function.
- Parameters:
fn (collections.abc.Callable[Ellipsis, Any]) – Job function to be run as a follow-on job with *args and **kwargs as arguments to this function. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
args (Any)
kwargs (Any)
- Returns:
The new follow-on job that wraps fn.
- Return type:
- property tempDir: str¶
Shortcut to calling job.fileStore.getLocalTempDir().
The temporary directory is created on the first call and is returned for that call and all later calls.
- Returns:
Path to tempDir. See job.fileStore.getLocalTempDir.
- Return type:
- static wrapFn(fn, *args, **kwargs)[source]¶
Makes a Job out of a function.
Convenience function for constructor of toil.job.FunctionWrappingJob.
- Parameters:
fn (collections.abc.Callable[Ellipsis, Any]) – Function to be run with *args and **kwargs as arguments. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
args (Any)
kwargs (Any)
- Returns:
The new function that wraps fn.
- Return type:
- static wrapJobFn(fn, *args, **kwargs)[source]¶
Makes a Job out of a job function.
Convenience function for constructor of toil.job.JobFunctionWrappingJob.
- Parameters:
fn (collections.abc.Callable[Ellipsis, Any]) – Job function to be run with *args and **kwargs as arguments. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
args (Any)
kwargs (Any)
- Returns:
The new job function that wraps fn.
- Return type:
- encapsulate(name=None)[source]¶
Encapsulates the job, see toil.job.EncapsulatedJob. Convenience function for constructor of toil.job.EncapsulatedJob.
- Parameters:
name (str | None) – Human-readable name for the encapsulated job.
- Returns:
an encapsulated version of this job.
- Return type:
- rv(*path)[source]¶
Create a promise (toil.job.Promise).
The “promise” represents a return value of the job’s run method, or, in case of a function-wrapping job, the wrapped function’s return value.
- Parameters:
path ((Any)) – Optional path for selecting a component of the promised return value. If absent or empty, the entire return value will be used. Otherwise, the first element of the path is used to select an individual item of the return value. For that to work, the return value must be a list, dictionary or of any other type implementing the __getitem__() magic method. If the selected item is yet another composite value, the second element of the path can be used to select an item from it, and so on. For example, if the return value is [6, {'a': 42}], .rv(0) would select 6, .rv(1) would select {'a': 42}, while .rv(1, 'a') would select 42. To select a slice from a return value that is sliceable, e.g. a tuple or list, the path element should be a slice object. For example, assuming that the return value is [6, 7, 8, 9], then .rv(slice(1, 3)) would select [7, 8]. Note that slicing really only makes sense at the end of path.
- Returns:
A promise representing the return value of this job’s toil.job.Job.run() method.
- Return type:
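The path semantics described for rv() amount to repeated indexing with __getitem__(). The following sketch mimics only that selection step on an ordinary value. The helper name `select` is hypothetical, and no promise or job store is involved.

```python
def select(value, *path):
    """Apply each path element in turn as an index into the current value."""
    for key in path:
        value = value[key]
    return value


result = [6, {"a": 42}]
whole = select(result)                      # empty path: the entire value
first = select(result, 0)                   # first element of the list
nested = select(result, 1, "a")             # dictionary entry inside the list
middle = select([6, 7, 8, 9], slice(1, 3))  # slice object as a path element
```

An empty path returns the value unchanged, which matches the rule that a promise without a path stands for the entire return value.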
- prepareForPromiseRegistration(jobStore)[source]¶
Set up to allow this job’s promises to register themselves.
Prepare this job (the promisor) so that its promises can register themselves with it, when the jobs they are promised to (promisees) are serialized.
The promisee holds the reference to the promise (usually as part of the job arguments) and when it is being pickled, so will the promises it refers to. Pickling a promise triggers it to be registered with the promisor.
- Parameters:
- Return type:
None
- checkJobGraphForDeadlocks()[source]¶
Ensures that a graph of Jobs (that hasn’t yet been saved to the JobStore) doesn’t contain any pathological relationships between jobs that would result in deadlocks if we tried to run the jobs.
See toil.job.Job.checkJobGraphConnected(), toil.job.Job.checkJobGraphAcylic() and toil.job.Job.checkNewCheckpointsAreLeafVertices() for more info.
- Raises:
toil.job.JobGraphDeadlockException – if the job graph is cyclic, contains multiple roots or contains checkpoint jobs that are not leaf vertices when defined (see toil.job.Job.checkNewCheckpointsAreLeafVertices()).
- Return type:
None
- getRootJobs()[source]¶
Return the set of root job objects that contain this job.
A root job is a job with no predecessors (i.e. one that is not a child, follow-on, or service of any other job).
Only deals with jobs created here, rather than loaded from the job store.
- checkJobGraphConnected()[source]¶
- Raises:
toil.job.JobGraphDeadlockException – if toil.job.Job.getRootJobs() does not contain exactly one root job.
- Return type:
None
As execution always starts from one root job, having multiple root jobs will cause a deadlock to occur.
Only deals with jobs created here, rather than loaded from the job store.
- checkJobGraphAcylic()[source]¶
- Raises:
toil.job.JobGraphDeadlockException – if the connected component of jobs containing this job contains any cycles of child/followOn dependencies in the augmented job graph (see below). Such cycles are not allowed in valid job graphs.
- Return type:
None
A follow-on edge (A, B) between two jobs A and B is equivalent to adding a child edge to B from (1) A, (2) from each child of A, and (3) from the successors of each child of A. We call each such edge an “implied” edge. The augmented job graph is a job graph including all the implied edges.
For a job graph G = (V, E) the algorithm is O(|V|^2). It is O(|V| + |E|) for a graph with no follow-ons. The former follow-on case could be improved!
Only deals with jobs created here, rather than loaded from the job store.
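The augmented job graph and the cycle check can be sketched as follows. This is an illustration of the definition of implied edges given above, not Toil's algorithm: the graph is represented as two hypothetical dictionaries (`children` and `follow_ons`, mapping a job name to a set of job names), and cycles are found with a standard three-colour depth-first search.

```python
def augmented_edges(children: dict, follow_ons: dict) -> dict:
    """Return child edges plus the implied edges created by follow-on edges."""
    def successors(node):
        return children.get(node, set()) | follow_ons.get(node, set())

    def descendants(start):
        seen, stack = set(), [start]
        while stack:
            for nxt in successors(stack.pop()):
                if nxt not in seen:
                    seen.add(nxt)
                    stack.append(nxt)
        return seen

    edges = {node: set(kids) for node, kids in children.items()}
    for a, targets in follow_ons.items():
        # Sources of implied edges: A, each child of A, and their successors.
        sources = {a}
        for child in children.get(a, set()):
            sources |= {child} | descendants(child)
        for b in targets:
            for source in sources:
                edges.setdefault(source, set()).add(b)
    return edges


def has_cycle(edges: dict) -> bool:
    """Depth-first search; reaching a grey (in-progress) node means a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2
    color = {}

    def visit(node):
        color[node] = GREY
        for nxt in edges.get(node, ()):
            state = color.get(nxt, WHITE)
            if state == GREY or (state == WHITE and visit(nxt)):
                return True
        color[node] = BLACK
        return False

    return any(color.get(n, WHITE) == WHITE and visit(n) for n in list(edges))


valid = augmented_edges({"A": {"C"}, "C": set()}, {"A": {"B"}})
# B is both a follow-on of A and a successor of A's child C, so B would
# have to wait for itself: the implied edge B -> B closes a cycle.
invalid = augmented_edges({"A": {"C"}, "C": {"B"}}, {"A": {"B"}})
```

The second graph has no cycle among its explicit edges. It only becomes cyclic once the implied edges are added, which is why the check runs on the augmented graph.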
- checkNewCheckpointsAreLeafVertices()[source]¶
A checkpoint job is a job that is restarted if either it fails, or if any of its successors completely fails, exhausting their retries.
A job is a leaf if it has no successors.
A checkpoint job must be a leaf when initially added to the job graph. When its run method is invoked it can then create direct successors. This restriction is made to simplify implementation.
Only works on connected components of jobs not yet added to the JobStore.
- Raises:
toil.job.JobGraphDeadlockException – if there exists a job being added to the graph for which checkpoint=True and which is not a leaf.
- Return type:
None
- P¶
- defer(function, *args, **kwargs)[source]¶
Register a deferred function, i.e. a callable that will be invoked after the current attempt at running this job concludes. A job attempt is said to conclude when the job function (or the toil.job.Job.run() method for class-based jobs) returns, raises an exception or after the process running it terminates abnormally. A deferred function will be called on the node that attempted to run the job, even if a subsequent attempt is made on another node. A deferred function should be idempotent because it may be called multiple times on the same node or even in the same process. More than one deferred function may be registered per job attempt by calling this method repeatedly with different arguments. If the same function is registered twice with the same or different arguments, it will be called twice per job attempt.
Examples for deferred functions are ones that handle cleanup of resources external to Toil, like Docker containers, files outside the work directory, etc.
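The register-then-run-at-conclusion behaviour can be sketched with a try/finally block. This is a simplified illustration, not how Toil implements deferral: the `Attempt` class and the `remove_container` helper are hypothetical, and the sketch covers only the cases where the body returns or raises, not abnormal process termination.

```python
class Attempt:
    """Hypothetical stand-in for one attempt at running a job."""

    def __init__(self):
        self._deferred = []

    def defer(self, function, *args, **kwargs):
        # Registering the same function twice means it is called twice.
        self._deferred.append((function, args, kwargs))

    def run(self, body):
        try:
            return body(self)
        finally:
            # Runs whether the body returned normally or raised.
            for function, args, kwargs in self._deferred:
                function(*args, **kwargs)


cleaned = set()


def remove_container(name):
    # Idempotent: calling it again for the same name changes nothing.
    cleaned.add(name)


def failing_body(attempt):
    attempt.defer(remove_container, "scratch-db")
    raise RuntimeError("job failed")


attempt = Attempt()
try:
    attempt.run(failing_body)
except RuntimeError:
    pass
```

Even though the body raises, the cleanup function still runs, and because it is idempotent a repeated call would be harmless.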
- class Runner[source]¶
Used to set up and run a Toil workflow.
- static getDefaultArgumentParser(jobstore_as_flag=False)[source]¶
Get argument parser with added toil workflow options.
This is the Right Way to get an argument parser in a Toil Python workflow.
- Parameters:
jobstore_as_flag (bool) – make the job store option a --jobStore flag instead of a required jobStore positional argument.
- Returns:
The argument parser used by a toil workflow with added Toil options.
- Return type:
configargparse.ArgParser
- static getDefaultOptions(jobStore=None, jobstore_as_flag=False)[source]¶
Get default options for a toil workflow.
- Parameters:
jobStore (toil.lib.misc.StrPath | None) – A string describing the jobStore for the workflow.
jobstore_as_flag (bool) – make the job store option a --jobStore flag instead of a required jobStore positional argument.
- Returns:
The options used by a toil workflow.
- Return type:
- static addToilOptions(parser, jobstore_as_flag=False)[source]¶
Adds the default toil options to an
optparseorargparseparser object.Consider using
getDefaultArgumentParser()instead, which will produce a parser of the correct class to use Toil’s config file and environment variables. If ther parser passed here is just anargparse.ArgumentParserand not aconfigargparse.ArgParser, the Toil config file and environment variables will not be respected.- Parameters:
parser (optparse.OptionParser | argparse.ArgumentParser) – Options object to add toil options to.
jobstore_as_flag (bool) – make the job store option a --jobStore flag instead of a required jobStore positional argument.
- Return type:
None
- static startToil(job, options)[source]¶
Run the toil workflow using the given options.
Deprecated by toil.common.Toil.start.
See Job.Runner.getDefaultOptions and Job.Runner.addToilOptions for the options. The workflow is run starting with this job.
- Raises:
toil.exceptions.FailedJobsException – If failed jobs remain at the end of the function.
- Returns:
The return value of the root job’s run function.
- Parameters:
job (Job) – root job of the workflow
options (argparse.Namespace)
- Return type:
Any
- class Service(memory=None, cores=None, disk=None, accelerators=None, preemptible=None, unitName='')[source]¶
Bases:
Requirer
Abstract class used to define the interface to a service.
Should be subclassed by the user to define services.
Is not executed as a job; runs within a ServiceHostJob.
- Parameters:
memory (ParseableIndivisibleResource | None)
cores (ParseableDivisibleResource | None)
disk (ParseableIndivisibleResource | None)
accelerators (ParseableAcceleratorRequirement | None)
preemptible (ParseableFlag | None)
unitName (str | None)
- unitName = ''¶
- jobName = 'Service'¶
- hostID: str | TemporaryID | None = None¶
- abstractmethod start(job)[source]¶
Start the service.
- Parameters:
job (ServiceHostJob) – The underlying host job that the service is being run in. Can be used to register deferred functions, or to access the fileStore for creating temporary files.
- Returns:
An object describing how to access the service. The object must be pickleable and will be used by jobs to access the service (see toil.job.Job.addService()).
- Return type:
Any
- abstractmethod stop(job)[source]¶
Stops the service. Function can block until complete.
- Parameters:
job (ServiceHostJob) – The underlying host job that the service is being run in. Can be used to register deferred functions, or to access the fileStore for creating temporary files.
- Return type:
None
- abstractmethod check()[source]¶
Checks the service is still running.
- Raises:
exceptions.RuntimeError – If the service failed, this will cause the service job to be labeled failed.
- Returns:
True if the service is still running, else False. If False then the service job will be terminated, and considered a success. Important point: if the service job exits due to a failure, it should raise a RuntimeError, not return False!
- Return type:
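The contract between start(), stop() and check() can be sketched without Toil. Everything below is a hypothetical stand-in: CounterService only mirrors the three method names, and supervise is an assumed illustration of how a host might interpret check(), not the actual ServiceHostJob logic.

```python
class CounterService:
    """Hypothetical stand-in mirroring the start/stop/check method names."""

    def __init__(self, healthy_checks: int, fail: bool = False):
        self.remaining = healthy_checks
        self.fail = fail
        self.running = False

    def start(self, job):
        self.running = True
        # The returned object must be pickleable; a plain dict is.
        return {"host": "localhost", "port": 8080}

    def stop(self, job):
        self.running = False

    def check(self):
        if self.fail:
            # A failure is signalled by raising, never by returning False.
            raise RuntimeError("service crashed")
        if self.remaining > 0:
            self.remaining -= 1
            return True
        return False  # finished normally


def supervise(service) -> str:
    """Assumed host loop: False means success, RuntimeError means failure."""
    service.start(job=None)
    try:
        while service.check():
            pass
        return "success"
    except RuntimeError:
        return "failed"
    finally:
        service.stop(job=None)


ok = CounterService(healthy_checks=2)
broken = CounterService(healthy_checks=2, fail=True)
results = (supervise(ok), supervise(broken))
```

In this sketch a service that returns False from check() is treated as a clean exit, while one that raises RuntimeError is treated as failed, which matches the distinction drawn above.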
- getTopologicalOrderingOfJobs()[source]¶
- Returns:
a list of jobs such that for all pairs of indices i, j for which i < j, the job at index i can be run before the job at index j.
- Return type:
Only considers jobs in this job’s subgraph that are newly added, not loaded from the job store.
Ignores service jobs.
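The ordering property stated above (every job appears after all of its predecessors) can be checked on a small example. This sketch uses the standard library graphlib module and a hypothetical four-job graph; it is not how Toil computes the ordering.

```python
from graphlib import TopologicalSorter

# Hypothetical job graph: each key maps to the set of jobs that must run before it.
predecessors = {"B": {"A"}, "C": {"A"}, "D": {"B", "C"}}

order = list(TopologicalSorter(predecessors).static_order())


def is_valid_order(order, predecessors) -> bool:
    """True if every job appears after all of its predecessors."""
    position = {job: index for index, job in enumerate(order)}
    return all(
        position[pred] < position[job]
        for job, preds in predecessors.items()
        for pred in preds
    )


valid = is_valid_order(order, predecessors)
```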
- saveBody(jobStore)[source]¶
Save the execution data for just this job to the JobStore, and fill in the JobDescription with the information needed to retrieve it.
The Job’s JobDescription must have already had a real jobStoreID assigned to it.
Does not save the JobDescription.
- Parameters:
jobStore (toil.jobStores.abstractJobStore.AbstractJobStore) – The job store to save the job body into.
- Return type:
None
- saveAsRootJob(jobStore)[source]¶
Save this job to the given jobStore as the root job of the workflow.
- Returns:
the JobDescription describing this job.
- Parameters:
- Return type:
- classmethod loadJob(job_store, job_description)[source]¶
Retrieves a toil.job.Job instance from a JobStore.
- Parameters:
job_store (toil.jobStores.abstractJobStore.AbstractJobStore) – The job store.
job_description (JobDescription) – the JobDescription of the job to retrieve.
- Returns:
The job referenced by the JobDescription.
- Return type:
- set_debug_flag(flag)[source]¶
Enable the given debug option on the job.
- Parameters:
flag (str)
- Return type:
None
- files_downloaded_hook(host_and_job_paths=None)[source]¶
Function that subclasses can call when they have downloaded their input files.
Will abort the job if the “download_only” debug flag is set.
Can be hinted a list of file path pairs outside and inside the job container, in which case the container environment can be reconstructed.
- exception toil.job.JobException(message)[source]¶
Bases:
Exception
General job exception.
- Parameters:
message (str)
- exception toil.job.JobGraphDeadlockException(string)[source]¶
Bases:
JobException
An exception raised in the event that a workflow contains an unresolvable dependency, such as a cycle. See toil.job.Job.checkJobGraphForDeadlocks().
- Parameters:
string (str)
- class toil.job.FunctionWrappingJob(userFunction, *args, **kwargs)[source]¶
Bases:
Job
Job used to wrap a function. In its run method the wrapped function is called.
- Parameters:
userFunction (collections.abc.Callable[Ellipsis, Any])
args (Any)
kwargs (Any)
- userFunctionModule¶
- userFunctionName = ''¶
- run(fileStore)[source]¶
Override this function to perform work and dynamically create successor jobs.
- Parameters:
fileStore (toil.fileStores.abstractFileStore.AbstractFileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
- Returns:
The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
- Return type:
Any
- class toil.job.JobFunctionWrappingJob(userFunction, *args, **kwargs)[source]¶
Bases:
FunctionWrappingJob
A job function is a function whose first argument is a Job instance that is the wrapping job for the function. This can be used to add successor jobs for the function and perform all the functions the Job class provides.
To enable the job function to get access to the toil.fileStores.abstractFileStore.AbstractFileStore instance (see toil.job.Job.run()), it is made a variable of the wrapping job called fileStore.
To specify a job’s resource requirements the following default keyword arguments can be specified:
memory
disk
cores
accelerators
preemptible
For example to wrap a function into a job we would call:
Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)
- Parameters:
userFunction (collections.abc.Callable[Ellipsis, Any])
args (Any)
kwargs (Any)
- property fileStore: toil.fileStores.abstractFileStore.AbstractFileStore¶
- Return type:
- run(fileStore)[source]¶
Override this function to perform work and dynamically create successor jobs.
- Parameters:
fileStore (toil.fileStores.abstractFileStore.AbstractFileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
- Returns:
The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
- Return type:
Any
- class toil.job.PromisedRequirementFunctionWrappingJob(userFunction, *args, **kwargs)[source]¶
Bases:
FunctionWrappingJob
Handles dynamic resource allocation using toil.job.Promise instances. Spawns child function using parent function parameters and fulfilled promised resource requirements.
- Parameters:
userFunction (collections.abc.Callable[Ellipsis, Any])
args (Any)
kwargs (Any)
- classmethod create(userFunction, *args, **kwargs)[source]¶
Creates an encapsulated Toil job function with unfulfilled promised resource requirements. After the promises are fulfilled, a child job function is created using updated resource values. The subgraph is encapsulated to ensure that this child job function is run before other children in the workflow. Otherwise, a different child may try to use an unresolved promise return value from the parent.
- Parameters:
userFunction (collections.abc.Callable[Ellipsis, Any])
args (Any)
kwargs (Any)
- Return type:
- run(fileStore)[source]¶
Override this function to perform work and dynamically create successor jobs.
- Parameters:
fileStore (toil.fileStores.abstractFileStore.AbstractFileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
- Returns:
The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
- Return type:
Any
- class toil.job.PromisedRequirementJobFunctionWrappingJob(userFunction, *args, **kwargs)[source]¶
Bases:
PromisedRequirementFunctionWrappingJob
Handles dynamic resource allocation for job functions. See toil.job.JobFunctionWrappingJob.
- Parameters:
userFunction (collections.abc.Callable[Ellipsis, Any])
args (Any)
kwargs (Any)
- run(fileStore)[source]¶
Override this function to perform work and dynamically create successor jobs.
- Parameters:
fileStore (toil.fileStores.abstractFileStore.AbstractFileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
- Returns:
The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
- Return type:
Any
- class toil.job.EncapsulatedJob(job, unitName=None)[source]¶
Bases:
Job
A convenience Job class used to make a job subgraph appear to be a single job.
Let A be the root job of a job subgraph and B be another job we’d like to run after A and all its successors have completed. For this, use encapsulate:
    # Job A and subgraph, Job B
    A, B = A(), B()
    Aprime = A.encapsulate()
    Aprime.addChild(B)
    # B will run after A and all its successors have completed, A and its subgraph of
    # successors in effect appear to be just one job.
If the job being encapsulated has predecessors (e.g. is not the root job), then the encapsulated job will inherit these predecessors. If predecessors are added to the job being encapsulated after the encapsulated job is created then the encapsulating job will NOT inherit these predecessors automatically. Care should be exercised to ensure the encapsulated job has the proper set of predecessors.
The return value of an encapsulated job (as accessed by the toil.job.Job.rv() function) is the return value of the root job, e.g. A().encapsulate().rv() and A().rv() will resolve to the same value after A or A.encapsulate() has been run.
- addChild(childJob)[source]¶
Add a childJob to be run as child of this job.
Child jobs will be run directly after this job’s toil.job.Job.run() method has completed.
- Returns:
childJob: for call chaining
- Parameters:
childJob (JobType)
- Return type:
JobType
- addService(service, parentService=None)[source]¶
Add a service.
The toil.job.Job.Service.start() method of the service will be called after the run method has completed but before any successors are run. The service’s toil.job.Job.Service.stop() method will be called once the successors of the job have been run.
Services allow things like databases and servers to be started and accessed by jobs in a workflow.
- Raises:
toil.job.JobException – If service has already been made the child of a job or another service.
- Parameters:
- Returns:
a promise that will be replaced with the return value from toil.job.Job.Service.start() of service in any successor of the job.
- Return type:
- addFollowOn(followOnJob)[source]¶
Add a follow-on job.
Follow-on jobs will be run after the child jobs and their successors have been run.
- Returns:
followOnJob for call chaining
- Parameters:
followOnJob (JobType)
- Return type:
JobType
- rv(*path)[source]¶
Create a promise (toil.job.Promise).
The “promise” represents a return value of the job’s run method, or, in case of a function-wrapping job, the wrapped function’s return value.
- Parameters:
path ((Any)) – Optional path for selecting a component of the promised return value. If absent or empty, the entire return value will be used. Otherwise, the first element of the path is used to select an individual item of the return value. For that to work, the return value must be a list, dictionary or of any other type implementing the __getitem__() magic method. If the selected item is yet another composite value, the second element of the path can be used to select an item from it, and so on. For example, if the return value is [6, {'a': 42}], .rv(0) would select 6, .rv(1) would select {'a': 42} while .rv(1, 'a') would select 42. To select a slice from a return value that is sliceable, e.g. tuple or list, the path element should be a slice object. For example, assuming that the return value is [6, 7, 8, 9] then .rv(slice(1, 3)) would select [7, 8]. Note that slicing really only makes sense at the end of path.
- Returns:
A promise representing the return value of this job’s toil.job.Job.run() method.
- Return type:
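The path semantics described for rv() amount to repeated __getitem__ lookups. The helper below, select, is a hypothetical illustration that applies a path to an already available return value; it does not create or resolve real promises.

```python
def select(value, *path):
    """Apply each path element in turn using item access (value[key])."""
    for key in path:
        value = value[key]
    return value


return_value = [6, {"a": 42}]

whole = select(return_value)            # empty path: the entire return value
first = select(return_value, 0)         # like .rv(0)
second = select(return_value, 1)        # like .rv(1)
nested = select(return_value, 1, "a")   # like .rv(1, 'a')

# A slice object selects a sub-sequence, like .rv(slice(1, 3)).
sliced = select([6, 7, 8, 9], slice(1, 3))
```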
- prepareForPromiseRegistration(jobStore)[source]¶
Set up to allow this job’s promises to register themselves.
Prepare this job (the promisor) so that its promises can register themselves with it, when the jobs they are promised to (promisees) are serialized.
The promisee holds the reference to the promise (usually as part of the job arguments) and when it is being pickled, so will the promises it refers to. Pickling a promise triggers it to be registered with the promisor.
- Parameters:
- Return type:
None
- __reduce__()[source]¶
Called during pickling to define the pickled representation of the job.
We don’t want to pickle our internal references to the job we encapsulate, so we elide them here. When actually run, we’re just a no-op job that can maybe chain.
- class toil.job.ServiceHostJob(service)[source]¶
Bases:
Job
Job that runs a service. Used internally by Toil. Users should subclass Service instead of using this.
- Parameters:
service (Job)
- serviceModule¶
- property fileStore: toil.fileStores.abstractFileStore.AbstractFileStore¶
Return the file store, which the Service may need.
- Return type:
- addChild(child)[source]¶
Add a childJob to be run as child of this job.
Child jobs will be run directly after this job’s toil.job.Job.run() method has completed.
- Returns:
childJob: for call chaining
- Parameters:
child (Job)
- Return type:
NoReturn
- addFollowOn(followOn)[source]¶
Add a follow-on job.
Follow-on jobs will be run after the child jobs and their successors have been run.
- Returns:
followOnJob for call chaining
- Parameters:
followOn (Job)
- Return type:
NoReturn
- addService(service, parentService=None)[source]¶
Add a service.
The toil.job.Job.Service.start() method of the service will be called after the run method has completed but before any successors are run. The service’s toil.job.Job.Service.stop() method will be called once the successors of the job have been run.
Services allow things like databases and servers to be started and accessed by jobs in a workflow.
- Raises:
toil.job.JobException – If service has already been made the child of a job or another service.
- Parameters:
- Returns:
a promise that will be replaced with the return value from toil.job.Job.Service.start() of service in any successor of the job.
- Return type:
NoReturn
- saveBody(jobStore)[source]¶
Serialize the service itself before saving the host job’s body.
- Parameters:
- Return type:
None
- run(fileStore)[source]¶
Override this function to perform work and dynamically create successor jobs.
- Parameters:
fileStore (toil.fileStores.abstractFileStore.AbstractFileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
- Returns:
The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
- Return type:
None
- class toil.job.FileMetadata[source]¶
Bases:
NamedTuple
Metadata for a file.
source is the URL to grab the file from.
parent_dir is the parent directory of the source.
size is the size of the file. It is None if the file size cannot be retrieved.
- toil.job.potential_absolute_uris(uri, path, importer=None, execution_dir=None)[source]¶
Get potential absolute URIs to check for an imported file.
Given a URI or bare path, yield in turn all the URIs, with schemes, where we should actually try to find it, given that we want to search under/against the given paths or URIs, the current directory, and the given importing WDL document if any.
- toil.job.get_file_sizes(filenames, search_paths=None, include_remote_files=True, execution_dir=None)[source]¶
Resolve relative-URI files in the given environment and turn them into absolute normalized URIs. Returns a dictionary of the string values from the WDL file values to a tuple of the normalized URI, parent directory ID, and size of the file. The size of the file may be None, which means unknown size.
- Parameters:
task_path – Dotted WDL name of the user-level code doing the importing (probably the workflow name).
search_paths (list[str] | None) – If set, try resolving input location relative to the URLs or directories in this list.
include_remote_files (bool) – If set, import files from remote locations. Else leave them as URI references.
execution_dir (str | None)
- Return type:
- class toil.job.CombineImportsJob(d, **kwargs)[source]¶
Bases:
Job
Combine the outputs of multiple WorkerImportJobs into one promise.
- Parameters:
d (collections.abc.Sequence[Promised[dict[str, toil.fileStores.FileID]]])
kwargs (Any)
- run(file_store)[source]¶
Merge the dicts
- Parameters:
file_store (toil.fileStores.abstractFileStore.AbstractFileStore)
- Return type:
Promised[dict[str, toil.fileStores.FileID]]
- class toil.job.WorkerImportJob(filenames, local=False, **kwargs)[source]¶
Bases:
Job
Job to do file imports on a worker instead of a leader. Assumes all local and cloud files are accessible.
For the CWL/WDL runners, this class is only used when runImportsOnWorkers is enabled.
- filenames¶
- static import_files(files, file_source, symlink=True)[source]¶
Import a list of files into the jobstore. Returns a mapping of the filename to the associated FileIDs
When stream is true but the import is not streamable, the worker will run out of disk space and run a new import job with enough disk space instead.
- Parameters:
file_source (toil.jobStores.abstractJobStore.AbstractJobStore) – AbstractJobStore
symlink (bool) – whether to allow symlinking the imported files
- Returns:
Dictionary mapping filenames from files to associated jobstore FileID.
- Return type:
- run(file_store)[source]¶
Import the workflow inputs and then create and run the workflow.
- Returns:
Promise of workflow outputs
- Parameters:
file_store (toil.fileStores.abstractFileStore.AbstractFileStore)
- Return type:
Promised[dict[str, toil.fileStores.FileID]]
- class toil.job.ImportsJob(file_to_data, max_batch_size, import_worker_disk, **kwargs)[source]¶
Bases:
Job
Job to organize and delegate files to individual WorkerImportJobs.
Only works on files of known size.
For the CWL/WDL runners, this is only used when runImportsOnWorkers is enabled
- Parameters:
file_to_data (dict[str, FileMetadata])
max_batch_size (int)
import_worker_disk (int)
kwargs (Any)
- run(file_store)[source]¶
Import the workflow inputs and then create and run the workflow.
The two parts of the return value must be used together and cannot be safely split apart. To look up a FileID from an original filename, you must first look up the FileMetadata in the second dict, extract its .source (the normalized candidate URI), and then look that up in the first dict.
- Returns:
Tuple of (candidate URI to FileID mapping, original filename to FileMetadata mapping). The candidate URI is stored in FileMetadata.source.
- Parameters:
file_store (toil.fileStores.abstractFileStore.AbstractFileStore)
- Return type:
tuple[Promised[dict[str, toil.fileStores.FileID]], dict[str, FileMetadata]]
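The two-step lookup described for the return value can be sketched with plain dictionaries. FileMetadataSketch is a hypothetical stand-in with the same three fields as FileMetadata, the file IDs are placeholder strings rather than real FileID objects, and the first mapping is assumed to have already been resolved from its promise.

```python
from typing import NamedTuple, Optional


class FileMetadataSketch(NamedTuple):
    """Hypothetical stand-in with the same fields as FileMetadata."""
    source: str
    parent_dir: str
    size: Optional[int]


# First part of the tuple: candidate URI -> file ID (placeholder strings here).
uri_to_file_id = {"file:///data/reads.fq": "file-id-1"}

# Second part of the tuple: original filename -> metadata.
name_to_metadata = {
    "reads.fq": FileMetadataSketch("file:///data/reads.fq", "file:///data", 1024),
}


def lookup_file_id(filename, uri_to_file_id, name_to_metadata):
    """Go from original filename to metadata, then from .source to the file ID."""
    candidate_uri = name_to_metadata[filename].source
    return uri_to_file_id[candidate_uri]


file_id = lookup_file_id("reads.fq", uri_to_file_id, name_to_metadata)
```

Looking up the original filename directly in the first mapping would fail here, which is why the two parts have to be kept together.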
- class toil.job.Promise(job, path)[source]¶
References a return value from a method as a promise before the method itself is run.
References a return value from a toil.job.Job.run() or toil.job.Job.Service.start() method as a promise before the method itself is run.
Let T be a job. Instances of Promise (termed a promise) are returned by T.rv(), which is used to reference the return value of T’s run function. When the promise is passed to the constructor (or as an argument to a wrapped function) of a different, successor job the promise will be replaced by the actual referenced return value. This mechanism allows a return value from one job’s run method to be an input argument to another job before the former job’s run function has been executed.
- Parameters:
job (Job)
path (Any)
- filesToDelete: set[str]¶
A set of IDs of files containing promised values when we know we won’t need them anymore
- resolving = True¶
Set to False to disable promise resolution for debugging.
- job¶
- path¶
- __reduce__()[source]¶
Return the Promise class and construction arguments.
Called during pickling when a promise (an instance of this class) is about to be pickled. Returns the Promise class and construction arguments that will be evaluated during unpickling, namely the job store coordinates of a file that will hold the promised return value. By the time the promise is about to be unpickled, that file should be populated.
- toil.job.T¶
- toil.job.Promised¶
- toil.job.unwrap(p)[source]¶
Function for ensuring you actually have a promised value, and not just a promise. Mostly useful for satisfying type-checking.
The “unwrap” terminology is borrowed from Rust.
- Parameters:
p (Promised[T])
- Return type:
T
- toil.job.unwrap_all(p)[source]¶
Function for ensuring you actually have a collection of promised values, and not any remaining promises. Mostly useful for satisfying type-checking.
The “unwrap” terminology is borrowed from Rust.
- Parameters:
p (collections.abc.Sequence[Promised[T]])
- Return type:
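The behaviour described for unwrap() and unwrap_all() can be approximated as a type check that passes real values through. The sketch below is an assumption about the general shape, using a hypothetical FakePromise class in place of toil.job.Promise; the exception type is also an assumption.

```python
class FakePromise:
    """Hypothetical placeholder for a value that has not been resolved yet."""


def unwrap_sketch(p):
    """Return p unchanged, or raise if it is still a placeholder."""
    if isinstance(p, FakePromise):
        raise TypeError("value is still a promise")
    return p


def unwrap_all_sketch(items):
    """Return the sequence unchanged, or raise if any item is a placeholder."""
    for item in items:
        if isinstance(item, FakePromise):
            raise TypeError("sequence still contains a promise")
    return items


resolved = unwrap_sketch(5)
resolved_all = unwrap_all_sketch([1, 2, 3])

# An unresolved placeholder is rejected rather than passed through.
try:
    unwrap_sketch(FakePromise())
    error_seen = False
except TypeError:
    error_seen = True
```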
- class toil.job.PromisedRequirement(valueOrCallable, *args)[source]¶
Class for dynamically allocating job function resource requirements.
(involving toil.job.Promise instances.)
Use when resource requirements depend on the return value of a parent function. PromisedRequirements can be modified by passing a function that takes the Promise as input.
For example, let f, g, and h be functions. Then a Toil workflow can be defined as follows:
    A = Job.wrapFn(f)
    B = A.addChildFn(g, cores=PromisedRequirement(A.rv()))
    C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2*x, B.rv()))
- Parameters:
valueOrCallable (Any)
args (Any)
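The valueOrCallable parameter accepts either a plain value or a function applied to the remaining arguments. The class below, RequirementSketch, is a hypothetical illustration of that idea using already resolved values; it is not the Toil class and its get_value method is an assumed name.

```python
class RequirementSketch:
    """Hypothetical stand-in: holds a value, or a callable plus its arguments."""

    def __init__(self, value_or_callable, *args):
        if callable(value_or_callable):
            self.func = value_or_callable
            self.args = args
        else:
            # A plain value is treated as the identity function applied to it.
            self.func = lambda x: x
            self.args = (value_or_callable,)

    def get_value(self):
        """Compute the requirement once the arguments are real values."""
        return self.func(*self.args)


# Corresponds to cores=PromisedRequirement(A.rv()) once A returned 4.
plain = RequirementSketch(4).get_value()

# Corresponds to cores=PromisedRequirement(lambda x: 2*x, B.rv()) once B returned 3.
doubled = RequirementSketch(lambda x: 2 * x, 3).get_value()
```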