Job Methods API
Jobs are the units of work in Toil which are composed into workflows.
- class toil.job.Job(memory=None, cores=None, disk=None, accelerators=None, preemptible=None, preemptable=None, unitName='', checkpoint=False, displayName='', descriptionClass=None, local=None)[source]
Represents a unit of work in Toil.
- __init__(memory=None, cores=None, disk=None, accelerators=None, preemptible=None, preemptable=None, unitName='', checkpoint=False, displayName='', descriptionClass=None, local=None)[source]
Job initializer.
This method must be called by any overriding constructor.
- Parameters:
memory (int or string convertible by toil.lib.conversions.human2bytes to an int) – the maximum number of bytes of memory the job will require to run.
cores (float, int, or string convertible by toil.lib.conversions.human2bytes to an int) – the number of CPU cores required.
disk (int or string convertible by toil.lib.conversions.human2bytes to an int) – the amount of local disk space required by the job, expressed in bytes.
accelerators (int, string, dict, or list of those. Strings and dicts must be parseable by parse_accelerator.) – the computational accelerators required by the job. If a string, can be a string of a number, or a string specifying a model, brand, or API (with optional colon-delimited count).
preemptible (bool, int in {0, 1}, or string in {'false', 'true'} in any case) – if the job can be run on a preemptible node.
preemptable (Union[str, int, bool, None]) – legacy preemptible parameter, for backwards compatibility with workflows not using the preemptible keyword.
unitName (str) – Human-readable name for this instance of the job.
checkpoint (bool) – if any of this job’s successor jobs completely fails, exhausting all their retries, remove any successor jobs and rerun this job to restart the subtree. The job must be a leaf vertex in the job graph when initially defined; see toil.job.Job.checkNewCheckpointsAreLeafVertices().
displayName (str) – Human-readable job type display name.
descriptionClass (class) – Override for the JobDescription class used to describe the job.
local (Optional[bool]) – if the job can be run on the leader.
- Return type:
None
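As an illustrative sketch of these keyword arguments (the subclass name and message attribute below are hypothetical, not part of the Toil API), a subclass forwards its resource requirements to this initializer:

```python
from toil.job import Job

class HelloJob(Job):
    """Hypothetical job that stores a message and requests explicit resources."""

    def __init__(self, message):
        # Memory and disk strings are parsed by toil.lib.conversions.human2bytes.
        super().__init__(memory="2G", cores=1, disk="3G", preemptible=True)
        self.message = message

    def run(self, fileStore):
        # The return value can be promised to other jobs via rv().
        return "Hello, " + self.message
```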
- property jobStoreID: str | TemporaryID
Get the ID of this Job.
- Return type:
Union[str, TemporaryID]
- property description: JobDescription
Expose the JobDescription that describes this job.
- Return type:
JobDescription
- property disk: int
The maximum number of bytes of disk the job will require to run.
- Return type:
int
- property memory
The maximum number of bytes of memory the job will require to run.
- property accelerators: List[AcceleratorRequirement]
Any accelerators, such as GPUs, that are needed.
- Return type:
List[AcceleratorRequirement]
- assignConfig(config)[source]
Assign the given config object.
It will be used by various actions implemented inside the Job class.
- run(fileStore)[source]
Override this function to perform work and dynamically create successor jobs.
- Parameters:
fileStore (AbstractFileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
- Return type:
Any
- Returns:
The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
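A sketch of a run() override that adds successors dynamically while it executes; ChildJob and ParentJob are hypothetical names, and the resource values are illustrative:

```python
from toil.job import Job

class ChildJob(Job):
    def __init__(self, value):
        super().__init__(memory="1G", cores=1, disk="1G")
        self.value = value

    def run(self, fileStore):
        return self.value * 2

class ParentJob(Job):
    def run(self, fileStore):
        # Successor jobs can be created while this job runs; they execute
        # after this run() method returns.
        for value in range(3):
            self.addChild(ChildJob(value))
        fileStore.logToMaster("Scheduled 3 children")
        return "parent done"
```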
- addChild(childJob)[source]
Add a child job to be run as a child of this job.
Child jobs will be run directly after this job’s toil.job.Job.run() method has completed.
- hasChild(childJob)[source]
Check if childJob is already a child of this job.
- addFollowOn(followOnJob)[source]
Add a follow-on job.
Follow-on jobs will be run after the child jobs and their successors have been run.
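To illustrate the ordering of children and follow-ons, a minimal sketch built from wrapped job functions (the step function and job names are hypothetical):

```python
from toil.job import Job

def step(job, name):
    job.fileStore.logToMaster("Running " + name)

# 'first' runs, then its children 'a' and 'b' (potentially in parallel),
# and the follow-on 'last' runs after the children and their successors.
first = Job.wrapJobFn(step, "first")
a = Job.wrapJobFn(step, "a")
b = Job.wrapJobFn(step, "b")
last = Job.wrapJobFn(step, "last")

first.addChild(a)
first.addChild(b)
first.addFollowOn(last)
```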
- hasPredecessor(job)[source]
Check if a given job is already a predecessor of this job.
- hasFollowOn(followOnJob)[source]
Check if given job is already a follow-on of this job.
- addService(service, parentService=None)[source]
Add a service.
The toil.job.Job.Service.start() method of the service will be called after the run method has completed but before any successors are run. The service’s toil.job.Job.Service.stop() method will be called once the successors of the job have been run.
Services allow things like databases and servers to be started and accessed by jobs in a workflow.
- Raises:
toil.job.JobException – If service has already been made the child of a job or another service.
- Parameters:
service (Service) – Service to add.
parentService (Optional[Service]) – Service that will be started before ‘service’ is started. Allows trees of services to be established. parentService must be a service of this job.
- Return type:
Promise
- Returns:
a promise that will be replaced with the return value from toil.job.Job.Service.start() of service in any successor of the job.
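A sketch of a service, assuming a hypothetical DemoService subclass of Job.Service; the value returned by its start() method reaches successors through the promise returned by addService():

```python
from toil.job import Job

class DemoService(Job.Service):
    """Hypothetical service that pretends to start an external server."""

    def start(self, job):
        # Start the external resource and return connection details.
        return "localhost:8080"

    def check(self):
        # Report whether the service is still healthy.
        return True

    def stop(self, job):
        # Shut the external resource down once the job's successors are done.
        pass

def use_service(job, address):
    job.fileStore.logToMaster("Service is at " + address)

root = Job()
address_promise = root.addService(DemoService())
# The promise resolves to the value returned by DemoService.start().
root.addChildJobFn(use_service, address_promise)
```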
- hasService(service)[source]
Return True if the given Service is a service of this job, and False otherwise.
- Parameters:
service (Service) –
- Return type:
bool
- addChildFn(fn, *args, **kwargs)[source]
Add a function as a child job.
- Parameters:
fn (Callable) – Function to be run as a child job with *args and **kwargs as arguments to this function. See toil.job.FunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
- Return type:
FunctionWrappingJob
- Returns:
The new child job that wraps fn.
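A short sketch of addChildFn (the compress function is hypothetical); unlike a job function, a plain function does not receive the wrapping job as its first argument, and resource requirements go in the reserved keyword arguments:

```python
from toil.job import Job

def compress(path):
    # An ordinary function: it does not receive the wrapping job.
    print("compressing", path)

class RootJob(Job):
    def run(self, fileStore):
        # Run compress("data.txt") as a child job with explicit resources.
        self.addChildFn(compress, "data.txt", memory="1G", cores=1, disk="1G")
```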
- addFollowOnFn(fn, *args, **kwargs)[source]
Add a function as a follow-on job.
- Parameters:
fn (Callable) – Function to be run as a follow-on job with *args and **kwargs as arguments to this function. See toil.job.FunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
- Return type:
FunctionWrappingJob
- Returns:
The new follow-on job that wraps fn.
- addChildJobFn(fn, *args, **kwargs)[source]
Add a job function as a child job.
See toil.job.JobFunctionWrappingJob for a definition of a job function.
- Parameters:
fn (Callable) – Job function to be run as a child job with *args and **kwargs as arguments to this function. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
- Return type:
JobFunctionWrappingJob
- Returns:
The new child job that wraps fn.
- addFollowOnJobFn(fn, *args, **kwargs)[source]
Add a follow-on job function.
See toil.job.JobFunctionWrappingJob for a definition of a job function.
- Parameters:
fn (Callable) – Job function to be run as a follow-on job with *args and **kwargs as arguments to this function. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
- Return type:
JobFunctionWrappingJob
- Returns:
The new follow-on job that wraps fn.
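In contrast to plain functions, a job function receives the wrapping job as its first argument, which gives it access to the file store and to the addChildJobFn/addFollowOnJobFn methods. A sketch with hypothetical function names:

```python
from toil.job import Job

def analyse(job, sample):
    # Job functions receive the wrapping job as their first argument.
    job.fileStore.logToMaster("Analysing " + sample)
    return len(sample)

def summarise(job, message):
    job.fileStore.logToMaster(message)

def pipeline(job):
    # Children run after this function returns...
    job.addChildJobFn(analyse, "sampleA", cores=1, memory="1G", disk="1G")
    job.addChildJobFn(analyse, "sampleB", cores=1, memory="1G", disk="1G")
    # ...and follow-ons run after the children and their successors finish.
    job.addFollowOnJobFn(summarise, "all samples analysed")

root = Job.wrapJobFn(pipeline)
```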
- property tempDir: str
Shortcut to calling job.fileStore.getLocalTempDir().
The temp dir is created on the first call and the same path is returned on subsequent calls.
- Returns:
Path to the temp dir. See job.fileStore.getLocalTempDir.
- Return type:
str
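A small sketch of using tempDir inside run() (the file name is illustrative):

```python
import os
from toil.job import Job

class ScratchJob(Job):
    def run(self, fileStore):
        # tempDir is created lazily on first access and reused afterwards.
        scratch = os.path.join(self.tempDir, "intermediate.txt")
        with open(scratch, "w") as handle:
            handle.write("scratch data")
        return scratch
```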
- log(text, level=20)[source]
Log using fileStore.logToMaster().
- static wrapFn(fn, *args, **kwargs)[source]
Makes a Job out of a function.
Convenience function for constructor of toil.job.FunctionWrappingJob.
- Parameters:
fn – Function to be run with *args and **kwargs as arguments. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
- Return type:
FunctionWrappingJob
- Returns:
The new job that wraps fn.
- static wrapJobFn(fn, *args, **kwargs)[source]
Makes a Job out of a job function.
Convenience function for constructor of toil.job.JobFunctionWrappingJob.
- Parameters:
fn – Job function to be run with *args and **kwargs as arguments. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
- Return type:
JobFunctionWrappingJob
- Returns:
The new job that wraps fn.
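A sketch of wrapping a job function and running it as a complete workflow. The job store path and the hello function are illustrative; Job.Runner.getDefaultOptions() and the Toil context manager are the usual launch pattern:

```python
from toil.common import Toil
from toil.job import Job

def hello(job, name):
    job.fileStore.logToMaster("Saying hello to " + name)
    return "Hello, " + name

if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("./demo-jobstore")
    options.logLevel = "INFO"
    with Toil(options) as workflow:
        root = Job.wrapJobFn(hello, "world", cores=1, memory="1G", disk="1G")
        # start() returns the root job's return value once the workflow finishes.
        print(workflow.start(root))
```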
- encapsulate(name=None)[source]
Encapsulates the job, see toil.job.EncapsulatedJob. Convenience function for constructor of toil.job.EncapsulatedJob.
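A sketch of why encapsulation helps: a successor added to the encapsulated job runs after the whole sub-workflow rooted at the original job, not merely after the root job itself (the step function and job names are hypothetical):

```python
from toil.job import Job

def step(job, name):
    job.fileStore.logToMaster("Running " + name)

# A small sub-workflow: 'build' followed by its child 'test'.
build = Job.wrapJobFn(step, "build")
build.addChildJobFn(step, "test")

# Treat the whole sub-workflow as a single job.
packaged = build.encapsulate()

# 'deploy' runs only after everything under 'build' has finished.
packaged.addChildJobFn(step, "deploy")
```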
- rv(*path)[source]
Create a promise (toil.job.Promise).
The “promise” represents the return value of the job’s run method, or, in the case of a function-wrapping job, the wrapped function’s return value.
- Parameters:
path ((Any)) – Optional path for selecting a component of the promised return value. If absent or empty, the entire return value will be used. Otherwise, the first element of the path is used to select an individual item of the return value. For that to work, the return value must be a list, dictionary or of any other type implementing the __getitem__() magic method. If the selected item is yet another composite value, the second element of the path can be used to select an item from it, and so on. For example, if the return value is [6, {‘a’: 42}], .rv(0) would select 6, .rv(1) would select {‘a’: 42}, while .rv(1, ‘a’) would select 42. To select a slice from a return value that is sliceable, e.g. a tuple or list, the path element should be a slice object. For example, assuming that the return value is [6, 7, 8, 9], then .rv(slice(1, 3)) would select [7, 8]. Note that slicing really only makes sense at the end of a path.
- Return type:
- Returns:
A promise representing the return value of this job’s toil.job.Job.run() method.
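A sketch of promises in use, reusing the [6, {‘a’: 42}] example above (the producer and consumer functions are hypothetical); rv() stands in for the not-yet-computed return value and is resolved before the consuming job runs:

```python
from toil.job import Job

def produce(job):
    # The promised return value: a list containing a dict.
    return [6, {"a": 42}]

def consume(job, whole, item):
    # whole receives the full return value; item receives whole[1]["a"], i.e. 42.
    job.fileStore.logToMaster("whole=%s item=%s" % (whole, item))

root = Job.wrapJobFn(produce)
# rv() promises the entire return value; rv(1, "a") selects a component of it.
root.addFollowOnJobFn(consume, root.rv(), root.rv(1, "a"))
```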
- prepareForPromiseRegistration(jobStore)[source]
Set up to allow this job’s promises to register themselves.
Prepare this job (the promisor) so that its promises can register themselves with it, when the jobs they are promised to (promisees) are serialized.
The promisee holds the reference to the promise (usually as part of the job arguments) and, when it is pickled, so are the promises it refers to. Pickling a promise triggers it to be registered with the promisor.
- Parameters:
jobStore (AbstractJobStore) –
- Return type:
None
- checkJobGraphForDeadlocks()[source]
Ensures that a graph of Jobs (that hasn’t yet been saved to the JobStore) doesn’t contain any pathological relationships between jobs that would result in deadlocks if we tried to run the jobs.
See toil.job.Job.checkJobGraphConnected(), toil.job.Job.checkJobGraphAcylic() and toil.job.Job.checkNewCheckpointsAreLeafVertices() for more info.
- Raises:
toil.job.JobGraphDeadlockException – if the job graph is cyclic, contains multiple roots or contains checkpoint jobs that are not leaf vertices when defined (see toil.job.Job.checkNewCheckpointsAreLeafVertices()).
- getRootJobs()[source]
Return the set of root job objects that contain this job.
A root job is a job with no predecessors (i.e. one that is not a child, follow-on, or service of another job).
Only deals with jobs created here, rather than loaded from the job store.
- checkJobGraphConnected()[source]
- Raises:
toil.job.JobGraphDeadlockException – if toil.job.Job.getRootJobs() does not contain exactly one root job.
As execution always starts from one root job, having multiple root jobs will cause a deadlock to occur.
Only deals with jobs created here, rather than loaded from the job store.
- checkJobGraphAcylic()[source]
- Raises:
toil.job.JobGraphDeadlockException – if the connected component of jobs containing this job contains any cycles of child/followOn dependencies in the augmented job graph (see below). Such cycles are not allowed in valid job graphs.
A follow-on edge (A, B) between two jobs A and B is equivalent to adding a child edge to B from (1) A, (2) each child of A, and (3) the successors of each child of A. We call each such edge an “implied” edge. The augmented job graph is a job graph including all the implied edges.
For a job graph G = (V, E) the algorithm is O(|V|^2). It is O(|V| + |E|) for a graph with no follow-ons. The former follow-on case could be improved!
Only deals with jobs created here, rather than loaded from the job store.
- checkNewCheckpointsAreLeafVertices()[source]
A checkpoint job is a job that is restarted if either it fails, or if any of its successors completely fails, exhausting their retries.
A job is a leaf if it has no successors.
A checkpoint job must be a leaf when initially added to the job graph. When its run method is invoked it can then create direct successors. This restriction is made to simplify implementation.
Only works on connected components of jobs not yet added to the JobStore.
- Raises:
toil.job.JobGraphDeadlockException – if there exists a job being added to the graph for which checkpoint=True and which is not a leaf.
- Return type:
None
- defer(function, *args, **kwargs)[source]
Register a deferred function, i.e. a callable that will be invoked after the current attempt at running this job concludes. A job attempt is said to conclude when the job function (or the toil.job.Job.run() method for class-based jobs) returns, raises an exception or after the process running it terminates abnormally. A deferred function will be called on the node that attempted to run the job, even if a subsequent attempt is made on another node. A deferred function should be idempotent because it may be called multiple times on the same node or even in the same process. More than one deferred function may be registered per job attempt by calling this method repeatedly with different arguments. If the same function is registered twice with the same or different arguments, it will be called twice per job attempt.
Examples of deferred functions are ones that handle cleanup of resources external to Toil, like Docker containers, files outside the work directory, etc.
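A sketch of registering a deferred cleanup function (the directory handling is illustrative). Because a deferred function may run more than once, the cleanup is written to be idempotent:

```python
import shutil
import tempfile
from toil.job import Job

def remove_dir(path):
    # Idempotent cleanup: ignore the error if the directory is already gone.
    shutil.rmtree(path, ignore_errors=True)

class ScratchHeavyJob(Job):
    def run(self, fileStore):
        # A resource outside Toil's work directory that must be cleaned up
        # even if this attempt fails or the worker process dies.
        workdir = tempfile.mkdtemp(prefix="external-scratch-")
        self.defer(remove_dir, workdir)
        # ... do work in workdir ...
        return workdir
```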
- getTopologicalOrderingOfJobs()[source]
- Return type:
List[Job]
- Returns:
a list of jobs such that for all pairs of indices i, j for which i < j, the job at index i can be run before the job at index j.
Only considers jobs in this job’s subgraph that are newly added, not loaded from the job store.
Ignores service jobs.
- saveBody(jobStore)[source]
Save the execution data for just this job to the JobStore, and fill in the JobDescription with the information needed to retrieve it.
The Job’s JobDescription must have already had a real jobStoreID assigned to it.
Does not save the JobDescription.
- Parameters:
jobStore (AbstractJobStore) – The job store to save the job body into.
- Return type:
None
- saveAsRootJob(jobStore)[source]
Save this job to the given jobStore as the root job of the workflow.
- Return type:
JobDescription
- Returns:
the JobDescription describing this job.
- Parameters:
jobStore (AbstractJobStore) –
- classmethod loadJob(jobStore, jobDescription)[source]
Retrieves a toil.job.Job instance from a JobStore.
- Parameters:
jobStore (AbstractJobStore) – The job store.
jobDescription (JobDescription) – the JobDescription of the job to retrieve.
- Return type:
Job
- Returns:
The job referenced by the JobDescription.
JobDescription
The class used to store all the information that the Toil Leader ever needs to know about a Job.
- class toil.job.JobDescription(requirements, jobName, unitName='', displayName='', command=None, local=None)[source]
Stores all the information that the Toil Leader ever needs to know about a Job.
(requirements information, dependency information, commands to issue, etc.)
Can be obtained from an actual (i.e. executable) Job object, and can be used to obtain the Job object from the JobStore.
Never contains other Jobs or JobDescriptions: all reference is by ID.
Subclassed into variants for checkpoint jobs and service jobs that have their specific parameters.
- __init__(requirements, jobName, unitName='', displayName='', command=None, local=None)[source]
Create a new JobDescription.
- Parameters:
requirements (Mapping[str, Union[int, str, bool]]) – Dict from string to number, string, or bool describing the resource requirements of the job. ‘cores’, ‘memory’, ‘disk’, and ‘preemptible’ fields, if set, are parsed and broken out into properties. If unset, the relevant property will be unspecified, and will be pulled from the assigned Config object if queried (see toil.job.Requirer.assignConfig()).
jobName (str) – Name of the kind of job this is. May be used in job store IDs and logging. Also used to let the cluster scaler learn a model for how long the job will take. Ought to be the job class’s name if no real user-defined name is available.
unitName (Optional[str]) – Name of this instance of this kind of job. May appear with jobName in logging.
displayName (Optional[str]) – A human-readable name to identify this particular job instance. Ought to be the job class’s name if no real user-defined name is available.
local (Optional[bool]) – If True, the job is meant to use minimal resources but is sensitive to execution latency, and so should be executed by the leader.
- Return type:
None
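JobDescription is mostly constructed internally by Toil, but as a sketch of the requirements mapping this initializer expects (values illustrative, assuming the requirement properties behave as described above):

```python
from toil.job import JobDescription

# Recognised keys such as 'cores', 'memory', 'disk', and 'preemptible' are
# parsed out of the requirements mapping and exposed as properties.
desc = JobDescription(
    requirements={"cores": 1, "memory": 2 * 1024**3, "disk": 3 * 1024**3,
                  "preemptible": False},
    jobName="ExampleJob",
    unitName="sample-42",
)
print(desc.cores, desc.memory, desc.disk)
```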
- serviceHostIDsInBatches()[source]
Find all batches of service host job IDs that can be started at the same time.
(in the order they need to start in)
- successorsAndServiceHosts()[source]
Get an iterator over all child, follow-on, and service job IDs.
- allSuccessors()[source]
Get an iterator over all child, follow-on, and chained, inherited successor job IDs.
Follow-ons will come before children.
- successors_by_phase()[source]
Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase number on the stack.
Phases execute from higher numbers to lower numbers.
- property services
Get a collection of the IDs of service host jobs for this job, in arbitrary order.
Will be empty if the job has no unfinished services.
- nextSuccessors()[source]
Return the collection of job IDs for the successors of this job that are ready to run.
If those jobs have multiple predecessor relationships, they may still be blocked on other jobs.
Returns None when at the final phase (all successors done), and an empty collection if there are more phases but they can’t be entered yet (e.g. because we are waiting for the job itself to run).
- filterSuccessors(predicate)[source]
Keep only successor jobs for which the given predicate function approves.
The predicate function is called with the job’s ID.
Treats all other successors as complete and forgets them.
- filterServiceHosts(predicate)[source]
Keep only services for which the given predicate approves.
The predicate function is called with the service host job’s ID.
Treats all other services as complete and forgets them.
- clear_nonexistent_dependents(job_store)[source]
Remove all references to child, follow-on, and associated service jobs that do not exist.
That is to say, all those that have been completed and removed.
- Parameters:
job_store (AbstractJobStore) –
- Return type:
None
- is_subtree_done()[source]
Check if the subtree is done.
- Return type:
bool
- Returns:
True if the job appears to be done, and all related child, follow-on, and service jobs appear to be finished and removed.
- replace(other)[source]
Take on the ID of another JobDescription, retaining our own state and type.
When updated in the JobStore, we will save over the other JobDescription.
Useful for chaining jobs: the chained-to job can replace the parent job.
Merges cleanup state and successors other than this job from the job being replaced into this one.
- Parameters:
other (JobDescription) – Job description to replace.
- Return type:
None
- check_new_version(other)[source]
Make sure a prospective new version of the JobDescription is actually moving forward in time and not backward.
- Parameters:
other (JobDescription) –
- Return type:
None
- addChild(childID)[source]
Make the job with the given ID a child of the described job.
- addFollowOn(followOnID)[source]
Make the job with the given ID a follow-on of the described job.
- addServiceHostJob(serviceID, parentServiceID=None)[source]
Make the ServiceHostJob with the given ID a service of the described job.
If a parent ServiceHostJob ID is given, that parent service will be started first, and must have already been added.
- hasChild(childID)[source]
Return True if the job with the given ID is a child of the described job.
- hasFollowOn(followOnID)[source]
Test if the job with the given ID is a follow-on of the described job.
- hasServiceHostJob(serviceID)[source]
Test if the ServiceHostJob is a service of the described job.
- Return type:
bool
- renameReferences(renames)[source]
Apply the given dict of ID renames to all references to jobs.
Does not modify our own ID or those of finished predecessors. IDs not present in the renames dict are left as-is.
- Parameters:
renames (Dict[TemporaryID, str]) – Rename operations to apply.
- Return type:
None
- addPredecessor()[source]
Notify the JobDescription that a predecessor has been added to its Job.
- Return type:
None
- onRegistration(jobStore)[source]
Perform setup work that requires the JobStore.
Called by the Job saving logic when this JobDescription meets the JobStore and has its ID assigned.
Overridden to perform setup work (like hooking up flag files for service jobs) that requires the JobStore.
- Parameters:
jobStore (AbstractJobStore) – The job store we are being placed into.
- Return type:
None
- setupJobAfterFailure(exit_status=None, exit_reason=None)[source]
Configure job after a failure.
Reduce the remainingTryCount if greater than zero and set the memory to be at least as big as the default memory (in case of exhaustion of memory, which is common).
Requires a configuration to have been assigned (see toil.job.Requirer.assignConfig()).
- Parameters:
exit_reason (Optional[BatchJobExitReason]) – The reason the job stopped, if available from the batch system.
- Return type:
None
- getLogFileHandle(jobStore)[source]
Create a context manager that yields a file handle to the log file.
Assumes logJobStoreFileID is set.
- property remainingTryCount
Get the number of tries remaining.
The try count set on the JobDescription, or the default based on the retry count from the config if none is set.
- clearRemainingTryCount()[source]
Clear remainingTryCount and set it back to its default value.
- Return type:
bool
- Returns:
True if a modification to the JobDescription was made, and False otherwise.
- reserve_versions(count)[source]
Reserve a job version number for later, for journaling asynchronously.
- pre_update_hook()[source]
Run before pickling and saving a created or updated version of this job.
Called by the job store.
- Return type:
None