Quickstart Examples

Running a basic workflow

A Toil workflow can be run with just three steps.

  1. Install Toil (see Installation)

  2. Copy and paste the following code block into helloWorld.py:

    from toil.common import Toil
    from toil.job import Job
    
    def helloWorld(message, memory="1G", cores=1, disk="1G"):
        return "Hello, world!, here's a message: %s" % message
    
    if __name__ == "__main__":
        parser = Job.Runner.getDefaultArgumentParser()
        options = parser.parse_args()
        with Toil(options) as toil:
            output = toil.start(Job.wrapFn(helloWorld, "You did it!"))
        print(output)
    
  3. Specify a job store and run the workflow like so:

    (venv) $ python helloWorld.py file:my-job-store
    

Note

Don’t actually type (venv) $ in at the beginning of each command. This is intended only to remind the user that they should have their virtual environment running.

Congratulations! You’ve run your first Toil workflow on the singleMachine batch system (the default) using the file job store.

The batch system is what schedules the jobs Toil creates. Toil supports many different kinds of batch systems (such as Apache Mesos and Grid Engine) which makes it easy to run your workflow in all kinds of places. The singleMachine batch system is primarily used to prepare and debug workflows on the local machine. Once ready, they can be run on a full-fledged batch system (see Batch System API).

Usually, a workflow will generate files, and Toil needs a place to keep track of things. The job store is where Toil keeps all of the intermediate files shared between jobs. The argument you passed to your script, file:my-job-store, indicates where the job store lives. The file: prefix tells Toil that you are using the file job store, which means everything is kept in a temporary directory called my-job-store. (Read more about Job Store API.)
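As a rough illustration of how a locator such as file:my-job-store breaks down, the sketch below splits the string at the first colon. The helper name split_locator is hypothetical and is not part of Toil; Toil's own parsing may well differ.

```python
def split_locator(locator):
    """Split a job store locator into (kind, location) at the first colon.

    Hypothetical helper for illustration only; not Toil's API.
    """
    kind, sep, location = locator.partition(":")
    if not sep or not kind or not location:
        raise ValueError("expected '<kind>:<location>', got %r" % locator)
    return kind, location


kind, location = split_locator("file:my-job-store")
print(kind, location)
```

Only the first colon is treated as the separator, so anything after it is left for the chosen job store to interpret.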

Toil is totally customizable! Run python helloWorld.py --help to see a complete list of available options.

For something beyond a “Hello, world!” example, refer to A (more) real-world example.

Running a basic CWL workflow

The Common Workflow Language (CWL) is an emerging standard for writing workflows that are portable across multiple workflow engines and platforms. Running CWL workflows using Toil is easy.

  1. First ensure that Toil is installed with the cwl extra (see Installing Toil with extra features).

    (venv) $ pip install 'toil[cwl]'
    

    This installs the toil-cwl-runner and cwl-runner executables. These are identical - cwl-runner is the portable name for the default system CWL runner.

  2. Copy and paste the following code block into example.cwl:

    cwlVersion: v1.0
    class: CommandLineTool
    baseCommand: echo
    stdout: output.txt
    inputs:
      message:
        type: string
        inputBinding:
          position: 1
    outputs:
      output:
        type: stdout
    

    and this code into example-job.yaml:

    message: Hello world!
    
  3. To run the workflow simply enter

    (venv) $ toil-cwl-runner example.cwl example-job.yaml
    

    Your output will be in output.txt

    (venv) $ cat output.txt
    Hello world!
    

To learn more about CWL, see the CWL User Guide (from where this example was shamelessly borrowed).

To run this workflow on an AWS cluster have a look at Running a CWL Workflow on AWS.

For information on using CWL with Toil, see the section CWL in Toil.
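To make the tool description in example.cwl more concrete, the following sketch shows roughly what it amounts to: the baseCommand followed by the bound inputs, ordered by position. The function build_command and the dictionary layout are illustrative assumptions, not part of Toil or any CWL runner, and a real runner handles many more cases (prefixes, arrays, stdout capture, and so on).

```python
def build_command(base_command, inputs, job):
    """Assemble an argument list from a simplified CommandLineTool description."""
    bound = []
    for name, spec in inputs.items():
        binding = spec.get("inputBinding")
        if binding is None or name not in job:
            continue  # inputs without a binding do not appear on the command line
        bound.append((binding.get("position", 0), name, str(job[name])))
    bound.sort()  # lower positions come first
    return [base_command] + [value for _, _, value in bound]


# Simplified stand-ins for example.cwl and example-job.yaml.
inputs = {"message": {"type": "string", "inputBinding": {"position": 1}}}
job = {"message": "Hello world!"}
command = build_command("echo", inputs, job)
print(command)
```

In the real workflow the standard output of that command is what ends up in output.txt.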

A (more) real-world example

For a more detailed example and explanation, we’ve developed a sample pipeline that merge-sorts a temporary file. This is not supposed to be an efficient sorting program, rather a more fully worked example of what Toil is capable of.

Running the example

  1. Download the example code.

  2. Run it with the default settings:

    (venv) $ python sort.py file:jobStore
    

    The workflow created a file called sortedFile.txt in your current directory. Have a look at it and notice that it contains a whole lot of sorted lines!

    This workflow does a smart merge sort on a file it generates, called fileToSort.txt. The sort is smart because each step of the process (splitting the file into separate chunks, sorting these chunks, and merging them back together) is compartmentalized into a job. Each job can specify its own resource requirements and will only be run after the jobs it depends upon have run. Jobs that do not depend on each other can be run in parallel.

  3. Run with custom options:

    (venv) $ python sort.py file:jobStore --numLines=5000 --lineLength=10 --workDir=/tmp/ --overwriteOutput=True
    

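    Here we see that we can add our own options to a Toil script. The first two options determine the number of lines and how many characters are in each line. The --workDir option is a built-in Toil option that sets where temporary files unique to a job are kept. The last option, --overwriteOutput, is defined by the script and allows an existing output file to be overwritten.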
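Stripped of Toil, the split, sort, and merge idea behind this workflow looks roughly like the sketch below. It works on an in-memory list of lines rather than on files in a job store, and the function name and threshold handling are assumptions made for illustration; sort.py distributes these steps across jobs instead of recursing in one process.

```python
import heapq


def merge_sort_lines(lines, threshold):
    """Sort lines by splitting until a chunk is small, then merging the halves."""
    size = sum(len(line) for line in lines)
    if size <= threshold or len(lines) < 2:
        return sorted(lines)  # base case: small enough to sort serially
    mid = len(lines) // 2
    left = merge_sort_lines(lines[:mid], threshold)   # would be a child job
    right = merge_sort_lines(lines[mid:], threshold)  # would be another child job
    return list(heapq.merge(left, right))             # would be a follow-on job


lines = ["pear\n", "apple\n", "fig\n", "kiwi\n"]
result = merge_sort_lines(lines, threshold=8)
print(result)
```

The two recursive calls do not depend on each other, which is what lets a workflow engine run the corresponding jobs in parallel.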

Describing the source code

To understand the details of what’s going on inside, let’s start with the main() function. It looks like a lot of code, but don’t worry, we’ll break it down piece by piece.

def main(options=None):
    if not options:
        # deal with command line arguments
        parser = ArgumentParser()
        Job.Runner.addToilOptions(parser)
        parser.add_argument('--numLines', default=defaultLines, help='Number of lines in file to sort.', type=int)
        parser.add_argument('--lineLength', default=defaultLineLen, help='Length of lines in file to sort.', type=int)
        parser.add_argument("--fileToSort", help="The file you wish to sort")
        parser.add_argument("--outputFile", help="Where the sorted output will go")
        parser.add_argument("--overwriteOutput", help="Write over the output file if it already exists.", default=True)
        parser.add_argument("--N", dest="N",
                            help="The threshold below which a serial sort function is used to sort file. "
                                 "All lines must be of length less than or equal to N or program will fail",
                            default=10000)
        parser.add_argument('--downCheckpoints', action='store_true',
                            help='If this option is set, the workflow will make checkpoints on its way through '
                                 'the recursive "down" part of the sort')
        parser.add_argument("--sortMemory", dest="sortMemory",
                        help="Memory for jobs that sort chunks of the file.",
                        default=None)
    
        parser.add_argument("--mergeMemory", dest="mergeMemory",
                        help="Memory for jobs that collate results.",
                        default=None)

        options = parser.parse_args()
    if not hasattr(options, "sortMemory") or not options.sortMemory:
        options.sortMemory = sortMemory
    if not hasattr(options, "mergeMemory") or not options.mergeMemory:
        options.mergeMemory = sortMemory

    # do some input verification
    sortedFileName = options.outputFile or "sortedFile.txt"
    if not options.overwriteOutput and os.path.exists(sortedFileName):
        print("the output file {} already exists. Delete it to run the sort example again or use --overwriteOutput=True".format(sortedFileName))
        exit()

    fileName = options.fileToSort
    if options.fileToSort is None:
        # make the file ourselves
        fileName = 'fileToSort.txt'
        if os.path.exists(fileName):
            print("Sorting existing file %s" % fileName)
        else:
            print('No sort file specified. Generating one automatically called %s.' % fileName)
            makeFileToSort(fileName=fileName, lines=options.numLines, lineLen=options.lineLength)
    else:
        if not os.path.exists(options.fileToSort):
            raise RuntimeError("File to sort does not exist: %s" % options.fileToSort)

    if int(options.N) <= 0:
        raise RuntimeError("Invalid value of N: %s" % options.N)

    # Now we are ready to run
    with Toil(options) as workflow:
        sortedFileURL = 'file://' + os.path.abspath(sortedFileName)
        if not workflow.options.restart:
            sortFileURL = 'file://' + os.path.abspath(fileName)
            sortFileID = workflow.importFile(sortFileURL)
            sortedFileID = workflow.start(Job.wrapJobFn(setup, sortFileID, int(options.N), options.downCheckpoints, options=options,
                                                    memory=sortMemory))
        else:
            sortedFileID = workflow.restart()
        workflow.exportFile(sortedFileID, sortedFileURL)

First we make a parser to process command line arguments using the argparse module. It’s important that we add the call to Job.Runner.addToilOptions() to initialize our parser with all of Toil’s default options. Then we add the command line arguments unique to this workflow, and parse the input. The help message listed with the arguments should give you a pretty good idea of what they can do.
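The parser pattern can be tried on its own without Toil. In the sketch below, the default values are hypothetical (the section does not show the values sort.py uses for defaultLines and defaultLineLen), and the place where Job.Runner.addToilOptions() would be called is marked with a comment so that the example runs with the standard library alone.

```python
from argparse import ArgumentParser

# Hypothetical defaults; the values used by sort.py are not shown in this section.
DEFAULT_LINES = 1000
DEFAULT_LINE_LEN = 50


def build_parser():
    parser = ArgumentParser()
    # In a real workflow, Job.Runner.addToilOptions(parser) would be called here
    # to register Toil's own options, including the job store argument.
    parser.add_argument("--numLines", default=DEFAULT_LINES, type=int,
                        help="Number of lines in file to sort.")
    parser.add_argument("--lineLength", default=DEFAULT_LINE_LEN, type=int,
                        help="Length of lines in file to sort.")
    return parser


options = build_parser().parse_args(["--numLines=5000", "--lineLength=10"])
print(options.numLines, options.lineLength)
```

Because type=int is given, the values arrive as integers rather than strings, which is why main() can pass them straight to makeFileToSort().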

Next we do a little bit of verification of the input arguments. The option --fileToSort allows you to specify a file that needs to be sorted. If this option isn’t given, it’s here that we make our own file with the call to makeFileToSort().

Finally we come to the context manager that initializes the workflow. We create a path to the input file prepended with 'file://' as per the documentation for toil.common.Toil() when staging a file that is stored locally. Notice that we have to check whether or not the workflow is restarting so that we don’t import the file more than once. Finally we can kick off the workflow by calling toil.common.Toil.start() on the job setup. When the workflow ends we capture its output (the sorted file’s fileID) and use that in toil.common.Toil.exportFile() to move the sorted file from the job store back into “userland”.

Next let’s look at the job that begins the actual workflow, setup.

def setup(job, inputFile, N, downCheckpoints, options):
    """
    Sets up the sort.
    Returns the FileID of the sorted file
    """
    job.log("Starting the merge sort")
    return job.addChildJobFn(down,
                             inputFile, N,
                             downCheckpoints,
                             options = options,
                             memory='1000M').rv()

setup really only does two things. First it writes to the logs using Job.log(), and then it calls addChildJobFn(). Child jobs run directly after the current job. This function turns the ‘job function’ down into an actual job and passes in the inputs, including an optional resource requirement, memory. Calling Job.rv() on the new job does not run it; it returns a promise, a placeholder that Toil replaces with the return value of down once down has finished. By returning that promise, setup passes the output of down along as its own output.
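One way to picture the return-value mechanism is as a placeholder that is filled in once the job has run. The toy model below is only a sketch of that idea: the classes ToyPromise and ToyJob are hypothetical, run in a single process, and are not how Toil implements promises.

```python
class ToyPromise(object):
    """Placeholder for a value that a job will produce later."""

    def __init__(self, job):
        self.job = job


class ToyJob(object):
    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args
        self.result = None
        self.done = False

    def rv(self):
        # Nothing runs here; the caller only receives a placeholder.
        return ToyPromise(self)

    def run(self):
        # Replace any placeholders among the arguments with real values.
        args = [a.job.result if isinstance(a, ToyPromise) else a
                for a in self.args]
        self.result = self.fn(*args)
        self.done = True


child = ToyJob(lambda x: x * 2, 21)
promise = child.rv()
consumer = ToyJob(lambda value: "got %d" % value, promise)
ran_early = child.done  # False: asking for rv() did not run the child
child.run()
consumer.run()
print(consumer.result)
```

The consumer is constructed before the child has produced anything, which mirrors how a job can hand the rv() of one job to another as an input.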

Now we can look at what down does.

def down(job, inputFileStoreID, N, downCheckpoints, options, memory=sortMemory):
    """
    Input is a file and a range into that file to sort and an output location in which
    to write the sorted file.
    If the range is larger than a threshold N the range is divided recursively and
    a follow on job is then created which merges back the results else
    the file is sorted and placed in the output.
    """
    # Read the file
    inputFile = job.fileStore.readGlobalFile(inputFileStoreID, cache=False)
    length = os.path.getsize(inputFile)
    if length > N:
        # We will subdivide the file
        job.log("Splitting file: %s of size: %s"
                % (inputFileStoreID, length), level=logging.CRITICAL)
        # Split the file into two copies
        midPoint = getMidPoint(inputFile, 0, length)
        t1 = job.fileStore.getLocalTempFile()
        with open(t1, 'w') as fH:
            copySubRangeOfFile(inputFile, 0, midPoint+1, fH)
        t2 = job.fileStore.getLocalTempFile()
        with open(t2, 'w') as fH:
            copySubRangeOfFile(inputFile, midPoint+1, length, fH)
        # Call down recursively. By giving the rv() of the two jobs as inputs to the follow-on job, up,
        # we communicate the dependency without hindering concurrency.
        return job.addFollowOnJobFn(up,
                                    job.addChildJobFn(down, job.fileStore.writeGlobalFile(t1), N, downCheckpoints,
                                                      checkpoint=downCheckpoints, options=options, memory=options.sortMemory).rv(),
                                    job.addChildJobFn(down, job.fileStore.writeGlobalFile(t2), N, downCheckpoints,
                                                      checkpoint=downCheckpoints, options=options, memory=options.mergeMemory).rv(), options=options, memory=options.sortMemory).rv()
    else:
        # We can sort this bit of the file
        job.log("Sorting file: %s of size: %s"
                % (inputFileStoreID, length), level=logging.CRITICAL)
        # Sort the copy and write back to the fileStore
        shutil.copyfile(inputFile, inputFile + '.sort')
        sort(inputFile + '.sort')
        return job.fileStore.writeGlobalFile(inputFile + '.sort')

down is the recursive part of the workflow. First we read the file into the local filestore by calling Job.FileStore.readGlobalFile(). This puts a copy of the file in the temp directory for this particular job. This storage will disappear once this job ends. For a detailed explanation of the filestore, job store, and their interfaces, have a look at Managing files within a workflow.

Next down checks the base case of the recursion: is the length of the input file less than or equal to N? (Remember that N was an option we added to the workflow in main.) In the base case, we just sort the file and return the file ID of the new sorted file.

If the base case fails, then the file is split into two new temporary files using Job.FileStore.getLocalTempFile() and the helper function copySubRangeOfFile. Finally we add a follow-on job, up, with Job.addFollowOnJobFn(). We’ve already seen child jobs. A follow-on job is a job that runs after the current job and all of its children (and their children and follow-ons) have completed. Using a follow-on makes sense because up is responsible for merging the files together, and we don’t want to merge the files until we know they are sorted. Again, the return value of the follow-on job is requested using Job.rv().
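The ordering rule for children and follow-ons can be sketched with a small stand-alone model. SketchJob and run_order are hypothetical names used only here; the model runs everything serially, whereas Toil may run independent children in parallel.

```python
class SketchJob(object):
    def __init__(self, name):
        self.name = name
        self.children = []
        self.follow_ons = []

    def add_child(self, job):
        self.children.append(job)
        return job

    def add_follow_on(self, job):
        self.follow_ons.append(job)
        return job


def run_order(job, order=None):
    """Return job names in an order that respects child and follow-on rules."""
    if order is None:
        order = []
    order.append(job.name)            # the job itself runs first
    for child in job.children:        # then its children and their descendants
        run_order(child, order)
    for follow_on in job.follow_ons:  # follow-ons wait for all of the above
        run_order(follow_on, order)
    return order


root = SketchJob("down")
left = root.add_child(SketchJob("down-left"))
root.add_child(SketchJob("down-right"))
left.add_child(SketchJob("down-left-left"))
root.add_follow_on(SketchJob("up"))
order = run_order(root)
print(order)
```

Whatever the children spawn, the follow-on named up is always last, which is the guarantee the merge step relies on.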

Looking at up

def up(job, inputFileID1, inputFileID2, options, memory=sortMemory):
    """
    Merges the two files and places them in the output.
    """
    with job.fileStore.writeGlobalFileStream() as (fileHandle, outputFileStoreID):
        with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1:
            with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2:
                merge(inputFileHandle1, inputFileHandle2, fileHandle)
                job.log("Merging %s and %s to %s"
                        % (inputFileID1, inputFileID2, outputFileStoreID))
        # Clean up the input files - these deletes will occur after the completion is successful.
        job.fileStore.deleteGlobalFile(inputFileID1)
        job.fileStore.deleteGlobalFile(inputFileID2)
        return outputFileStoreID

we see that the two input files are merged together and the output is written to a new file using job.FileStore.writeGlobalFileStream(). After a little cleanup, the output file is returned.
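The merge helper that up calls is defined elsewhere in sort.py and is not shown in this section. A minimal stand-in, assuming both inputs are text streams whose lines are already sorted, could look like the following; it keeps the same argument order as the call in up but is not necessarily how sort.py implements it.

```python
import heapq
import io


def merge(handle1, handle2, out_handle):
    """Merge two line-sorted text streams into out_handle, preserving order."""
    # heapq.merge reads lazily, so neither input is loaded fully into memory.
    for line in heapq.merge(handle1, handle2):
        out_handle.write(line)


left = io.StringIO(u"apple\nkiwi\n")
right = io.StringIO(u"fig\npear\n")
out = io.StringIO()
merge(left, right, out)
print(out.getvalue())
```

Streaming the merge line by line fits the stream-based calls in up, which avoid making full local copies of the inputs.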

Once the final up finishes and all of the rv() promises are fulfilled, main receives the sorted file’s ID which it uses in exportFile to send it to the user.

There are other things in this example that we didn’t go over, such as Checkpoints and the details of much of the Toil API.

At the end of the script the lines:

if __name__ == '__main__':
    main()

are included to ensure that the main function is only run once in the ‘__main__’ process invoked by you, the user. In Toil terms, by invoking the script you created the leader process in which the main() function is run. A worker process is a separate process whose sole purpose is to host the execution of one or more jobs defined in that script. In any Toil workflow there is always one leader process, and potentially many worker processes.

When using the single-machine batch system (the default), the worker processes will be running on the same machine as the leader process. With full-fledged batch systems like Mesos, the worker processes will typically be started on separate machines. The boilerplate ensures that the pipeline is only started once, on the leader, and not when its job functions are imported and executed on the individual workers.

Typing python sort.py --help will show the complete list of arguments for the workflow which includes both Toil’s and ones defined inside sort.py. A complete explanation of Toil’s arguments can be found in Toil Workflow Options and Command Line Interface.

Logging

By default, Toil logs a lot of information related to the current environment in addition to messages from the batch system and jobs. This can be configured with the --logLevel flag. For example, to only log CRITICAL level messages to the screen:

(venv) $ python sort.py file:jobStore --logLevel=critical --overwriteOutput=True

This hides most of the information we get from the Toil run. For more detail, we can run the pipeline with --logLevel=debug to see a comprehensive output. For more information, see Logging.
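The effect of a log-level threshold can be seen with Python’s standard logging module. The sketch below assumes that --logLevel behaves like an ordinary Python logging threshold; the logger name and the ListHandler class are made up for the example.

```python
import logging


class ListHandler(logging.Handler):
    """Collect messages in a list so they can be inspected afterwards."""

    def __init__(self):
        logging.Handler.__init__(self)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


logger = logging.getLogger("logging-sketch")
logger.propagate = False  # keep the example's output out of the root logger
handler = ListHandler()
logger.addHandler(handler)

logger.setLevel(logging.CRITICAL)  # comparable to --logLevel=critical
logger.info("Starting the merge sort")
logger.critical("Splitting file")
print(handler.messages)
```

Only the CRITICAL message survives the threshold, which is presumably why down() logs its progress with level=logging.CRITICAL.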

Error Handling and Resuming Pipelines

With Toil, you can recover gracefully from a bug in your pipeline without losing any progress from successfully completed jobs. To demonstrate this, let’s add a bug to our example code to see how Toil handles a failure and how we can resume a pipeline after that happens. Add a bad assertion as the first line of down():

def down(job, inputFileStoreID, N, downCheckpoints, options, memory=sortMemory):
    ...
    assert 1 == 2, "Test error!"

When we run the pipeline, Toil will show a detailed failure log with a traceback:

(venv) $ python sort.py file:jobStore
...
---TOIL WORKER OUTPUT LOG---
...
m/j/jobonrSMP    Traceback (most recent call last):
m/j/jobonrSMP      File "toil/src/toil/worker.py", line 340, in main
m/j/jobonrSMP        job._runner(jobGraph=jobGraph, jobStore=jobStore, fileStore=fileStore)
m/j/jobonrSMP      File "toil/src/toil/job.py", line 1270, in _runner
m/j/jobonrSMP        returnValues = self._run(jobGraph, fileStore)
m/j/jobonrSMP      File "toil/src/toil/job.py", line 1217, in _run
m/j/jobonrSMP        return self.run(fileStore)
m/j/jobonrSMP      File "toil/src/toil/job.py", line 1383, in run
m/j/jobonrSMP        rValue = userFunction(*((self,) + tuple(self._args)), **self._kwargs)
m/j/jobonrSMP      File "toil/example.py", line 30, in down
m/j/jobonrSMP        assert 1 == 2, "Test error!"
m/j/jobonrSMP    AssertionError: Test error!

If we try to run the pipeline again, Toil will give us an error message saying that a job store of the same name already exists. By default, in the event of a failure, the job store is preserved so that the workflow can be restarted, starting from the previously failed jobs. We can restart the pipeline by running:

(venv) $ python sort.py file:jobStore --restart --overwriteOutput=True

We can also change the number of times Toil will attempt to retry a failed job:

(venv) $ python sort.py file:jobStore --retryCount 2 --restart --overwriteOutput=True

You’ll now see Toil attempt to rerun the failed job until it runs out of tries. --retryCount is useful for non-systemic errors, like downloading a file that may experience a sporadic interruption, or some other non-deterministic failure.
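The idea behind retrying a job that fails sporadically can be sketched in a few lines. The function run_with_retries is hypothetical and treats retry_count as the number of additional attempts after the first failure; Toil’s exact accounting for --retryCount is not specified here and may differ.

```python
def run_with_retries(fn, retry_count):
    """Call fn, retrying up to retry_count more times if it raises.

    Returns a (result, attempts) tuple.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > retry_count:
                raise  # out of retries: surface the last failure


calls = {"n": 0}


def flaky_download():
    """Fail on the first two calls, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("sporadic interruption")
    return "ok"


result = run_with_retries(flaky_download, retry_count=2)
print(result)
```

A deterministic bug, such as the bad assertion above, fails on every attempt, so retries only help with failures that can go away on their own.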

To successfully restart our pipeline, we can edit our script to comment out or remove the bad assertion, and then run

(venv) $ python sort.py file:jobStore --restart --overwriteOutput=True

The pipeline will run successfully, and the job store will be removed on the pipeline’s completion.

Collecting Statistics

A Toil pipeline can be run with the --stats flag to allow collection of statistics:

(venv) $ python sort.py file:jobStore --stats --overwriteOutput=True

Once the pipeline finishes, the job store will be left behind, allowing us to get information on the total runtime and stats pertaining to each job function:

(venv) $ toil stats file:jobStore
...
Batch System: singleMachine
Default Cores: 1  Default Memory: 2097152K
...

Once we’re done, we can clean up the job store by running

(venv) $ toil clean file:jobStore

Note that by default, if --stats is not included and the pipeline finishes successfully, toil clean is run automatically and the job store is cleaned up. This was the case with the above examples. See options to prevent this behavior.

Launching a Toil Workflow in AWS

Once the aws extra for Toil has been installed (see Installation) and AWS has been set up (see Preparing your AWS environment), the user can run the basic helloWorld.py script (Running a basic workflow) on a VM in AWS just by modifying the run command.

Note that when running in AWS, users can either run the workflow on a single instance or run it on a cluster (which runs across multiple containers on multiple AWS instances). For more information on running Toil workflows on a cluster, see Running in AWS.

  1. Launch a cluster in AWS using the launch-cluster command. The arguments keyPairName, leaderNodeType, and zone are required to launch a cluster.

    (venv) $ toil launch-cluster <cluster-name> \
    --keyPairName <AWS-key-pair-name> \
    --leaderNodeType t2.medium \
    --zone us-west-2a
    
  2. Copy helloWorld.py to the /tmp directory on the leader node using the rsync-cluster command. Note that the command requires both the file to copy and the target location on the cluster leader node:

    (venv) $ toil rsync-cluster --zone us-west-2a <cluster-name> helloWorld.py :/tmp
    
  3. Log in to the cluster leader node using the ssh-cluster command. Note that this command will log you in as the root user:

    (venv) $ toil ssh-cluster --zone us-west-2a <cluster-name>
    
  4. Run the Toil script in the cluster. In this particular case, we create an S3 bucket called my-S3-bucket in the us-west-2 region to store intermediate job results.

    $ python /tmp/helloWorld.py aws:us-west-2:my-S3-bucket
    

    Along with some other INFO log messages, you should get the following output in your terminal window: Hello, world!, here's a message: You did it!

  5. Exit from the SSH connection.

    $ exit
    
  6. Use the destroy-cluster command to destroy the cluster. Note this command will destroy the cluster leader node and any resources created to run the job, including the S3 bucket.

    (venv) $ toil destroy-cluster --zone us-west-2a <cluster-name>
    

Running a CWL Workflow on AWS

Once the aws and cwl extras for Toil have been installed (see Installation) and AWS has been set up (see Preparing your AWS environment), the user can run a CWL workflow with Toil on AWS.

  1. First launch a node in AWS using the launch-cluster command.

    (venv) $ toil launch-cluster <cluster-name> \
    --keyPairName <AWS-key-pair-name> \
    --leaderNodeType t2.micro \
    --zone us-west-2a
    
  2. Copy example.cwl and example-job.yaml from the CWL example to the node using the rsync-cluster command.

    (venv) $ toil rsync-cluster --zone us-west-2a <cluster-name> \
    example.cwl example-job.yaml :/tmp
    
  3. Launch the CWL workflow using the ssh-cluster utility.

    (venv) $ toil ssh-cluster --zone us-west-2a <cluster-name> \
    toil-cwl-runner \
    /tmp/example.cwl \
    /tmp/example-job.yaml
    

    Tip

    When running a CWL workflow on AWS, input files can be provided either on the local file system or in S3 buckets using s3:// URI references. Final output files will be copied to the local file system of the leader node.

  4. Destroy the cluster.

    (venv) $ toil destroy-cluster --zone us-west-2a <cluster-name>
    

Running a Workflow with Autoscaling on AWS - Cactus

Cactus is a reference-free whole-genome multiple alignment program.

  1. Download pestis.tar.gz.

  2. Launch a leader node in AWS using the launch-cluster command.

    (venv) $ toil launch-cluster <cluster-name> \
    --keyPairName <AWS-key-pair-name> \
    --leaderNodeType t2.medium \
    --zone us-west-2c
    (venv) $ export TOIL_AWS_ZONE=us-west-2c
    
  3. Copy the required files, i.e., seqFile.txt (a text file containing the locations of the input sequences as well as their phylogenetic tree, see here), organisms’ genome sequence files in FASTA format, and configuration files (e.g. blockTrim1.xml, if desired), up to the leader node.

    (venv) $ toil rsync-cluster <cluster-name> pestis-short-aws-seqFile.txt :/tmp
    (venv) $ toil rsync-cluster <cluster-name> GCF_000169655.1_ASM16965v1_genomic.fna :/tmp
    (venv) $ toil rsync-cluster <cluster-name> GCF_000006645.1_ASM664v1_genomic.fna :/tmp
    (venv) $ toil rsync-cluster <cluster-name> GCF_000182485.1_ASM18248v1_genomic.fna :/tmp
    (venv) $ toil rsync-cluster <cluster-name> GCF_000013805.1_ASM1380v1_genomic.fna :/tmp
    (venv) $ toil rsync-cluster <cluster-name> setup_leaderNode.sh :/tmp
    (venv) $ toil rsync-cluster <cluster-name> blockTrim1.xml :/tmp
    (venv) $ toil rsync-cluster <cluster-name> blockTrim3.xml :/tmp
    
  4. Log into the leader node.

    (venv) $ toil ssh-cluster <cluster-name>
    
  5. Set up the environment of the leader node to run Cactus.

    $ bash /tmp/setup_leaderNode.sh
    $ source cact_venv/bin/activate
    (cact_venv) $ cd cactus
    (cact_venv) $ pip install --upgrade .
    
  6. Run Cactus as an autoscaling workflow.

    (cact_venv) $ TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:3.11.0 cactus --provisioner aws \
    --nodeTypes c3.4xlarge --maxNodes 2 --minNodes 0 --retry 10 --batchSystem mesos --disableCaching \
    --logDebug --logFile /logFile_pestis3 --configFile /tmp/blockTrim3.xml aws:us-west-2:cactus-pestis \
    /tmp/pestis-short-aws-seqFile.txt /tmp/pestis_output3.hal
    

    Note

    In this example, we pin the version of Toil to 3.11.0; to use the latest version instead, remove TOIL_APPLIANCE_SELF=quay.io/ucsc_cgl/toil:3.11.0. The flag --maxNodes 2 creates up to two instances of type c3.4xlarge and launches Mesos slave containers inside them. The flag --logDebug is equivalent to --logLevel DEBUG. The flag --logFile /logFile_pestis3 writes the log to a file named logFile_pestis3 in the / directory. The --configFile flag is optional; it is only needed when a specific configuration file should be used for the alignment. Toil creates an S3-backed job store, identified by the locator aws:us-west-2:cactus-pestis, to store intermediate job files and metadata. The result file, named pestis_output3.hal, is stored in the /tmp directory of the leader node. Use cactus --help to see all the Cactus and Toil flags available.

  7. Log out of the leader node.

    (cact_venv) $ exit
    
  8. Download the resulting output to the local machine.

    (venv) $ toil rsync-cluster <cluster-name> :/tmp/pestis_output3.hal <path-of-folder-on-local-machine>
    
  9. Destroy the cluster.

    (venv) $ toil destroy-cluster <cluster-name>