Quickstart Examples¶
Running a basic workflow¶
A Toil workflow can be run with just three steps.
Install Toil (see Installation)
Copy and paste the following code block into helloWorld.py:

from toil.common import Toil
from toil.job import Job

def helloWorld(message, memory="1G", cores=1, disk="1G"):
    return "Hello, world!, here's a message: %s" % message

if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()
    with Toil(options) as toil:
        output = toil.start(Job.wrapFn(helloWorld, "You did it!"))
        print(output)
Specify a job store and run the workflow like so:
(venv) $ python helloWorld.py file:my-job-store
Note
Don’t actually type (venv) $ at the beginning of each command. It is shown only to remind you that your virtual environment should be active.
Congratulations! You’ve run your first Toil workflow on the singleMachine batch system (the default) using the file job store.
The batch system is what schedules the jobs Toil creates. Toil supports many different kinds of batch systems
(such as Apache Mesos and Grid Engine) which makes it easy to run your workflow in all kinds of places.
The singleMachine batch system is primarily used to prepare and debug workflows on the local machine. Once ready, they can be run on a full-fledged batch system (see Batch System API).
Usually, a workflow will generate files, and Toil needs a place to keep track of things. The job store is where Toil keeps all of the intermediate files shared between jobs. The argument you passed in to your script, file:my-job-store, indicated where. The file: part just tells Toil you are using the file job store, which means everything is kept in a temporary directory called my-job-store. (Read more about Job Store API.)
Toil is totally customizable! Run python helloWorld.py --help
to see a complete list of available options.
For something beyond a “Hello, world!” example, refer to A (more) real-world example.
Running a basic CWL workflow¶
The Common Workflow Language (CWL) is an emerging standard for writing workflows that are portable across multiple workflow engines and platforms. Running CWL workflows using Toil is easy.
First ensure that Toil is installed with the cwl extra (see Installing extra features):

(venv) $ pip install toil[cwl]

This installs the toil-cwl-runner and cwl-runner executables. These are identical - cwl-runner is the portable name for the default system CWL runner.
Copy and paste the following code block into example.cwl:

cwlVersion: v1.0
class: CommandLineTool
baseCommand: echo
stdout: output.txt
inputs:
  message:
    type: string
    inputBinding:
      position: 1
outputs:
  output:
    type: stdout
and this code into example-job.yaml:

message: Hello world!
To run the workflow simply enter:

(venv) $ toil-cwl-runner example.cwl example-job.yaml

Your output will be in output.txt:

(venv) $ cat output.txt
Hello world!
To learn more about CWL, see the CWL User Guide (from where this example was shamelessly borrowed).
To run this workflow on an AWS cluster have a look at Running a CWL Workflow on AWS.
For information on using CWL with Toil see the section CWL in Toil.
A (more) real-world example¶
For a more detailed example and explanation, we’ve developed a sample pipeline that merge-sorts a temporary file. This is not supposed to be an efficient sorting program, but rather a more fully worked example of what Toil is capable of.
Running the example¶
Download the example code.
Run it with the default settings:

(venv) $ python sort.py file:jobStore
The workflow created a file called sortedFile.txt in your current directory. Have a look at it and notice that it contains a whole lot of sorted lines!
This workflow does a smart merge sort on a file it generates, called fileToSort.txt. The sort is smart because each step of the process—splitting the file into separate chunks, sorting these chunks, and merging them back together—is compartmentalized into a job. Each job can specify its own resource requirements and will only be run after the jobs it depends upon have run. Jobs without dependencies will be run in parallel.
Run with custom options:
(venv) $ python sort.py file:jobStore --numLines=5000 --lineLength=10 --workDir=/tmp/ --overwriteOutput=True
Here we see that we can add our own options to a Toil script. The first two options determine the number of lines in the generated file and the length of each line. --workDir is a built-in Toil option that sets where temporary files unique to a job are kept, and --overwriteOutput (defined by this script) allows the output file to be replaced if it already exists.
Describing the source code¶
To understand the details of what’s going on inside, let’s start with the main() function. It looks like a lot of code, but don’t worry, we’ll break it down piece by piece.
def main(options=None):
    if not options:
        # deal with command line arguments
        parser = ArgumentParser()
        Job.Runner.addToilOptions(parser)
        parser.add_argument('--numLines', default=defaultLines, help='Number of lines in file to sort.', type=int)
        parser.add_argument('--lineLength', default=defaultLineLen, help='Length of lines in file to sort.', type=int)
        parser.add_argument("--fileToSort", help="The file you wish to sort")
        parser.add_argument("--outputFile", help="Where the sorted output will go")
        parser.add_argument("--overwriteOutput", help="Write over the output file if it already exists.", default=True)
        parser.add_argument("--N", dest="N",
                            help="The threshold below which a serial sort function is used to sort file. "
                                 "All lines must be of length less than or equal to N or program will fail",
                            default=10000)
        parser.add_argument('--downCheckpoints', action='store_true',
                            help='If this option is set, the workflow will make checkpoints on its way through '
                                 'the recursive "down" part of the sort')
        options = parser.parse_args()

    # do some input verification
    sortedFileName = options.outputFile or "sortedFile.txt"
    if not options.overwriteOutput and os.path.exists(sortedFileName):
        print("the output file {} already exists. Delete it to run the sort example again or use --overwriteOutput=True".format(sortedFileName))
        exit()

    fileName = options.fileToSort
    if options.fileToSort is None:
        # make the file ourselves
        fileName = 'fileToSort.txt'
        if os.path.exists(fileName):
            print("Sorting existing file", fileName)
        else:
            print('No sort file specified. Generating one automatically called %s.' % fileName)
            makeFileToSort(fileName=fileName, lines=options.numLines, lineLen=options.lineLength)
    else:
        if not os.path.exists(options.fileToSort):
            raise RuntimeError("File to sort does not exist: %s" % options.fileToSort)

    if int(options.N) <= 0:
        raise RuntimeError("Invalid value of N: %s" % options.N)

    # Now we are ready to run
    with Toil(options) as workflow:
        sortedFileURL = 'file://' + os.path.abspath(sortedFileName)
        if not workflow.options.restart:
            sortFileURL = 'file://' + os.path.abspath(fileName)
            sortFileID = workflow.importFile(sortFileURL)
            sortedFileID = workflow.start(Job.wrapJobFn(setup, sortFileID, int(options.N), options.downCheckpoints,
                                                        memory=sortMemory))
        else:
            sortedFileID = workflow.restart()
        workflow.exportFile(sortedFileID, sortedFileURL)
First we make a parser to process command line arguments using the argparse module. It’s important that we add the
call to Job.Runner.addToilOptions()
to initialize our parser with all of Toil’s default options. Then we add
the command line arguments unique to this workflow, and parse the input. The help message listed with the arguments
should give you a pretty good idea of what they can do.
Next we do a little bit of verification of the input arguments. The option --fileToSort
allows you to specify a file
that needs to be sorted. If this option isn’t given, it’s here that we make our own file with the call to
makeFileToSort()
.
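The makeFileToSort() helper itself is not shown in this section. A minimal sketch of what such a helper might look like, assuming it writes newline-terminated lines of random lowercase characters (the real helper may differ):

```python
import random
import string

def makeFileToSort(fileName, lines=10, lineLen=10):
    # Illustrative sketch only: write `lines` newline-terminated lines,
    # each consisting of `lineLen` random lowercase characters.
    with open(fileName, 'w') as f:
        for _ in range(lines):
            f.write(''.join(random.choice(string.ascii_lowercase)
                            for _ in range(lineLen)) + '\n')
```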
Finally we come to the context manager that initializes the workflow. We create a path to the input file prepended with
'file://'
as per the documentation for toil.common.Toil()
when staging a file that is stored locally. Notice
that we have to check whether or not the workflow is restarting so that we don’t import the file more than once.
Finally we can kick off the workflow by calling toil.common.Toil.start()
on the job setup
. When the workflow
ends we capture its output (the sorted file’s fileID) and use that in toil.common.Toil.exportFile()
to move the
sorted file from the job store back into “userland”.
Next let’s look at the job that begins the actual workflow, setup.
def setup(job, inputFile, N, downCheckpoints):
    """
    Sets up the sort.
    Returns the FileID of the sorted file
    """
    job.fileStore.logToMaster("Starting the merge sort")
    return job.addChildJobFn(down,
                             inputFile, N,
                             downCheckpoints,
                             memory='1000M').rv()
setup really only does two things. First it writes to the logs using Job.FileStore.logToMaster() and then calls addChildJobFn(). Child jobs run directly after the current job. This function turns the ‘job function’ down into an actual job and passes in the inputs, including an optional resource requirement, memory. The child job doesn’t run immediately; Job.rv() returns a promise for its return value, which is fulfilled once down finishes, and that promised output is what setup returns here.
Now we can look at what down
does.
def down(job, inputFileStoreID, N, downCheckpoints, memory=sortMemory):
    """
    Input is a file to sort and a threshold N. If the file is larger than N it is
    split in two, a child job is created for each half, and a follow-on job is
    created to merge the sorted results back together; otherwise the file is
    sorted serially and written to the output.
    """
    # Read the file
    inputFile = job.fileStore.readGlobalFile(inputFileStoreID, cache=False)
    length = os.path.getsize(inputFile)
    if length > N:
        # We will subdivide the file
        job.fileStore.logToMaster("Splitting file: %s of size: %s"
                                  % (inputFileStoreID, length), level=logging.CRITICAL)
        # Split the file into two copies
        midPoint = getMidPoint(inputFile, 0, length)
        t1 = job.fileStore.getLocalTempFile()
        with open(t1, 'w') as fH:
            copySubRangeOfFile(inputFile, 0, midPoint+1, fH)
        t2 = job.fileStore.getLocalTempFile()
        with open(t2, 'w') as fH:
            copySubRangeOfFile(inputFile, midPoint+1, length, fH)
        # Call down recursively. By giving the rv() of the two jobs as inputs to the follow-on job, up,
        # we communicate the dependency without hindering concurrency.
        return job.addFollowOnJobFn(up,
                                    job.addChildJobFn(down, job.fileStore.writeGlobalFile(t1), N, downCheckpoints,
                                                      checkpoint=downCheckpoints, memory=sortMemory).rv(),
                                    job.addChildJobFn(down, job.fileStore.writeGlobalFile(t2), N, downCheckpoints,
                                                      checkpoint=downCheckpoints, memory=sortMemory).rv()).rv()
    else:
        # We can sort this bit of the file
        job.fileStore.logToMaster("Sorting file: %s of size: %s"
                                  % (inputFileStoreID, length), level=logging.CRITICAL)
        # Sort the copy and write back to the fileStore
        shutil.copyfile(inputFile, inputFile + '.sort')
        sort(inputFile + '.sort')
        return job.fileStore.writeGlobalFile(inputFile + '.sort')
Down is the recursive part of the workflow. First we read the file into the local filestore by calling
Job.FileStore.readGlobalFile()
. This puts a copy of the file in the temp directory for this particular job. This
storage will disappear once this job ends. For a detailed explanation of the filestore, job store, and their interfaces
have a look at Managing files within a workflow.
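To build intuition for these two interfaces, here is a toy model (purely illustrative, not Toil’s actual implementation; the class name ToyFileStore is invented for this sketch): the filestore behaves like a table mapping opaque file IDs to stored copies, where writeGlobalFile copies a local file into the store and returns an ID, and readGlobalFile copies it back out to a job-local path:

```python
import os
import shutil
import tempfile
import uuid

class ToyFileStore:
    """Toy stand-in for Toil's filestore; for illustration only."""

    def __init__(self):
        self.storeDir = tempfile.mkdtemp()

    def writeGlobalFile(self, localPath):
        # Copy the local file into the store and hand back an opaque ID.
        fileID = str(uuid.uuid4())
        shutil.copyfile(localPath, os.path.join(self.storeDir, fileID))
        return fileID

    def readGlobalFile(self, fileID):
        # Copy the stored file out to a fresh job-local temporary path.
        localPath = tempfile.mktemp()
        shutil.copyfile(os.path.join(self.storeDir, fileID), localPath)
        return localPath
```

In real Toil the copies produced by readGlobalFile live in the job’s temporary directory and disappear when the job ends, which is why down writes its results back with writeGlobalFile.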
Next down checks the base case of the recursion: is the length of the input file no larger than N (remember, N was an option we added to the workflow in main)? In the base case, we just sort the file and return the file ID of this new sorted file.
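The serial sort() helper used in the base case is not shown in this section. A minimal sketch, assuming it sorts a file’s lines lexicographically in place (the real helper may differ):

```python
def sort(fileName):
    # Illustrative sketch only: read all lines, sort them, write them back.
    with open(fileName) as f:
        lines = f.readlines()
    lines.sort()
    with open(fileName, 'w') as f:
        f.writelines(lines)
```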
If the base case fails, then the file is split into two new temporary files using Job.FileStore.getLocalTempFile() and the helper function copySubRangeOfFile(). Finally, we add a follow-on job up with Job.addFollowOnJobFn().
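Neither helper appears in this section. Here is a sketch of both, under the assumption that offsets are byte positions into an ASCII file of newline-terminated lines, with getMidPoint returning the position of a newline near the middle of the range (so down’s midPoint+1 splits land on a line boundary) and copySubRangeOfFile copying the half-open byte range [fileStart, fileEnd) to an open handle (the real helpers may differ):

```python
def getMidPoint(fileName, fileStart, fileEnd):
    # Return the offset of a newline at or after the middle of [fileStart, fileEnd).
    with open(fileName, 'rb') as f:
        f.seek(fileStart + (fileEnd - fileStart) // 2)
        while f.tell() < fileEnd:
            if f.read(1) == b'\n':
                return f.tell() - 1
    raise RuntimeError("No newline found in the second half of the range")

def copySubRangeOfFile(fileName, fileStart, fileEnd, outputFileHandle):
    # Copy the byte range [fileStart, fileEnd) of fileName to the open handle.
    with open(fileName) as f:
        f.seek(fileStart)
        outputFileHandle.write(f.read(fileEnd - fileStart))
```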
We’ve already seen child jobs. A follow-on Job is a job that runs after the current job and all of its children (and their children and follow-ons) have
completed. Using a follow-on makes sense because up
is responsible for merging the files together and we don’t want
to merge the files together until we know they are sorted. Again, the return value of the follow-on job is requested
using Job.rv()
.
Looking at up
def up(job, inputFileID1, inputFileID2, memory=sortMemory):
    """
    Merges the two files and places them in the output.
    """
    with job.fileStore.writeGlobalFileStream() as (fileHandle, outputFileStoreID):
        with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1:
            with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2:
                merge(inputFileHandle1, inputFileHandle2, fileHandle)
        job.fileStore.logToMaster("Merging %s and %s to %s"
                                  % (inputFileID1, inputFileID2, outputFileStoreID))
    # Clean up the input files - these deletes will occur after the job completes successfully.
    job.fileStore.deleteGlobalFile(inputFileID1)
    job.fileStore.deleteGlobalFile(inputFileID2)
    return outputFileStoreID
we see that the two input files are merged together and the output is written to a new file using job.fileStore.writeGlobalFileStream(). After a little cleanup, the output file is returned.
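The merge() helper is also not shown; a minimal sketch that merges two already-sorted, line-oriented input streams into the output handle, assuming every line is newline-terminated (the real helper may differ):

```python
def merge(fileHandle1, fileHandle2, outputFileHandle):
    # Standard two-way merge over sorted, newline-terminated lines.
    line1, line2 = fileHandle1.readline(), fileHandle2.readline()
    while line1 and line2:
        if line1 <= line2:
            outputFileHandle.write(line1)
            line1 = fileHandle1.readline()
        else:
            outputFileHandle.write(line2)
            line2 = fileHandle2.readline()
    # One stream is exhausted; flush whatever remains of the other.
    while line1:
        outputFileHandle.write(line1)
        line1 = fileHandle1.readline()
    while line2:
        outputFileHandle.write(line2)
        line2 = fileHandle2.readline()
```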
Once the final up
finishes and all of the rv()
promises are fulfilled, main
receives the sorted file’s ID
which it uses in exportFile
to send it to the user.
There are other things in this example that we didn’t go over, such as Checkpoints and the details of much of the Toil API.
At the end of the script the lines:
if __name__ == '__main__':
    main()
are included to ensure that the main function is only run once in the ‘__main__’ process
invoked by you, the user.
In Toil terms, by invoking the script you created the leader process
in which the main()
function is run. A worker process is a separate process whose sole purpose
is to host the execution of one or more jobs defined in that script. In any Toil
workflow there is always one leader process, and potentially many worker processes.
When using the single-machine batch system (the default), the worker processes will be running on the same machine as the leader process. With full-fledged batch systems like Mesos, the worker processes will typically be started on separate machines. The boilerplate ensures that the pipeline is only started once, on the leader, and not when its job functions are imported and executed on the individual workers.
Typing python sort.py --help
will show the complete list of
arguments for the workflow which includes both Toil’s and ones defined inside
sort.py
. A complete explanation of Toil’s arguments can be
found in Toil Workflow Options and Command Line Interface.
Logging¶
By default, Toil logs a lot of information related to the current environment
in addition to messages from the batch system and jobs. This can be configured
with the --logLevel
flag. For example, to only log CRITICAL
level
messages to the screen:
(venv) $ python sort.py file:jobStore --logLevel=critical --overwriteOutput=True
This hides most of the information we get from the Toil run. For more detail,
we can run the pipeline with --logLevel=debug
to see a comprehensive
output. For more information, see Logging.
Error Handling and Resuming Pipelines¶
With Toil, you can recover gracefully from a bug in your pipeline without losing
any progress from successfully-completed jobs. To demonstrate this, let’s add
a bug to our example code to see how Toil handles a failure and how we can
resume a pipeline after that happens. Add a bad assertion to the first line of the body of down():
def down(job, inputFileStoreID, N, downCheckpoints, memory=sortMemory):
...
assert 1 == 2, "Test error!"
When we run the pipeline, Toil will show a detailed failure log with a traceback:
(venv) $ python sort.py file:jobStore
...
---TOIL WORKER OUTPUT LOG---
...
m/j/jobonrSMP Traceback (most recent call last):
m/j/jobonrSMP File "toil/src/toil/worker.py", line 340, in main
m/j/jobonrSMP job._runner(jobGraph=jobGraph, jobStore=jobStore, fileStore=fileStore)
m/j/jobonrSMP File "toil/src/toil/job.py", line 1270, in _runner
m/j/jobonrSMP returnValues = self._run(jobGraph, fileStore)
m/j/jobonrSMP File "toil/src/toil/job.py", line 1217, in _run
m/j/jobonrSMP return self.run(fileStore)
m/j/jobonrSMP File "toil/src/toil/job.py", line 1383, in run
m/j/jobonrSMP rValue = userFunction(*((self,) + tuple(self._args)), **self._kwargs)
m/j/jobonrSMP File "toil/example.py", line 30, in down
m/j/jobonrSMP assert 1 == 2, "Test error!"
m/j/jobonrSMP AssertionError: Test error!
If we try and run the pipeline again, Toil will give us an error message saying that a job store of the same name already exists. By default, in the event of a failure, the job store is preserved so that the workflow can be restarted, starting from the previously failed jobs. We can restart the pipeline by running:
(venv) $ python sort.py file:jobStore --restart --overwriteOutput=True
We can also change the number of times Toil will attempt to retry a failed job:
(venv) $ python sort.py --retryCount 2 --restart --overwriteOutput=True
You’ll now see Toil attempt to rerun the failed job until it runs out of tries.
--retryCount
is useful for non-systemic errors, like downloading a file that
may experience a sporadic interruption, or some other non-deterministic failure.
To successfully restart our pipeline, we can edit our script to comment out the assertion, or remove it, and then run
(venv) $ python sort.py --restart --overwriteOutput=True
The pipeline will run successfully, and the job store will be removed on the pipeline’s completion.
Collecting Statistics¶
A Toil pipeline can be run with the --stats flag to allow collection of statistics:
(venv) $ python sort.py --stats --overwriteOutput=True
Once the pipeline finishes, the job store will be left behind, allowing us to get information on the total runtime and stats pertaining to each job function:
(venv) $ toil stats file:jobStore
...
Batch System: singleMachine
Default Cores: 1 Default Memory: 2097152K
...
Once we’re done, we can clean up the job store by running
(venv) $ toil clean file:jobStore
Note that, by default, if --stats is not included and the pipeline finishes successfully, then toil clean is run automatically and the job store is cleaned up. This was the case with the above examples. See options to prevent this behavior.
Launching a Toil Workflow in AWS¶
After having installed the aws
extra for Toil during the Installation and set up AWS (see Preparing your AWS environment), the user can run the basic helloWorld.py
script (Running a basic workflow) on a VM in AWS just by modifying the run command.
Note that when running in AWS, users can either run the workflow on a single instance or run it on a cluster (which runs across multiple containers on multiple AWS instances). For more information on running Toil workflows on a cluster, see Running in AWS.
Launch a cluster in AWS using the launch-cluster command. The arguments keyPairName, nodeType, and zone are required to launch a cluster.

(venv) $ toil launch-cluster <cluster-name> \
    --keyPairName <AWS-key-pair-name> \
    --nodeType t2.medium \
    --zone us-west-2a
Copy helloWorld.py to the /tmp directory on the leader node using the rsync-cluster command. Note that the command requires defining the file to copy as well as the target location on the cluster leader node:

(venv) $ toil rsync-cluster --zone us-west-2a <cluster-name> helloWorld.py :/tmp
Login to the cluster leader node using the ssh-cluster command. Note this command will log you in as the root user:

(venv) $ toil ssh-cluster --zone us-west-2a <cluster-name>
Run the Toil script in the cluster. In this particular case, we create an S3 bucket called my-S3-bucket in the us-west-2 region to store intermediate job results.

$ python /tmp/helloWorld.py aws:us-west-2:my-S3-bucket
Along with some other INFO log messages, you should get the following output in your terminal window:

Hello, world!, here's a message: You did it!
Exit from the SSH connection.
$ exit
Use the destroy-cluster command to destroy the cluster. Note this command will destroy the cluster leader node and any resources created to run the job, including the S3 bucket.
(venv) $ toil destroy-cluster --zone us-west-2a <cluster-name>
Running a CWL Workflow on AWS¶
After having installed the aws
and cwl
extras for Toil during the Installation and set up AWS (see Preparing your AWS environment),
the user can run a CWL workflow with Toil on AWS.
First launch a node in AWS using the launch-cluster command:

(venv) $ toil launch-cluster <cluster-name> \
    --keyPairName <AWS-key-pair-name> \
    --nodeType t2.micro \
    --zone us-west-2a
Copy example.cwl and example-job.yaml from the CWL example to the node using the rsync-cluster command:

(venv) $ toil rsync-cluster --zone us-west-2a <cluster-name> \
    example.cwl example-job.yaml :/tmp
Launch the CWL workflow using the ssh-cluster utility:

(venv) $ toil ssh-cluster --zone us-west-2a <cluster-name> \
    toil-cwl-runner \
    /tmp/example.cwl \
    /tmp/example-job.yaml
Tip
When running a CWL workflow on AWS, input files can be provided either on the local file system or in S3 buckets using s3:// URI references. Final output files will be copied to the local file system of the leader node.
Destroy the cluster.
(venv) $ toil destroy-cluster --zone us-west-2a <cluster-name>