Toil API

This section describes the API for writing Toil workflows in Python.

Job methods

Jobs are the units of work in Toil which are composed into workflows.

class toil.job.Job(memory=None, cores=None, disk=None, preemptable=None, unitName=None, checkpoint=False)[source]

Represents a unit of work in Toil.

__init__(memory=None, cores=None, disk=None, preemptable=None, unitName=None, checkpoint=False)[source]

This method must be called by any overriding constructor.

Parameters:
  • memory (int or string convertible by bd2k.util.humanize.human2bytes to an int) – the maximum number of bytes of memory the job will require to run.
  • cores (int or string convertible by bd2k.util.humanize.human2bytes to an int) – the number of CPU cores required.
  • disk (int or string convertible by bd2k.util.humanize.human2bytes to an int) – the amount of local disk space required by the job, expressed in bytes.
  • preemptable (bool) – if the job can be run on a preemptable node.
  • checkpoint (bool) – if any of this job’s successor jobs fails completely, exhausting all of its retries, remove any successor jobs and rerun this job to restart the subtree. The job must be a leaf vertex in the job graph when initially defined; see toil.job.Job.checkNewCheckpointsAreLeafVertices().
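The string forms accepted for memory and disk can be pictured with a small converter. The sketch below is not bd2k.util.humanize.human2bytes: to_bytes is a hypothetical stand-in, and it assumes that the suffixes K, M, G and T denote powers of 1024, which may differ from what the real function accepts.

```python
# Hypothetical stand-in for a human-readable size parser; not the bd2k code.
# Assumption: suffixes K, M, G, T are case-insensitive powers of 1024.
_SUFFIXES = {"K": 1, "M": 2, "G": 3, "T": 4}


def to_bytes(value):
    """Return value as an integer number of bytes."""
    if isinstance(value, int):
        return value
    text = value.strip().upper()
    if text and text[-1] in _SUFFIXES:
        return int(float(text[:-1]) * 1024 ** _SUFFIXES[text[-1]])
    return int(text)


print(to_bytes("100k"))
print(to_bytes(2048))
```

Under these assumptions an integer is passed through unchanged, so both forms can be mixed freely when describing requirements.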
run(fileStore)[source]

Override this function to perform work and dynamically create successor jobs.

Parameters:fileStore (toil.fileStore.FileStore) – Used to create local and globally sharable temporary files and to send log messages to the leader process.
Returns:The return value of the function can be passed to other jobs by means of toil.job.Job.rv().
addChild(childJob)[source]

Adds childJob to be run as child of this job. Child jobs will be run directly after this job’s toil.job.Job.run() method has completed.

Parameters:childJob (toil.job.Job) –
Returns:childJob
Return type:toil.job.Job
hasChild(childJob)[source]

Check if childJob is already a child of this job.

Parameters:childJob (toil.job.Job) –
Returns:True if childJob is a child of the job, else False.
Return type:bool
addFollowOn(followOnJob)[source]

Adds a follow-on job. Follow-on jobs will be run after the child jobs and their successors have been run.

Parameters:followOnJob (toil.job.Job) –
Returns:followOnJob
Return type:toil.job.Job
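The ordering of children and follow-ons can be illustrated with a stand-alone model. SketchJob and execute are hypothetical names and do not use Toil. The model also runs jobs one at a time in a single process, whereas a real workflow may run independent jobs concurrently.

```python
# Stand-alone model of the ordering described above; it does not use Toil.
class SketchJob:
    def __init__(self, name):
        self.name = name
        self.children = []
        self.follow_ons = []

    def addChild(self, job):
        self.children.append(job)
        return job

    def addFollowOn(self, job):
        self.follow_ons.append(job)
        return job


def execute(job, order):
    """Run job, then its children and their successors, then its follow-ons."""
    order.append(job.name)
    for child in job.children:
        execute(child, order)
    for follow_on in job.follow_ons:
        execute(follow_on, order)
    return order


root = SketchJob("root")
child = root.addChild(SketchJob("child"))
child.addChild(SketchJob("grandchild"))
child.addFollowOn(SketchJob("child_follow_on"))
root.addFollowOn(SketchJob("root_follow_on"))

order = execute(root, [])
print(order)
```

In this model the follow-on of root is reached only after the whole subtree under child, including the follow-on of child, has finished.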
addService(service, parentService=None)[source]

Add a service.

The toil.job.Job.Service.start() method of the service will be called after the run method has completed but before any successors are run. The service’s toil.job.Job.Service.stop() method will be called once the successors of the job have been run.

Services allow things like databases and servers to be started and accessed by jobs in a workflow.

Raises:

toil.job.JobException – If service has already been made the child of a job or another service.

Parameters:
  • service (toil.job.Job.Service) – Service to add.
  • parentService (toil.job.Job.Service) – Service that will be started before ‘service’ is started. Allows trees of services to be established. parentService must be a service of this job.
Returns:

a promise that will be replaced with the return value from toil.job.Job.Service.start() of service in any successor of the job.

Return type:

toil.job.Promise

addChildFn(fn, *args, **kwargs)[source]

Adds a function as a child job.

Parameters:fn – Function to be run as a child job with *args and **kwargs as arguments to this function. See toil.job.FunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
Returns:The new child job that wraps fn.
Return type:toil.job.FunctionWrappingJob
addFollowOnFn(fn, *args, **kwargs)[source]

Adds a function as a follow-on job.

Parameters:fn – Function to be run as a follow-on job with *args and **kwargs as arguments to this function. See toil.job.FunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
Returns:The new follow-on job that wraps fn.
Return type:toil.job.FunctionWrappingJob
addChildJobFn(fn, *args, **kwargs)[source]

Adds a job function as a child job. See toil.job.JobFunctionWrappingJob for a definition of a job function.

Parameters:fn – Job function to be run as a child job with *args and **kwargs as arguments to this function. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
Returns:The new child job that wraps fn.
Return type:toil.job.JobFunctionWrappingJob
addFollowOnJobFn(fn, *args, **kwargs)[source]

Add a follow-on job function. See toil.job.JobFunctionWrappingJob for a definition of a job function.

Parameters:fn – Job function to be run as a follow-on job with *args and **kwargs as arguments to this function. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
Returns:The new follow-on job that wraps fn.
Return type:toil.job.JobFunctionWrappingJob
static wrapFn(fn, *args, **kwargs)[source]

Makes a Job out of a function. Convenience function for constructor of toil.job.FunctionWrappingJob.

Parameters:fn – Function to be run with *args and **kwargs as arguments. See toil.job.FunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
Returns:The new job that wraps fn.
Return type:toil.job.FunctionWrappingJob
static wrapJobFn(fn, *args, **kwargs)[source]

Makes a Job out of a job function. Convenience function for constructor of toil.job.JobFunctionWrappingJob.

Parameters:fn – Job function to be run with *args and **kwargs as arguments. See toil.job.JobFunctionWrappingJob for reserved keyword arguments used to specify resource requirements.
Returns:The new job that wraps fn.
Return type:toil.job.JobFunctionWrappingJob
encapsulate()[source]

Encapsulates the job, see toil.job.EncapsulatedJob. Convenience function for constructor of toil.job.EncapsulatedJob.

Returns:an encapsulated version of this job.
Return type:toil.job.EncapsulatedJob
rv(*path)[source]

Creates a promise (toil.job.Promise) representing a return value of the job’s run method, or, in case of a function-wrapping job, the wrapped function’s return value.

Parameters:path (Any) – Optional path for selecting a component of the promised return value. If absent or empty, the entire return value will be used. Otherwise, the first element of the path is used to select an individual item of the return value. For that to work, the return value must be a list, a dictionary or any other type implementing the __getitem__() magic method. If the selected item is yet another composite value, the second element of the path can be used to select an item from it, and so on. For example, if the return value is [6, {'a': 42}], .rv(0) would select 6, .rv(1) would select {'a': 42}, while .rv(1, 'a') would select 42. To select a slice from a return value that is sliceable, e.g. a tuple or list, the path element should be a slice object. For example, assuming that the return value is [6, 7, 8, 9], .rv(slice(1, 3)) would select [7, 8]. Note that slicing really only makes sense at the end of the path.
Returns:A promise representing the return value of this job’s toil.job.Job.run() method.
Return type:toil.job.Promise
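The path semantics amount to applying __getitem__ once per path element. The helper below, select, is a hypothetical illustration of that selection rule on an ordinary value; it is not how Toil resolves promises internally.

```python
def select(value, *path):
    """Follow path through nested containers, one __getitem__ call per element."""
    for key in path:
        value = value[key]
    return value


result = [6, {"a": 42}]
print(select(result))            # empty path: the entire return value
print(select(result, 0))         # first item
print(select(result, 1, "a"))    # item "a" of the second item
print(select([6, 7, 8, 9], slice(1, 3)))
```

Each call mirrors the corresponding rv() call in the description above, with the path elements passed in the same order.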
prepareForPromiseRegistration(jobStore)[source]

Ensure that a promise by this job (the promissor) can register with the promissor when another job referring to the promise (the promissee) is being serialized. The promissee holds the reference to the promise (usually as part of the job arguments) and when it is being pickled, so will the promises it refers to. Pickling a promise triggers it to be registered with the promissor.

Returns:
checkJobGraphForDeadlocks()[source]

See toil.job.Job.checkJobGraphConnected(), toil.job.Job.checkJobGraphAcylic() and toil.job.Job.checkNewCheckpointsAreLeafVertices() for more info.

Raises:toil.job.JobGraphDeadlockException – if the job graph is cyclic, contains multiple roots or contains checkpoint jobs that are not leaf vertices when defined (see toil.job.Job.checkNewCheckpointsAreLeafVertices()).
getRootJobs()[source]
Returns:The roots of the connected component of jobs that contains this job. A root is a job with no predecessors.

Return type:set of toil.job.Job instances

checkJobGraphConnected()[source]
Raises:toil.job.JobGraphDeadlockException – if toil.job.Job.getRootJobs() does not contain exactly one root job.

As execution always starts from one root job, having multiple root jobs will cause a deadlock to occur.
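The single-root requirement can be sketched on a plain adjacency mapping. The functions root_jobs and check_connected are hypothetical names; the sketch treats the whole mapping as one connected component and raises ValueError where Toil would raise toil.job.JobGraphDeadlockException.

```python
def root_jobs(successors):
    """Return the jobs that never appear as a successor of another job.

    successors maps each job name to the names of its direct successors.
    """
    all_jobs = set(successors)
    with_predecessor = set()
    for targets in successors.values():
        all_jobs.update(targets)
        with_predecessor.update(targets)
    return all_jobs - with_predecessor


def check_connected(successors):
    """Raise ValueError unless the graph has exactly one root."""
    roots = root_jobs(successors)
    if len(roots) != 1:
        raise ValueError("expected exactly one root job, found %d" % len(roots))
    return roots


print(check_connected({"a": ["b", "c"], "b": ["c"]}))
```

A graph such as {"a": ["c"], "b": ["c"]} has two roots, a and b, so the check rejects it.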

checkJobGraphAcylic()[source]
Raises:toil.job.JobGraphDeadlockException – if the connected component of jobs containing this job contains any cycles of child/followOn dependencies in the augmented job graph (see below). Such cycles are not allowed in valid job graphs.

A follow-on edge (A, B) between two jobs A and B is equivalent to adding a child edge to B from (1) A, (2) each child of A, and (3) the successors of each child of A. We call each such edge an “implied” edge. The augmented job graph is a job graph including all the implied edges.

For a job graph G = (V, E) the algorithm is O(|V|^2). It is O(|V| + |E|) for a graph with no follow-ons. The former follow-on case could be improved!
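The construction of the augmented graph and the cycle test can be sketched as follows. This is an illustration under stated assumptions, not Toil's implementation: the function names are hypothetical, jobs are plain strings, and "the successors of each child" is read as every job reachable from a child through child or follow-on edges.

```python
def reachable(start_nodes, edges):
    """Return every job reachable from start_nodes, including the start nodes."""
    seen, stack = set(), list(start_nodes)
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges.get(node, ()))
    return seen


def augmented_graph(children, follow_ons):
    """Return the child edges plus the implied edges of every follow-on edge."""
    # Successor edges are the child edges together with the follow-on edges.
    successors = {}
    for mapping in (children, follow_ons):
        for job, targets in mapping.items():
            successors.setdefault(job, set()).update(targets)
    augmented = {job: set(targets) for job, targets in children.items()}
    for a, targets in follow_ons.items():
        # (1) A itself, (2) its children, (3) the successors of those children.
        sources = {a} | reachable(children.get(a, ()), successors)
        for b in targets:
            for source in sources:
                augmented.setdefault(source, set()).add(b)
    return augmented


def has_cycle(edges):
    """Depth-first search that reports a cycle when it re-enters an open node."""
    state = {}

    def visit(node):
        if state.get(node) == "in progress":
            return True
        if state.get(node) == "done":
            return False
        state[node] = "in progress"
        if any(visit(nxt) for nxt in edges.get(node, ())):
            return True
        state[node] = "done"
        return False

    return any(visit(node) for node in list(edges))


valid = augmented_graph({"A": ["C"], "C": ["D"]}, {"A": ["B"]})
# B is both a follow-on of A and a child of A's child C, which is a deadlock.
invalid = augmented_graph({"A": ["C"], "C": ["B"]}, {"A": ["B"]})
print(has_cycle(valid), has_cycle(invalid))
```

In the second graph B would have to wait for itself, which shows up as a self-loop on B in the augmented graph.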

checkNewCheckpointsAreLeafVertices()[source]

A checkpoint job is a job that is restarted if either it fails, or if any of its successors completely fails, exhausting their retries.

A job is a leaf if it has no successors.

A checkpoint job must be a leaf when initially added to the job graph. When its run method is invoked it can then create direct successors. This restriction is made to simplify implementation.

Raises:toil.job.JobGraphDeadlockException – if there exists a job being added to the graph for which checkpoint=True and which is not a leaf.
defer(function, *args, **kwargs)[source]

Register a deferred function, i.e. a callable that will be invoked after the current attempt at running this job concludes. A job attempt is said to conclude when the job function (or the toil.job.Job.run() method for class-based jobs) returns, raises an exception or after the process running it terminates abnormally. A deferred function will be called on the node that attempted to run the job, even if a subsequent attempt is made on another node. A deferred function should be idempotent because it may be called multiple times on the same node or even in the same process. More than one deferred function may be registered per job attempt by calling this method repeatedly with different arguments. If the same function is registered twice with the same or different arguments, it will be called twice per job attempt.

Examples for deferred functions are ones that handle cleanup of resources external to Toil, like Docker containers, files outside the work directory, etc.

Parameters:
  • function (callable) – The function to be called after this job concludes.
  • args (list) – The arguments to the function
  • kwargs (dict) – The keyword arguments to the function
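The behaviour of deferred functions within a single process can be modelled with try/finally. The sketch below is a stand-alone model with hypothetical names and does not use Toil. It assumes the functions are called in registration order, and it does not cover the case where the worker process terminates abnormally.

```python
class SketchJob:
    """Stand-alone model of deferred functions; it does not use Toil."""

    def __init__(self):
        self._deferred = []

    def defer(self, function, *args, **kwargs):
        self._deferred.append((function, args, kwargs))

    def attempt(self, body):
        """Run body(self); call the deferred functions however the attempt ends."""
        try:
            return body(self)
        finally:
            deferred, self._deferred = self._deferred, []
            for function, args, kwargs in deferred:
                function(*args, **kwargs)


cleaned = []


def failing_body(job):
    # Register cleanup for two external resources, then fail.
    job.defer(cleaned.append, "container-1")
    job.defer(cleaned.append, "scratch-file")
    raise RuntimeError("job failed")


try:
    SketchJob().attempt(failing_body)
except RuntimeError:
    pass

print(cleaned)
```

Both cleanup calls run even though the body raised, which is the property that makes deferred functions suitable for releasing resources outside Toil's control.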
getTopologicalOrderingOfJobs()[source]
Returns:a list of jobs such that for all pairs of indices i, j for which i < j, the job at index i can be run before the job at index j.
Return type:list
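One way to obtain such an ordering is Kahn's algorithm. The sketch below is an illustration on a plain adjacency mapping with a hypothetical function name; the source does not say which algorithm Toil uses.

```python
from collections import deque


def topological_order(successors):
    """Return job names so that every job precedes all of its successors."""
    in_degree = {}
    for job, targets in successors.items():
        in_degree.setdefault(job, 0)
        for target in targets:
            in_degree[target] = in_degree.get(target, 0) + 1
    # Jobs without predecessors can run first.
    ready = deque(job for job, degree in in_degree.items() if degree == 0)
    ordering = []
    while ready:
        job = ready.popleft()
        ordering.append(job)
        for target in successors.get(job, ()):
            in_degree[target] -= 1
            if in_degree[target] == 0:
                ready.append(target)
    if len(ordering) != len(in_degree):
        raise ValueError("graph contains a cycle")
    return ordering


graph = {"root": ["a", "b"], "a": ["c"], "b": ["c"]}
ordering = topological_order(graph)
print(ordering)
```

A graph can have several valid orderings; the only guarantee is that a job at a lower index never depends on a job at a higher index.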

Job.FileStore

The FileStore is an abstraction of a Toil run’s shared storage.

class toil.fileStore.FileStore(jobStore, jobGraph, localTempDir, inputBlockFn)[source]

An abstract base class to represent the interface between a worker and the job store. Concrete subclasses are used to manage temporary files, read and write files from the job store, and log messages. An instance is passed as an argument to the toil.job.Job.run() method.

__init__(jobStore, jobGraph, localTempDir, inputBlockFn)[source]
open(*args, **kwds)[source]

The context manager used to conduct tasks before and after a job has been run.

Parameters:job (toil.job.Job) – The job instance of the toil job to run.
getLocalTempDir()[source]

Get a new local temporary directory in which to write files that persist for the duration of the job.

Returns:The absolute path to a new local temporary directory. This directory will exist for the duration of the job only, and is guaranteed to be deleted once the job terminates, removing all files it contains recursively.
Return type:str
getLocalTempFile()[source]

Get a new local temporary file that will persist for the duration of the job.

Returns:The absolute path to a local temporary file. This file will exist for the duration of the job only, and is guaranteed to be deleted once the job terminates.
Return type:str
getLocalTempFileName()[source]

Get a valid name for a new local file. Don’t actually create a file at the path.

Returns:Path to valid file
Return type:str
writeGlobalFile(localFileName, cleanup=False)[source]

Takes a file (as a path) and uploads it to the job store.

Parameters:
  • localFileName (string) – The path to the local file to upload.
  • cleanup (bool) – if True, the copy of the global file will be deleted once the job and all its successors have completed running. If not, the global file must be deleted manually.
Returns:

an ID that can be used to retrieve the file.

Return type:

toil.fileStore.FileID

writeGlobalFileStream(cleanup=False)[source]

Similar to writeGlobalFile, but allows the writing of a stream to the job store. The yielded file handle does not need to and should not be closed explicitly.

Parameters:cleanup (bool) – is as in toil.fileStore.FileStore.writeGlobalFile().
Returns:A context manager yielding a tuple of 1) a file handle which can be written to and 2) the ID of the resulting file in the job store.
readGlobalFile(fileStoreID, userPath=None, cache=True, mutable=None)[source]

Downloads a file described by fileStoreID from the file store to the local directory.

If a user path is specified, it is used as the destination. If a user path isn’t specified, the file is stored in the local temp directory with an encoded name.

Parameters:
  • fileStoreID (toil.fileStore.FileID) – job store id for the file
  • userPath (string) – a path to the name of file to which the global file will be copied or hard-linked (see below).
  • cache (bool) – Described in toil.fileStore.CachingFileStore.readGlobalFile()
  • mutable (bool) – Described in toil.fileStore.CachingFileStore.readGlobalFile()
Returns:

An absolute path to a local, temporary copy of the file keyed by fileStoreID.

Return type:

str

readGlobalFileStream(fileStoreID)[source]

Similar to readGlobalFile, but allows a stream to be read from the job store. The yielded file handle does not need to and should not be closed explicitly.

Returns:a context manager yielding a file handle which can be read from.
deleteLocalFile(fileStoreID)[source]

Deletes local copies of files associated with the provided job store ID.

Parameters:fileStoreID (str) – File Store ID of the file to be deleted.
deleteGlobalFile(fileStoreID)[source]

Deletes local files with the provided job store ID and then permanently deletes them from the job store. To ensure that the job can be restarted if necessary, the delete will not happen until after the job’s run method has completed.

Parameters:fileStoreID – the job store ID of the file to be deleted.
classmethod findAndHandleDeadJobs(nodeInfo, batchSystemShutdown=False)[source]

This function looks at the state of all jobs registered on the node and will handle them (clean up their presence on the node, and run any registered defer functions).

Parameters:
  • nodeInfo – Information regarding the node required for identifying dead jobs.
  • batchSystemShutdown (bool) – Is the batch system in the process of shutting down?
logToMaster(text, level=20)[source]

Send a logging message to the leader. The message will also be logged by the worker at the same level.

Parameters:
  • text – The string to log.
  • level (int) – The logging level.
classmethod shutdown(dir_)[source]

Shutdown the filestore on this node.

This is intended to be called on batch system shutdown.

Parameters:dir_ – The keystone directory containing the required information for fixing the state of failed workers on the node before cleaning up.

Job.Runner

The Runner contains the methods needed to configure and start a Toil run.

class Job.Runner[source]

Used to set up and run a Toil workflow.

static getDefaultArgumentParser()[source]

Get argument parser with added toil workflow options.

Returns:The argument parser used by a toil workflow with added Toil options.
Return type:argparse.ArgumentParser
static getDefaultOptions(jobStore)[source]

Get default options for a toil workflow.

Parameters:jobStore (string) – A string describing the jobStore for the workflow.
Returns:The options used by a toil workflow.
Return type:argparse.ArgumentParser values object
static addToilOptions(parser)[source]

Adds the default toil options to an optparse or argparse parser object.

Parameters:parser (optparse.OptionParser or argparse.ArgumentParser) – Options object to add toil options to.
static startToil(job, options)[source]

Deprecated by toil.common.Toil.run. Runs the toil workflow using the given options (see Job.Runner.getDefaultOptions and Job.Runner.addToilOptions) starting with this job.

Parameters:job (toil.job.Job) – root job of the workflow
Raises:toil.leader.FailedJobsException – if failed jobs remain at the end of the function.
Returns:The return value of the root job’s run function.
Return type:Any

Toil

The Toil class provides for a more general way to configure and start a Toil run.

class toil.common.Toil(options)[source]

A context manager that represents a Toil workflow, specifically the batch system, job store, and its configuration.

__init__(options)[source]

Initialize a Toil object from the given options. Note that this is very light-weight and that the bulk of the work is done when the context is entered.

Parameters:options (argparse.Namespace) – command line options specified by the user
config = None
Type:toil.common.Config
start(rootJob)[source]

Invoke a Toil workflow with the given job as the root for an initial run. This method must be called in the body of a with Toil(...) as toil: statement. This method should not be called more than once for a workflow that has not finished.

Parameters:rootJob (toil.job.Job) – The root job of the workflow
Returns:The root job’s return value
restart()[source]

Restarts a workflow that has been interrupted. This method should be called if and only if a workflow has previously been started and has not finished.

Returns:The root job’s return value
classmethod getJobStore(locator)[source]

Create an instance of the concrete job store implementation that matches the given locator.

Parameters:locator (str) – The location of the job store to be represented by the instance
Returns:an instance of a concrete subclass of AbstractJobStore
Return type:toil.jobStores.abstractJobStore.AbstractJobStore
static createBatchSystem(config)[source]

Creates an instance of the batch system specified in the given config.

Parameters:config (toil.common.Config) – the current configuration
Return type:batchSystems.abstractBatchSystem.AbstractBatchSystem
Returns:an instance of a concrete subclass of AbstractBatchSystem
importFile(srcUrl, sharedFileName=None)[source]

Imports the file at the given URL into job store.

See toil.jobStores.abstractJobStore.AbstractJobStore.importFile() for a full description

exportFile(jobStoreFileID, dstUrl)[source]

Exports file to destination pointed at by the destination URL.

See toil.jobStores.abstractJobStore.AbstractJobStore.exportFile() for a full description

static getWorkflowDir(workflowID, configWorkDir=None)[source]

Returns a path to the directory where worker directories and the cache will be located for this workflow.

Parameters:
  • workflowID (str) – Unique identifier for the workflow
  • configWorkDir (str) – Value passed to the program using the --workDir flag
Returns:

Path to the workflow directory

Return type:

str

Job.Service

The Service class allows databases and servers to be spawned within a Toil workflow.

class Job.Service(memory=None, cores=None, disk=None, preemptable=None, unitName=None)[source]

Abstract class used to define the interface to a service.

__init__(memory=None, cores=None, disk=None, preemptable=None, unitName=None)[source]

Memory, core and disk requirements are specified as in toil.job.Job.__init__().

start(job)[source]

Start the service.

Parameters:job (toil.job.Job) – The underlying job that is being run. Can be used to register deferred functions, or to access the fileStore for creating temporary files.
Returns:An object describing how to access the service. The object must be pickleable and will be used by jobs to access the service (see toil.job.Job.addService()).
stop(job)[source]

Stops the service. Function can block until complete.

Parameters:job (toil.job.Job) – The underlying job that is being run. Can be used to register deferred functions, or to access the fileStore for creating temporary files.
check()[source]

Checks the service is still running.

Raises:exceptions.RuntimeError – If the service failed, this will cause the service job to be labeled failed.
Returns:True if the service is still running, else False. If False then the service job will be terminated, and considered a success. Important point: if the service job exits due to a failure, it should raise a RuntimeError, not return False!

FunctionWrappingJob

The subclass of Job for wrapping user functions.

class toil.job.FunctionWrappingJob(userFunction, *args, **kwargs)[source]

Job used to wrap a function. In its run method the wrapped function is called.

__init__(userFunction, *args, **kwargs)[source]
Parameters:userFunction (callable) – The function to wrap. It will be called with *args and **kwargs as arguments.

The keywords memory, cores, disk, preemptable and checkpoint are reserved keyword arguments that, if specified, will be used to determine the resources required for the job, as in toil.job.Job.__init__(). If they are keyword arguments to the function they will be extracted from the function definition, but may be overridden by the user (as you would expect).
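The precedence rule for reserved keywords can be sketched with inspect.signature: a default declared in the function definition is used unless the caller passes an explicit value. The names split_requirements and align are hypothetical, and removing the reserved keywords from the arguments forwarded to the function is an assumption of this sketch rather than something stated above.

```python
import inspect

RESERVED = ("memory", "cores", "disk", "preemptable", "checkpoint")


def split_requirements(user_function, **kwargs):
    """Separate reserved resource keywords from the rest of kwargs.

    A default declared in the function signature is used unless the caller
    passes an explicit value for the same keyword.
    """
    parameters = inspect.signature(user_function).parameters
    requirements = {}
    for name in RESERVED:
        if name in kwargs:
            requirements[name] = kwargs.pop(name)
        elif name in parameters:
            default = parameters[name].default
            if default is not inspect.Parameter.empty:
                requirements[name] = default
    return requirements, kwargs


def align(reads, memory="2G", cores=4):
    return reads


requirements, remaining = split_requirements(align, reads="r.fq", memory="8G")
print(requirements, remaining)
```

Here the caller's memory="8G" overrides the declared default of "2G", while cores falls back to the default of 4 taken from the definition of align.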

JobFunctionWrappingJob

The subclass of FunctionWrappingJob for wrapping user job functions.

class toil.job.JobFunctionWrappingJob(userFunction, *args, **kwargs)[source]

A job function is a function whose first argument is a Job instance that is the wrapping job for the function. This can be used to add successor jobs for the function and perform all the functions the Job class provides.

To enable the job function to get access to the toil.fileStore.FileStore instance (see toil.job.Job.run()), it is made a variable of the wrapping job called fileStore.

To specify a job’s resource requirements the following default keyword arguments can be specified:

  • memory
  • disk
  • cores

For example to wrap a function into a job we would call:

Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)

EncapsulatedJob

The subclass of Job for encapsulating a job, allowing a subgraph of jobs to be treated as a single job.

class toil.job.EncapsulatedJob(job)[source]

A convenience Job class used to make a job subgraph appear to be a single job.

Let A be the root job of a job subgraph and B be another job we’d like to run after A and all its successors have completed. To do this, use encapsulate:

# Job a with its subgraph, and job b (A and B are Job subclasses)
a, b = A(), B()
encapsulated_a = a.encapsulate()
encapsulated_a.addChild(b)
# b will run after a and all its successors have completed; a and its subgraph of
# successors in effect appear to be just one job.

The return value of an encapsulated job (as accessed by the toil.job.Job.rv() function) is the return value of the root job, e.g. A().encapsulate().rv() and A().rv() will resolve to the same value after A or A.encapsulate() has been run.

__init__(job)[source]
Parameters:job (toil.job.Job) – the job to encapsulate.

Promise

The class used to reference return values of jobs/services not yet run/started.

class toil.job.Promise(job, path)[source]

References a return value from a toil.job.Job.run() or toil.job.Job.Service.start() method as a promise before the method itself is run.

Let T be a job. Instances of Promise (termed a promise) are returned by T.rv(), which is used to reference the return value of T’s run function. When the promise is passed to the constructor (or as an argument to a wrapped function) of a different, successor job, the promise will be replaced by the actual referenced return value. This mechanism allows a return value from one job’s run method to be an input argument to another job before the former job’s run function has been executed.
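The substitution of a promise by the value it refers to can be modelled in a few lines. SketchPromise and run_in_order are hypothetical names and do not use Toil. The model runs everything in one process, whereas Toil passes promised values between jobs through the job store.

```python
class SketchPromise:
    """Placeholder for the return value of a job that has not run yet."""

    def __init__(self, job_name):
        self.job_name = job_name


def run_in_order(jobs):
    """Run jobs given as (name, function, args) in a valid execution order."""
    results = {}
    for name, function, args in jobs:
        # Replace each promise with the value its job actually returned.
        resolved = [
            results[arg.job_name] if isinstance(arg, SketchPromise) else arg
            for arg in args
        ]
        results[name] = function(*resolved)
    return results


results = run_in_order([
    ("T", lambda: 21, []),
    ("successor", lambda x: 2 * x, [SketchPromise("T")]),
])
print(results)
```

The successor is defined before T has produced anything, yet by the time it runs its argument is the concrete value returned by T.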

filesToDelete = set([])

A set of IDs of files containing promised values when we know we won’t need them anymore

__init__(job, path)[source]
Parameters:
  • job (Job) – the job whose return value this promise references
  • path – see Job.rv()
class toil.job.PromisedRequirement(valueOrCallable, *args)[source]
__init__(valueOrCallable, *args)[source]

Class for dynamically allocating job function resource requirements involving toil.job.Promise instances.

Use when resource requirements depend on the return value of a parent function. PromisedRequirements can be modified by passing a function that takes the Promise as input.

For example, let f, g, and h be functions. Then a Toil workflow can be defined as follows:

A = Job.wrapFn(f)
B = A.addChildFn(g, cores=PromisedRequirement(A.rv()))
C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2*x, B.rv()))

Parameters:
  • valueOrCallable – A single Promise instance or a function that takes *args as input parameters.
  • *args (int or Promise) – variable length argument list
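The two ways of constructing a requirement, from a single value or from a function plus arguments, can be modelled as below. SketchRequirement is a hypothetical stand-in, not the Toil class, and it is given already resolved numbers where the real class would receive Promise instances.

```python
class SketchRequirement:
    """Stand-alone model of a requirement computed from resolved values."""

    def __init__(self, value_or_callable, *args):
        if callable(value_or_callable):
            self._function, self._args = value_or_callable, args
        else:
            # A plain value is treated as the identity function of that value.
            self._function, self._args = (lambda x: x), (value_or_callable,)

    def get_value(self):
        return self._function(*self._args)


plain = SketchRequirement(4)
doubled = SketchRequirement(lambda x: 2 * x, 3)
print(plain.get_value(), doubled.get_value())
```

The callable form is what lets a requirement be derived from, rather than equal to, the return value of an earlier job.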
getValue()[source]

Returns PromisedRequirement value

static convertPromises(kwargs)[source]

Returns True if reserved resource keyword is a Promise or PromisedRequirement instance. Converts Promise instance to PromisedRequirement.

Parameters:kwargs – function keyword arguments
Returns:bool

Exceptions

Toil specific exceptions.

exception toil.job.JobException(message)[source]

General job exception.

__init__(message)[source]
exception toil.job.JobGraphDeadlockException(string)[source]

An exception raised in the event that a workflow contains an unresolvable dependency, such as a cycle. See toil.job.Job.checkJobGraphForDeadlocks().

__init__(string)[source]
exception toil.jobStores.abstractJobStore.ConcurrentFileModificationException(jobStoreFileID)[source]

Indicates that multiple processes attempted to modify the file at once.

__init__(jobStoreFileID)[source]
Parameters:jobStoreFileID (str) – the ID of the file that was modified by multiple workers or processes concurrently
exception toil.jobStores.abstractJobStore.JobStoreExistsException(locator)[source]

Indicates that the specified job store already exists.

__init__(locator)[source]
exception toil.jobStores.abstractJobStore.NoSuchFileException(jobStoreFileID, customName=None)[source]

Indicates that the specified file does not exist.

__init__(jobStoreFileID, customName=None)[source]
Parameters:
  • jobStoreFileID (str) – the ID of the file that was mistakenly assumed to exist
  • customName (str) – optionally, an alternate name for the nonexistent file
exception toil.jobStores.abstractJobStore.NoSuchJobException(jobStoreID)[source]

Indicates that the specified job does not exist.

__init__(jobStoreID)[source]
Parameters:jobStoreID (str) – the jobStoreID that was mistakenly assumed to exist
exception toil.jobStores.abstractJobStore.NoSuchJobStoreException(locator)[source]

Indicates that the specified job store does not exist.

__init__(locator)[source]