Developing a Python Workflow

This tutorial walks through the features of Toil necessary for developing a workflow using the Toil Python API.

Scripting Quick Start

To begin, consider this short Toil Python workflow which illustrates defining a workflow:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


def helloWorld(message):
    return f"Hello, world!, here's a message: {message}"


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_quickstart")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = Job.wrapFn(helloWorld, "Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))  # prints "Hello, world!, ..."

The workflow consists of a single job. The resource requirements for that job are (optionally) specified by keyword arguments (memory, cores, disk). The options for the run are created with toil.job.Job.Runner.getDefaultOptions(), and the workflow is run by passing the job to toil.common.Toil.start(). Below we explain the components of this code in detail.

Job Basics

The atomic unit of work in a Toil workflow is a Job. User code extends this base class, or uses helper methods like toil.job.Job.addChildJobFn(), to define units of work. For example, here is a more long-winded class-based version of the job in the quick start example:

from toil.job import Job

class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self,  memory="2G", cores=2, disk="3G")
        self.message = message

    def run(self, fileStore):
        return f"Hello, world! Here's a message: {self.message}"

In the example a class, HelloWorld, is defined. The constructor requests 2 gigabytes of memory, 2 cores and 3 gigabytes of local disk to complete the work.

The toil.job.Job.run() method is the function the user overrides to get work done. Here it just returns a message.

It is also possible to log a message using toil.job.Job.log(), which will be registered in the log output of the leader process of the workflow:

...
    def run(self, fileStore):
        self.log(f"Hello, world! Here's a message: {self.message}")

Invoking a Workflow

We can add to the previous example to turn it into a complete workflow by adding the necessary function calls to create an instance of HelloWorld and to run this as a workflow containing a single job. For example:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self)
        self.message = message

    def run(self, fileStore):
        return f"Hello, world!, here's a message: {self.message}"


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_invokeworkflow")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = HelloWorld("Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))

Note

Do not include a . in the name of your Python script (besides .py at the end). This is to allow Toil to import the types and functions defined in your file while starting a new process.

This uses the toil.common.Toil class, which is used to run and resume Toil workflows. It is used as a context manager and allows for preliminary setup, such as staging of files into the job store on the leader node. An instance of the class is initialized by specifying an options object. The actual workflow is then invoked by calling the toil.common.Toil.start() method, passing the root job of the workflow, or, if a workflow is being restarted, by calling toil.common.Toil.restart(). Note that the body of the context manager should have explicit if/else branches for the restart and non-restart cases, selected by the boolean toil.options.restart.

For example:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self)
        self.message = message

    def run(self, fileStore):
        return f"Hello, world!, I have a message: {self.message}"


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_invokeworkflow2")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        if not toil.options.restart:
            job = HelloWorld("Woot!")
            output = toil.start(job)
        else:
            output = toil.restart()
    print(output)

The call to toil.job.Job.Runner.getDefaultOptions() creates a set of default options for the workflow. The only argument is a description of how to store the workflow’s state in what we call a job-store. Here the job-store is located at a fresh temporary path generated by mkdtemp(); the example removes the empty directory with os.rmdir(), leaving Toil to create the job-store at that path. Alternatively this string can encode other ways to store the necessary state, e.g. an S3 bucket object store location. By default the job-store is deleted if the workflow completes successfully.

The workflow is executed inside the with block: an instance of HelloWorld is created and passed to toil.common.Toil.start() as the root job or, on a restart, toil.common.Toil.restart() is called instead. Note that all Toil workflows start from a single starting job, referred to as the root job. The return value of the root job is returned as the result of the completed workflow (see promises below to see how this is a useful feature!).

Specifying Commandline Arguments

To allow command line control of the options we can use the toil.job.Job.Runner.getDefaultArgumentParser() method to create an argparse.ArgumentParser object which can be used to parse command line options for a Toil Python workflow. For example:

from toil.common import Toil
from toil.job import Job


class HelloWorld(Job):
    def __init__(self, message):
        Job.__init__(self)
        self.message = message

    def run(self, fileStore):
        return "Hello, world!, here's a message: %s" % self.message


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = HelloWorld("Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))

This creates a fully fledged Toil Python workflow with all the options Toil exposes as command line arguments. Running this program with --help will print the full list of options.

Alternatively an existing argparse.ArgumentParser object can have Toil command line options added to it with the toil.job.Job.Runner.addToilOptions() method.
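
As a rough illustration of the pattern, the sketch below mixes a script-specific argument into a parser and then overrides one parsed option in code, as the example above does with options.logLevel. It uses only argparse so that it runs without Toil: the --logLevel argument is registered by hand purely as a stand-in for the options Toil would add, and --message is a hypothetical script argument.

```python
import argparse

# Build an ordinary parser. In a real script the workflow options would be
# added by Toil; the --logLevel line below is only a hand-written stand-in.
parser = argparse.ArgumentParser(description="Hypothetical hello-world script")
parser.add_argument("--logLevel", default="INFO")

# A script-specific argument (hypothetical name).
parser.add_argument("--message", default="Woot")

# Parse an explicit argument list so the sketch runs without a command line.
options = parser.parse_args(["--message", "Hello from the CLI"])

# The result is a plain namespace, so the script can still override values.
options.logLevel = "OFF"

print(options.message, options.logLevel)
```

Because the parsed options are an ordinary namespace, script arguments and workflow options can live side by side in one object.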

Resuming a Workflow

In the event that a workflow fails, either because of programmatic error within the jobs being run, or because of node failure, the workflow can be resumed. The only case in which a workflow cannot be reliably resumed is when the job-store itself becomes corrupt.

Critical to resumption is that jobs can be rerun, even if they have apparently completed successfully. Put succinctly, a user defined job should not corrupt its input arguments. That way, regardless of node, network or leader failure the job can be restarted and the workflow resumed.
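
To make the point concrete, here is a small plain-Python sketch (no Toil involved) contrasting a unit of work that overwrites its own input with one that leaves the input intact. The function names and the file-based setup are hypothetical and chosen only for illustration; a file stands in for any input a job receives.

```python
import os
import tempfile


def unsafe_job(path):
    # Overwrites its own input, so a rerun starts from modified data.
    with open(path) as fh:
        text = fh.read()
    with open(path, "w") as fh:
        fh.write(text.upper() + "!")
    return path


def rerunnable_job(path, out_path):
    # Leaves the input untouched and writes the result elsewhere.
    with open(path) as fh:
        text = fh.read()
    with open(out_path, "w") as fh:
        fh.write(text.upper() + "!")
    return out_path


def read(path):
    with open(path) as fh:
        return fh.read()


with tempfile.TemporaryDirectory() as work_dir:
    source = os.path.join(work_dir, "in.txt")
    result = os.path.join(work_dir, "out.txt")
    with open(source, "w") as fh:
        fh.write("abc")

    rerunnable_job(source, result)
    rerunnable_job(source, result)   # simulated rerun after a failure
    safe_output = read(result)       # identical after one or two runs

    unsafe_job(source)
    unsafe_job(source)               # simulated rerun sees modified input
    unsafe_output = read(source)     # the marker has been applied twice

print(safe_output, unsafe_output)
```

The second function can be rerun any number of times with the same outcome, which is the property resumption relies on.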

To resume a workflow, specify the “restart” option in the options object used to create the toil.common.Toil context manager and call toil.common.Toil.restart() instead of toil.common.Toil.start(). If node failures are expected it can also be useful to use the integer “retryCount” option, which will attempt to rerun a job retryCount number of times before marking it fully failed.

In the common scenario that a small subset of jobs fail (including retry attempts) within a workflow, Toil will continue to run other jobs until it can do no more, at which point toil.common.Toil.start() will raise a toil.exceptions.FailedJobsException exception. Typically at this point the user can decide to fix the script and resume the workflow, or delete the job-store manually and rerun the complete workflow.

Functions and Job Functions

Defining jobs by creating class definitions generally involves the boilerplate of creating a constructor. To avoid this the classes toil.job.FunctionWrappingJob and toil.job.JobFunctionWrappingJob allow functions to be directly converted to jobs. For example, the quick start example (repeated here):

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


def helloWorld(message):
    return f"Hello, world!, here's a message: {message}"


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_quickstart")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "OFF"
    options.clean = "always"

    hello_job = Job.wrapFn(helloWorld, "Woot")

    with Toil(options) as toil:
        print(toil.start(hello_job))  # prints "Hello, world!, ..."

This is equivalent to the earlier class-based example, but uses a function to define the job.

The function call:

Job.wrapFn(helloWorld, "Woot")

creates an instance of toil.job.FunctionWrappingJob that wraps the function.

The keyword arguments memory, cores and disk allow resource requirements to be specified as before. Even if they are not included as keyword arguments within a function header they can be passed as arguments when wrapping a function as a job and will be used to specify resource requirements.

We can also apply the function wrapping syntax to a job function, a function whose first argument is a reference to the wrapping job. Just like a self argument in a class, this allows access to the methods of the wrapping job; see toil.job.JobFunctionWrappingJob. For example:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


def helloWorld(job, message):
    job.log(f"Hello world, I have a message: {message}")


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_jobfunctions")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    hello_job = Job.wrapJobFn(helloWorld, "Woot!")

    with Toil(options) as toil:
        toil.start(hello_job)

Here helloWorld() is a job function. It uses toil.job.Job.log() to log a message that will be printed to the output console. The only subtle difference to note is the line:

hello_job = Job.wrapJobFn(helloWorld, "Woot!")

which uses the function toil.job.Job.wrapJobFn() to wrap the job function, instead of toil.job.Job.wrapFn(), which wraps a vanilla function.

Workflows with Multiple Jobs

A parent job can have child jobs and follow-on jobs. These relationships are specified by methods of the job class, e.g. toil.job.Job.addChild() and toil.job.Job.addFollowOn().

If we consider a set of jobs to be the nodes of a job graph, and the child and follow-on relationships its directed edges, then a job B that lies on a directed path of child/follow-on edges from a job A is a successor of A; similarly, A is a predecessor of B.

A parent job’s child jobs are run directly after the parent job has completed, and in parallel. The follow-on jobs of a job are run after its child jobs and their successors have completed. They are also run in parallel. Follow-ons allow the easy specification of cleanup tasks that happen after a set of parallel child tasks. The following shows a simple example that uses the earlier helloWorld() job function:

from toil.common import Toil
from toil.job import Job


def helloWorld(job, message):
    job.log(f"Hello world, I have a message: {message}")


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(helloWorld, "first")
    j2 = Job.wrapJobFn(helloWorld, "second or third")
    j3 = Job.wrapJobFn(helloWorld, "second or third")
    j4 = Job.wrapJobFn(helloWorld, "last")

    j1.addChild(j2)
    j1.addChild(j3)
    j1.addFollowOn(j4)

    with Toil(options) as toil:
        toil.start(j1)

In the example four jobs are created: first j1 is run, then j2 and j3 are run in parallel as children of j1, and finally j4 is run as a follow-on of j1.

There are multiple shorthand functions to achieve the same workflow, for example:

from toil.common import Toil
from toil.job import Job


def helloWorld(job, message):
    job.log(f"Hello world, I have a message: {message}")


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(helloWorld, "first")
    j2 = j1.addChildJobFn(helloWorld, "second or third")
    j3 = j1.addChildJobFn(helloWorld, "second or third")
    j4 = j1.addFollowOnJobFn(helloWorld, "last")

    with Toil(options) as toil:
        toil.start(j1)

This defines the same workflow, using the functions toil.job.Job.addChildJobFn() and toil.job.Job.addFollowOnJobFn() to create job functions as children or follow-ons of an earlier job.

Job graphs are not limited to trees, and can express arbitrary directed acyclic graphs. For a precise definition of legal graphs see toil.job.Job.checkJobGraphForDeadlocks(). The previous example could be specified as a DAG as follows:

from toil.common import Toil
from toil.job import Job


def helloWorld(job, message):
    job.log(f"Hello world, I have a message: {message}")


if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(helloWorld, "first")
    j2 = j1.addChildJobFn(helloWorld, "second or third")
    j3 = j1.addChildJobFn(helloWorld, "second or third")
    j4 = j2.addChildJobFn(helloWorld, "last")
    j3.addChild(j4)

    with Toil(options) as toil:
        toil.start(j1)

Note the use of an extra child edge to make j4 a child of both j2 and j3.
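
The ordering implied by this DAG can be pictured without Toil by feeding the same edges to the standard library's graphlib module (Python 3.9 or later). This is only a model of the dependency structure, not a description of how Toil's scheduler is implemented; the job names are plain strings standing in for the jobs above.

```python
from graphlib import TopologicalSorter

# Map each job to the set of jobs that must finish before it can start,
# mirroring the edges j1 -> j2, j1 -> j3, j2 -> j4 and j3 -> j4.
predecessors = {
    "j1": set(),
    "j2": {"j1"},
    "j3": {"j1"},
    "j4": {"j2", "j3"},
}

sorter = TopologicalSorter(predecessors)
sorter.prepare()

waves = []
while sorter.is_active():
    # Jobs whose predecessors have all completed may run in parallel.
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

print(waves)  # [['j1'], ['j2', 'j3'], ['j4']]
```

Because j4 has two incoming edges, it only becomes ready once both j2 and j3 are done.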

Dynamic Job Creation

The previous examples show a workflow being defined outside of a job. However, Toil also allows jobs to be created dynamically within jobs. For example:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


def binaryStringFn(job, depth, message=""):
    if depth > 0:
        job.addChildJobFn(binaryStringFn, depth-1, message + "0")
        job.addChildJobFn(binaryStringFn, depth-1, message + "1")
    else:
        job.log(f"Binary string: {message}")


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_dynamic")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(binaryStringFn, depth=5))

The job function binaryStringFn logs all possible binary strings of length n (here n=5), creating a total of 2^(n+1) - 1 jobs dynamically and recursively (63 jobs for n=5, of which 2^n = 32 are the leaf jobs that log a string). Static and dynamic creation of jobs can be mixed in a Toil workflow, with jobs defined within a job or job function being created at run time.
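
The number of jobs created can be checked without running Toil by mirroring the recursion in plain Python. The helper below is a hypothetical counting function, not part of the Toil API: each call stands for one job, and a call with depth > 0 spawns two further calls, just as binaryStringFn adds two children.

```python
def count_jobs(depth):
    """Return (total jobs, leaf jobs) created by the recursion for a depth."""
    if depth > 0:
        # One job at this level plus the two subtrees it spawns.
        left_total, left_leaves = count_jobs(depth - 1)
        right_total, right_leaves = count_jobs(depth - 1)
        return 1 + left_total + right_total, left_leaves + right_leaves
    # depth == 0: a single leaf job that logs one binary string.
    return 1, 1


total, leaves = count_jobs(5)
print(total, leaves)  # 63 32
```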

Promises

The previous example of dynamic job creation shows variables from a parent job being passed to a child job. Such forward variable passing is naturally specified by recursive invocation of successor jobs within parent jobs. This can also be achieved statically by passing around references to the return variables of jobs. In Toil this is achieved with promises, as illustrated in the following example:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


def fn(job, i):
    job.log("i is: %s" % i, level=100)
    return i + 1


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_promises")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    j1 = Job.wrapJobFn(fn, 1)
    j2 = j1.addChildJobFn(fn, j1.rv())
    j3 = j1.addFollowOnJobFn(fn, j2.rv())

    with Toil(options) as toil:
        toil.start(j1)

Running this workflow results in three log messages from the jobs: “i is: 1” from j1, “i is: 2” from j2 and “i is: 3” from j3.

The return value from the first job is promised to the second job by the call to toil.job.Job.rv() in the following line:

j2 = j1.addChildJobFn(fn, j1.rv())

The value of j1.rv() is a promise, rather than the actual return value of the function, because j1 for the given input has at that point not been evaluated. A promise (toil.job.Promise) is essentially a pointer to the return value that is replaced by the actual return value once it has been evaluated. Therefore, when j2 is run the promise becomes 2.

Promises also support indexing of return values:

from toil.job import Job


def parent(job):
    indexable = Job.wrapJobFn(fn)
    job.addChild(indexable)
    job.addFollowOnFn(raiseWrap, indexable.rv(2))

def raiseWrap(arg):
    raise RuntimeError(arg) # raises "2"

def fn(job):
    return (0, 1, 2, 3)
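
The substitution of promises by values, including indexing, can be pictured with a toy model in plain Python. ToyJob and ToyPromise are hypothetical classes written for this sketch; they are not Toil's implementation and ignore everything Toil does around serialization and scheduling.

```python
class ToyPromise:
    """Placeholder for a job's future return value (not Toil's Promise)."""

    def __init__(self, job, index=None):
        self.job = job
        self.index = index

    def resolve(self):
        # Only meaningful once the promising job has run.
        value = self.job.result
        return value if self.index is None else value[self.index]


class ToyJob:
    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args
        self.result = None

    def rv(self, index=None):
        return ToyPromise(self, index)

    def run(self):
        # Replace every promise with the value it stands for, then call fn.
        args = [a.resolve() if isinstance(a, ToyPromise) else a
                for a in self.args]
        self.result = self.fn(*args)
        return self.result


def increment(i):
    return i + 1


j1 = ToyJob(increment, 1)
j2 = ToyJob(increment, j1.rv())          # promise: j1 has not run yet
j3 = ToyJob(increment, j2.rv())
triple = ToyJob(lambda: (10, 20, 30))
picked = ToyJob(increment, triple.rv(2))  # promise for element 2 only

# Run in an order where every promising job precedes its consumers.
results = [job.run() for job in (j1, j2, j3, triple, picked)]
print(results)  # [2, 3, 4, (10, 20, 30), 31]
```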

Promises can be quite useful. For example, we can combine dynamic job creation with promises to achieve a job creation process that mimics the functional patterns possible in many programming languages:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


def binaryStrings(job, depth, message=""):
    if depth > 0:
        s = [job.addChildJobFn(binaryStrings, depth - 1, message + "0").rv(),
             job.addChildJobFn(binaryStrings, depth - 1, message + "1").rv()]
        return job.addFollowOnFn(merge, s).rv()
    return [message]


def merge(strings):
    return strings[0] + strings[1]


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_promises2")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "OFF"
    options.clean = "always"

    with Toil(options) as toil:
        print(toil.start(Job.wrapJobFn(binaryStrings, depth=5)))

The return value of the workflow is a list of all 32 binary strings of length 5, computed recursively. Although a toy example, it demonstrates how closely Toil workflows can mimic typical programming patterns.
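
For comparison, the same computation written as an ordinary recursive function is sketched below. It is plain Python rather than a Toil workflow: the two recursive calls play the role of the two child jobs, and the list concatenation plays the role of the merge follow-on.

```python
def binary_strings(depth, message=""):
    if depth > 0:
        # The two recursive calls correspond to the two child jobs.
        parts = [binary_strings(depth - 1, message + "0"),
                 binary_strings(depth - 1, message + "1")]
        # Concatenating the two lists corresponds to the merge follow-on.
        return parts[0] + parts[1]
    return [message]


strings = binary_strings(5)
print(len(strings), strings[0], strings[-1])  # 32 00000 11111
```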

Promised Requirements

Promised requirements are a special case of Promises that allow a job’s return value to be used as another job’s resource requirements.

This is useful when, for example, a job’s storage requirement is determined by a file staged to the job store by an earlier job:

import os

from toil.common import Toil
from toil.job import Job, PromisedRequirement
from toil.lib.io import mkdtemp


def parentJob(job):
    downloadJob = Job.wrapJobFn(stageFn, "file://" + os.path.realpath(__file__), cores=0.1, memory='32M', disk='1M')
    job.addChild(downloadJob)

    analysis = Job.wrapJobFn(analysisJob,
                             fileStoreID=downloadJob.rv(0),
                             disk=PromisedRequirement(downloadJob.rv(1)))
    job.addFollowOn(analysis)


def stageFn(job, url):
    importedFile = job.fileStore.import_file(url)
    return importedFile, importedFile.size


def analysisJob(job, fileStoreID):
    # now do some analysis on the file
    pass


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_requirements")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(parentJob))

Note that this also makes use of the size attribute of the FileID object. This promised requirements mechanism can also be used in combination with an aggregator for multiple jobs’ output values:

def parentJob(job):
    aggregator = []
    for fileNum in range(0, 10):
        downloadJob = Job.wrapJobFn(stageFn, "file://" + os.path.realpath(__file__), cores=0.1, memory='32M', disk='1M')
        job.addChild(downloadJob)
        aggregator.append(downloadJob)

    analysis = Job.wrapJobFn(analysisJob,
                             fileStoreID=downloadJob.rv(0),
                             disk=PromisedRequirement(lambda xs: sum(xs), [j.rv(1) for j in aggregator]))
    job.addFollowOn(analysis)

Limitations

Just like regular promises, the return value must be determined prior to scheduling any job that depends on the return value. In our example above, notice how the dependent jobs were follow-ons to the parent while promising jobs are children of the parent. This ordering ensures that all promises are properly fulfilled.
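
The ordering constraint can be pictured with a toy model in plain Python. DeferredDisk, stage and file_sizes are hypothetical names invented for this sketch and are not part of Toil: the requirement is described up front as a function over values that do not exist yet, and it can only be resolved after the jobs producing those values have run.

```python
file_sizes = {}  # filled in as the (hypothetical) staging jobs finish


def stage(name, size):
    # Stands in for a child job that records the size of a staged file.
    file_sizes[name] = size


class DeferredDisk:
    """Toy requirement computed later from other jobs' results."""

    def __init__(self, func, names):
        self.func = func
        self.names = names

    def resolve(self):
        return self.func([file_sizes[name] for name in self.names])


names = [f"file{i}" for i in range(3)]
disk = DeferredDisk(lambda xs: sum(xs), names)  # nothing is computed yet

try:
    disk.resolve()
    resolved_early = True
except KeyError:
    resolved_early = False  # the staging jobs have not run yet

for i, name in enumerate(names):
    stage(name, 100 * (i + 1))

needed = disk.resolve()
print(resolved_early, needed)  # False 600
```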

FileID

The toil.fileStores.FileID class is a small wrapper around Python’s builtin string class. It is used to represent a file’s ID in the file store, and has a size attribute that is the file’s size in bytes. This object is returned by importFile and writeGlobalFile.
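
The idea of a string that carries an extra attribute can be sketched in a few lines. SizedID is a hypothetical class written for illustration only; it is not the FileID implementation and omits whatever else that class provides.

```python
class SizedID(str):
    """A string that also remembers a size in bytes (hypothetical class)."""

    def __new__(cls, file_id, size):
        # str is immutable, so the value is fixed in __new__.
        obj = super().__new__(cls, file_id)
        obj.size = size
        return obj


file_id = SizedID("files/example-id", 2048)

# The object still behaves like the plain string it wraps.
lookup = {file_id: "metadata"}
print(file_id, file_id.size, lookup["files/example-id"])
```

Because the wrapper is still a str, it can be used anywhere a plain ID string is expected while the size travels along with it.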

Managing files within a workflow

It is frequently the case that a workflow will want to create files, both persistent and temporary, during its run. The toil.fileStores.abstractFileStore.AbstractFileStore class is used by jobs to manage these files in a manner that guarantees cleanup and resumption on failure.

The toil.job.Job.run() method has a file store instance as an argument. The following example shows how this can be used to create temporary files that persist for the length of the job, are placed on a specified local disk of the node, and are cleaned up, regardless of failure, when the job finishes:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


class LocalFileStoreJob(Job):
    def run(self, fileStore):
        # self.tempDir will always contain the name of a directory within the allocated disk space reserved for the job
        scratchDir = self.tempDir

        # Similarly create a temporary file.
        scratchFile = fileStore.getLocalTempFile()


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_managing")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    # Create an instance of LocalFileStoreJob which will have at least 2 gigabytes of storage space.
    j = LocalFileStoreJob(disk="2G")

    # Run the workflow
    with Toil(options) as toil:
        toil.start(j)

Job functions can also access the file store for the job. The equivalent of the LocalFileStoreJob class is:

def localFileStoreJobFn(job):
    scratchDir = job.tempDir
    scratchFile = job.fileStore.getLocalTempFile()

Note that the fileStore attribute is accessed as an attribute of the job argument.

In addition to temporary files that exist for the duration of a job, the file store allows the creation of files in a global store, which persist for the duration of the workflow and are globally accessible (hence the name) between jobs. For example:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


def globalFileStoreJobFn(job):
    job.log("The following example exercises all the methods provided "
            "by the toil.fileStores.abstractFileStore.AbstractFileStore class")

    # Create a local temporary file.
    scratchFile = job.fileStore.getLocalTempFile()

    # Write something in the scratch file.
    with open(scratchFile, 'w') as fH:
        fH.write("What a tangled web we weave")

    # Write a copy of the file into the file-store; fileID is the key that can be used to retrieve the file.
    # This write is asynchronous by default
    fileID = job.fileStore.writeGlobalFile(scratchFile)

    # Write another file using a stream; fileID2 is the
    # key for this second file.
    with job.fileStore.writeGlobalFileStream(cleanup=True) as (fH, fileID2):
        fH.write(b"Out brief candle")

    # Now read the first file; scratchFile2 is a local copy of the file that is read-only by default.
    scratchFile2 = job.fileStore.readGlobalFile(fileID)

    # Read the second file to a desired location: scratchFile3.
    scratchFile3 = os.path.join(job.tempDir, "foo.txt")
    job.fileStore.readGlobalFile(fileID2, userPath=scratchFile3)

    # Read the second file again using a stream.
    with job.fileStore.readGlobalFileStream(fileID2) as fH:
        print(fH.read())  # This prints "Out brief candle"

    # Delete the first file from the global file-store.
    job.fileStore.deleteGlobalFile(fileID)

    # It is unnecessary to delete the file keyed by fileID2 because we used the cleanup flag,
    # which removes the file after this job and all its successors have run (if the file still exists)


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_managing2")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(globalFileStoreJobFn))

The example demonstrates the global read, write and delete functionality of the file-store, using both local copies of the files and streams to read and write the files. It covers all the methods provided by the file store interface.

What is obvious is that the file-store provides no functionality to update an existing “global” file, meaning that files are, barring deletion, immutable. Also worth noting is that there is no file system hierarchy for files in the global file store. These limitations allow us to fairly easily support different object stores and to use caching to limit the amount of network file transfer between jobs.
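
What these constraints mean in practice can be sketched with a toy in-memory store. ToyGlobalStore is a hypothetical class for illustration, not the file store API: files are addressed by opaque IDs rather than paths, and an "update" is expressed as writing a new file and deleting the old one.

```python
import uuid


class ToyGlobalStore:
    """In-memory stand-in for a flat, write-once file store."""

    def __init__(self):
        self._files = {}

    def write(self, data: bytes) -> str:
        file_id = uuid.uuid4().hex  # opaque key, no directory structure
        self._files[file_id] = bytes(data)
        return file_id

    def read(self, file_id: str) -> bytes:
        return self._files[file_id]

    def delete(self, file_id: str) -> None:
        del self._files[file_id]

    def exists(self, file_id: str) -> bool:
        return file_id in self._files


store = ToyGlobalStore()
old_id = store.write(b"draft")

# There is no update operation: write a new file, then drop the old one.
new_id = store.write(store.read(old_id) + b" v2")
store.delete(old_id)

print(store.read(new_id), store.exists(old_id))
```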

Staging of Files into the Job Store

External files can be imported into or exported out of the job store prior to running a workflow when the toil.common.Toil context manager is used on the leader. The context manager provides methods toil.common.Toil.importFile(), and toil.common.Toil.exportFile() for this purpose. The destination and source locations of such files are described with URLs passed to the two methods. Local files can be imported and exported as relative paths, and should be relative to the directory where the toil workflow is initially run from.

Using absolute paths and an appropriate scheme where possible (prefixing with “file://” or “s3://”, for example) makes imports and exports less ambiguous and is recommended.

A list of the currently supported URLs can be found at toil.jobStores.abstractJobStore.AbstractJobStore.importFile(). To import an external file into the job store as a shared file, pass the optional sharedFileName parameter to that method.

If a workflow fails for any reason, an imported file acts like any other file in the job store. If the workflow was configured so that the job store is not cleaned up after a failed run, the file will persist in the job store and need not be staged again when the workflow is resumed.

Example:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


class HelloWorld(Job):
    def __init__(self, id):
        Job.__init__(self)
        self.inputFileID = id

    def run(self, fileStore):
        with fileStore.readGlobalFileStream(self.inputFileID, encoding='utf-8') as fi:
            with fileStore.writeGlobalFileStream(encoding='utf-8') as (fo, outputFileID):
                fo.write(fi.read() + 'World!')
        return outputFileID


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_staging")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    # Resolve the I/O directory up front so that it is also defined on restart.
    ioFileDirectory = os.path.join(os.path.dirname(os.path.abspath(__file__)), "stagingExampleFiles")

    with Toil(options) as toil:
        if not toil.options.restart:
            inputFileID = toil.importFile("file://" + os.path.abspath(os.path.join(ioFileDirectory, "in.txt")))
            outputFileID = toil.start(HelloWorld(inputFileID))
        else:
            outputFileID = toil.restart()

        toil.exportFile(outputFileID, "file://" + os.path.abspath(os.path.join(ioFileDirectory, "out.txt")))

Using Docker Containers in Toil

Docker containers are commonly used with Toil. The combination of Toil and Docker allows for pipelines to be fully portable between any platform that has both Toil and Docker installed. Docker eliminates the need for the user to do any other tool installation or environment setup.

In order to use Docker containers with Toil, Docker must be installed on all workers of the cluster. Instructions for installing Docker can be found on the Docker website.

When using Toil-based autoscaling, Docker will be automatically set up on the cluster’s worker nodes, so no additional installation steps are necessary. Further information on using Toil-based autoscaling can be found in the Running a Workflow with Autoscaling documentation.

In order to use docker containers in a Toil workflow, the container can be built locally or downloaded in real time from an online docker repository like Quay. If the container is not in a repository, the container’s layers must be accessible on each node of the cluster.

When invoking Docker containers from within a Toil workflow, it is strongly recommended that you use dockerCall(), a Toil job function provided in toil.lib.docker. dockerCall leverages Docker’s own Python API, and provides container cleanup on job failure. When Docker containers are run without this feature, failed jobs can result in resource leaks. Docker’s API can be found at docker-py.

In order to use dockerCall, your installation of Docker must be set up to run without sudo. Instructions for setting this up can be found here.

An example of a basic dockerCall is below:

dockerCall(job=job,
            tool='quay.io/ucsc_cgl/bwa',
            workDir=job.tempDir,
            parameters=['index', '/data/reference.fa'])

Note the assumption that the reference.fa file is located in /data. This is Toil’s standard convention as a mount location to reduce boilerplate when calling dockerCall. Users can choose their own mount locations by supplying a volumes kwarg to dockerCall, such as: volumes={working_dir: {'bind': '/data', 'mode': 'rw'}}, where working_dir is an absolute path on the user’s filesystem.
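
A mapping of that shape can be built with a small helper. build_volumes is a hypothetical function written for this sketch, not part of Toil; it only assembles the dictionary in the form shown above and makes sure the host path is absolute.

```python
import os


def build_volumes(host_dir, mount_point="/data", mode="rw"):
    """Return a volumes mapping for one host directory (hypothetical helper)."""
    # The key is the absolute host path; the value says where and how
    # the directory is mounted inside the container.
    return {os.path.abspath(host_dir): {"bind": mount_point, "mode": mode}}


volumes = build_volumes(".")
print(volumes)
```

The resulting dictionary is what would be passed as the volumes keyword argument.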

Docker calls can also be added to workflows like any other job function. The example below wraps apiDockerCall, imported from toil.lib.docker:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.docker import apiDockerCall
from toil.lib.io import mkdtemp

align = Job.wrapJobFn(apiDockerCall,
                      image='ubuntu',
                      working_dir=os.getcwd(),
                      parameters=['ls', '-lha'])

if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_docker")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(align)

cgl-docker-lib contains dockerCall-compatible Dockerized tools that are commonly used in bioinformatics analysis.

The documentation provides guidelines for developing your own Docker containers that can be used with Toil and dockerCall. In order for a container to be compatible with dockerCall, it must have an ENTRYPOINT set to a wrapper script, as described in cgl-docker-lib containerization standards. This can be set by passing in the optional keyword argument, ‘entrypoint’. Example:

entrypoint=["/bin/bash", "-c"]

dockerCall currently supports the 75 keyword arguments found in the Python Docker API under the ‘run’ command.

Services

It is sometimes desirable to run services, such as a database or server, concurrently with a workflow. The toil.job.Job.Service class provides a simple mechanism for spawning such a service within a Toil workflow, allowing precise specification of the start and end time of the service, and providing start and end methods to use for initialization and cleanup. The following simple, conceptual example illustrates how services work:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp


class DemoService(Job.Service):
    def start(self, fileStore):
        # Start up a database/service here
        # Return a value that enables another process to connect to the database
        return "loginCredentials"

    def check(self):
        # A function that if it returns False causes the service to quit
        # If it raises an exception the service is killed and an error is reported
        return True

    def stop(self, fileStore):
        # Cleanup the database here
        pass


j = Job()
s = DemoService()
loginCredentialsPromise = j.addService(s)


def dbFn(loginCredentials):
    # Use the login credentials returned from the service's start method to connect to the service
    pass


j.addChildFn(dbFn, loginCredentialsPromise)


if __name__ == "__main__":
    jobstore: str = mkdtemp("tutorial_services")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        toil.start(j)

In this example the DemoService starts a database in the start method, returning an object from the start method indicating how a client job would access the database. The service’s stop method cleans up the database, while the service’s check method is polled periodically to check the service is alive.

A DemoService instance is added as a service of the root job j. Resource requirements can be passed to the service’s constructor; this example relies on the defaults. The return value from toil.job.Job.addService() is a promise to the return value of the service’s start method. When the promise is fulfilled it will represent how to connect to the database. The promise is passed to a child job of j, which uses it to make a database connection. The services of a job are started before any of its successors have been run and stopped after all the successors of the job have completed successfully.
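
To make the start/check/stop contract concrete, the following sketch wraps a small HTTP server from the Python standard library in the same three methods. It is written as a plain class so that it runs without a Toil installation; in a workflow it would subclass toil.job.Job.Service instead. The names HttpDemoService, PingHandler and fetch are hypothetical, and the fileStore argument is accepted but unused.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class PingHandler(BaseHTTPRequestHandler):
    """Answers every GET request with the body 'pong'."""

    def do_GET(self):
        body = b"pong"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # Keep the demo quiet


class HttpDemoService:
    """Plain class with the same start/check/stop shape as a Toil service."""

    def start(self, fileStore):
        # Port 0 asks the operating system for any free port
        self.server = HTTPServer(("127.0.0.1", 0), PingHandler)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        # The return value tells clients how to connect
        return self.server.server_address

    def check(self):
        # False once the server thread has exited
        return self.thread.is_alive()

    def stop(self, fileStore):
        self.server.shutdown()
        self.server.server_close()
        self.thread.join()


def fetch(address):
    """Client side: connect using the value returned by start()."""
    host, port = address
    connection = http.client.HTTPConnection(host, port, timeout=5)
    connection.request("GET", "/")
    body = connection.getresponse().read().decode()
    connection.close()
    return body
```

The address returned by start plays the role of "loginCredentials" in the example above: it is the value that a promise from addService() would resolve to for the client job.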

Multiple services can be created per job, all run in parallel. Additionally, services can define sub-services using toil.job.Job.Service.addChild(). This allows complex networks of services to be created, e.g. Apache Spark clusters, within a workflow.

Checkpoints

Services complicate resuming a workflow after failure, because they can create complex dependencies between jobs. For example, consider a service that provides a database that multiple jobs update. If the database service fails and loses state, it is not clear that just restarting the service will allow the workflow to be resumed, because jobs that created that state may have already finished. To get around this problem Toil supports checkpoint jobs, specified as the boolean keyword argument checkpoint to a job or wrapped function, e.g.:

j = Job(checkpoint=True)

A checkpoint job is rerun if one or more of its successors fails its retry attempts, until it itself has exhausted its retry attempts. Upon restarting a checkpoint job all its existing successors are first deleted, and then the job is rerun to define new successors. By checkpointing a job that defines a service, upon failure of the service the database and the jobs that access the service can be redefined and rerun.

To make the implementation of checkpoint jobs simple, a job can only be a checkpoint if, when first defined, it has no successors; i.e., it can only define successors within its run method.
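
The rerun rule can be modelled in a few lines of plain Python. This is a toy illustration of the behaviour described above, not Toil's scheduler. Here define_successors stands in for a checkpoint job's run method, each successor is a callable that raises once it has used up its own retries, and run_checkpoint, max_attempts and flaky_workflow are hypothetical names.

```python
def run_checkpoint(define_successors, max_attempts):
    """Rerun a checkpoint until all successors succeed or attempts run out."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        # Rerunning the checkpoint discards the old successors and defines new ones
        successors = define_successors(attempt)
        try:
            return [successor() for successor in successors]
        except Exception as error:
            # A successor failed for good, so fall back to the checkpoint
            last_error = error
    raise RuntimeError(f"checkpoint failed after {max_attempts} attempts") from last_error


def flaky_workflow(attempt):
    """Hypothetical run method whose database service fails on the first attempt."""

    def start_database():
        if attempt == 1:
            raise ConnectionError("database lost its state")
        return f"database up (attempt {attempt})"

    def query_database():
        return "query ok"

    return [start_database, query_database]
```

With max_attempts=2 the second attempt rebuilds both the database and its client from scratch, which is the point of checkpointing the job that defines the service.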

Encapsulation

Let A be a root job, potentially with children and follow-ons. Without an encapsulated job, the simplest way to specify a job B which runs after A and all its successors is to create a parent of A, call it Ap, and then make B a follow-on of Ap, e.g.:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp

if __name__ == "__main__":
    # A is a job with children and follow-ons, for example:
    A = Job()
    A.addChild(Job())
    A.addFollowOn(Job())

    # B is a job which needs to run after A and its successors
    B = Job()

    # The way to do this without encapsulation is to make a parent of A, Ap, and make B a follow-on of Ap.
    Ap = Job()
    Ap.addChild(A)
    Ap.addFollowOn(B)

    jobstore: str = mkdtemp("tutorial_encapsulations")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        print(toil.start(Ap))

An encapsulated job E(A) of A saves making Ap. Instead we can write:

import os

from toil.common import Toil
from toil.job import Job
from toil.lib.io import mkdtemp

if __name__ == "__main__":
    # A
    A = Job()
    A.addChild(Job())
    A.addFollowOn(Job())

    # Encapsulate A
    A = A.encapsulate()

    # B is a job which needs to run after A and its successors
    B = Job()

    # With encapsulation A and its successor subgraph appear to be a single job, hence:
    A.addChild(B)

    jobstore: str = mkdtemp("tutorial_encapsulations2")
    os.rmdir(jobstore)
    options = Job.Runner.getDefaultOptions(jobstore)
    options.logLevel = "INFO"
    options.clean = "always"

    with Toil(options) as toil:
        print(toil.start(A))

Note that the call to toil.job.Job.encapsulate() creates a toil.job.EncapsulatedJob.
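
The equivalence of the two approaches can be seen in a toy graph model. The sketch below is plain Python and only an assumption about how encapsulation might be modelled, not Toil's implementation. Node, Encapsulated and execution_order are hypothetical names, and the traversal assumes the ordering rule used throughout this tutorial: a job runs first, then its children with all their successors, then its follow-ons.

```python
class Node:
    """Minimal stand-in for a job with children and follow-ons."""

    def __init__(self, name):
        self.name = name
        self.children = []
        self.follow_ons = []

    def add_child(self, node):
        self.children.append(node)
        return node

    def add_follow_on(self, node):
        self.follow_ons.append(node)
        return node


def execution_order(node):
    """The node itself, then each child's subgraph, then each follow-on's subgraph."""
    order = [node.name]
    for child in node.children:
        order += execution_order(child)
    for follow_on in node.follow_ons:
        order += execution_order(follow_on)
    return order


class Encapsulated(Node):
    """Wraps a node so that new successors run after its whole subgraph."""

    def __init__(self, inner):
        super().__init__(f"E({inner.name})")
        # The wrapper plays the role of Ap: the inner node becomes its child
        Node.add_child(self, inner)
        # A hidden follow-on collects every successor added later
        self._tail = Node.add_follow_on(self, Node(f"end({inner.name})"))

    def add_child(self, node):
        return self._tail.add_child(node)

    def add_follow_on(self, node):
        return self._tail.add_follow_on(node)


A = Node("A")
A.add_child(Node("A1"))
A.add_follow_on(Node("A2"))

E = Encapsulated(A)
E.add_child(Node("B"))
print(execution_order(E))  # ['E(A)', 'A', 'A1', 'A2', 'end(A)', 'B']
```

In this model B is added as an ordinary child of E(A), yet it still runs after A2, the last successor of A.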

Depending on Toil

If you are packaging your workflow(s) as a pip-installable distribution on PyPI, you might be tempted to declare Toil as a dependency in your setup.py, via the install_requires keyword argument to setup(). Unfortunately, this does not work, for two reasons. First, Toil uses Setuptools’ extras mechanism to manage its own optional dependencies. If you explicitly declared a dependency on Toil, you would have to hard-code a particular combination of extras (or no extras at all), robbing the user of the choice of which Toil extras to install. Second, and more importantly, declaring a dependency on Toil would only lead to Toil being installed on the leader node of a cluster, but not on the worker nodes. Auto-deployment does not work here because Toil cannot auto-deploy itself: the classic “Which came first, chicken or egg?” problem.

In other words, you shouldn’t explicitly depend on Toil. Document the dependency instead (as in “This workflow needs Toil version X.Y.Z to be installed”) and optionally add a version check to your setup.py. Refer to the check_version() function in the toil-lib project’s setup.py for an example. Alternatively, you can also just depend on toil-lib and you’ll get that check for free.
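
A version check of this kind can be sketched with the standard library alone. The code below is an illustration under stated assumptions, not the check_version() function from toil-lib. The names version_tuple and check_installed are hypothetical, only the first three numeric components of a version are compared, and the lookup parameter exists solely so the check can be exercised without Toil being installed.

```python
from importlib.metadata import PackageNotFoundError, version
from itertools import takewhile


def version_tuple(text, width=3):
    """Turn '5.12.0a1' into (5, 12, 0); missing components count as zero."""
    numbers = []
    for piece in text.split(".")[:width]:
        # Keep only the leading digits of each component
        digits = "".join(takewhile(str.isdigit, piece))
        if not digits:
            break
        numbers.append(int(digits))
    return tuple(numbers + [0] * (width - len(numbers)))


def check_installed(package, minimum, lookup=version):
    """Return the installed version, or raise if it is missing or too old."""
    try:
        installed = lookup(package)
    except PackageNotFoundError:
        raise RuntimeError(
            f"This workflow needs {package} {minimum} or newer to be installed."
        ) from None
    if version_tuple(installed) < version_tuple(minimum):
        raise RuntimeError(
            f"This workflow needs {package} {minimum} or newer, found {installed}."
        )
    return installed
```

A setup.py could call check_installed("toil", "X.Y.Z") before setup(), with X.Y.Z replaced by the version the workflow was developed against, so that a missing or outdated Toil is reported at install time rather than at run time.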

If your workflow depends on a dependency of Toil, consider not making that dependency explicit either. If you do, you risk a version conflict between your project and Toil. The pip utility may silently ignore that conflict, breaking either Toil or your workflow. It is safest to simply assume that Toil installs that dependency for you. The only downside is that you are locked into the exact version of that dependency that Toil declares. But such is life with Python, which, unlike Java, has no means of isolating dependencies belonging to different software components within the same process, and whose favored software distribution utility is incapable of properly resolving overlapping dependencies and detecting conflicts.

Best Practices for Dockerizing Toil Workflows

Computational Genomics Lab’s Dockstore-based production system provides workflow authors a way to run Dockerized versions of their pipeline in an automated, scalable fashion. To be compatible with this system, a workflow should meet the following requirements. In addition to the Docker container, a Common Workflow Language (CWL) descriptor file is needed. For inputs:

  • Only command line arguments should be used for configuring the workflow. If the workflow relies on a configuration file, like Toil-RNAseq or ProTECT, a wrapper script inside the Docker container can be used to parse the CLI and generate the necessary configuration file.

  • All inputs to the pipeline should be explicitly enumerated rather than implicit. For example, don’t rely on one FASTQ read’s path to discover the location of its pair. This is necessary since all inputs are mapped to their own isolated directories when the Docker container is called via Dockstore.

  • All inputs must be documented in the CWL descriptor file. Examples of this file can be seen in both Toil-RNAseq and ProTECT.
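
The first two input requirements can be sketched as a small wrapper script. This is a minimal illustration, not the wrapper used by Toil-RNAseq or ProTECT: the option names, the build_parser and write_config helpers, and the JSON configuration format are all assumptions chosen so that the example needs only the standard library. Both FASTQ files are passed explicitly instead of one being derived from the other.

```python
import argparse
import json
from pathlib import Path


def build_parser():
    """Every input is a named command line argument; nothing is discovered implicitly."""
    parser = argparse.ArgumentParser(
        description="Turn command line arguments into a workflow configuration file."
    )
    parser.add_argument("--sample-id", required=True)
    parser.add_argument("--fastq-r1", required=True, type=Path)
    parser.add_argument("--fastq-r2", required=True, type=Path)
    parser.add_argument("--output-dir", required=True, type=Path)
    parser.add_argument("--config", type=Path, default=Path("config.json"))
    return parser


def write_config(args):
    """Write the configuration file that the wrapped workflow would read."""
    config = {
        "sample-id": args.sample_id,
        "fastq-r1": str(args.fastq_r1),
        "fastq-r2": str(args.fastq_r2),
        "output-dir": str(args.output_dir),
    }
    args.config.write_text(json.dumps(config, indent=2, sort_keys=True) + "\n")
    return config


def main(argv=None):
    args = build_parser().parse_args(argv)
    return write_config(args)
```

Because each argument maps to exactly one input, the same list can be mirrored one-to-one in the CWL descriptor file.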

For outputs:

  • All outputs should be written to a local path rather than S3.

  • Take care to package outputs in a local and user-friendly way. For example, don’t tar up all output if there are specific files that users will want to see individually.

  • All output file names should be deterministic and predictable. For example, don’t prepend the name of an output file with PASS/FAIL depending on the outcome of the pipeline.

  • All outputs must be documented in the CWL descriptor file. Examples of this file can be seen in both Toil-RNAseq and ProTECT.
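
One way to keep output names deterministic is to derive them from the sample identifier alone and to record the outcome inside a status file rather than in a file name. The sketch below illustrates that idea; the helper names output_paths and write_status, the file suffixes, and the JSON status format are assumptions, not part of Toil or Dockstore.

```python
import json
from pathlib import Path


def output_paths(output_dir, sample_id):
    """Return the fixed set of output files; names never depend on the outcome."""
    base = Path(output_dir)
    return {
        "results": base / f"{sample_id}.results.tsv",
        "log": base / f"{sample_id}.log",
        "status": base / f"{sample_id}.status.json",
    }


def write_status(output_dir, sample_id, passed):
    """Record PASS or FAIL as file content under a predictable local path."""
    status_path = output_paths(output_dir, sample_id)["status"]
    status_path.parent.mkdir(parents=True, exist_ok=True)
    record = {"sample": sample_id, "status": "PASS" if passed else "FAIL"}
    status_path.write_text(json.dumps(record) + "\n")
    return status_path
```

Since the paths are known before the pipeline runs, they can be listed verbatim as outputs in the CWL descriptor file.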