# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A demonstration of toil. Sorts the lines of a file into ascending order by doing a parallel merge sort.
This is an intentionally buggy version that doesn't include restart() for testing purposes.
"""
import codecs
import os
import random
import shutil
from configargparse import ArgumentParser
from toil.common import Toil
from toil.job import Job
from toil.realtimeLogger import RealtimeLogger

defaultLines = 1000
defaultLineLen = 50
sortMemory = "600M"


def setup(job, inputFile, N, downCheckpoints, options):
"""
Sets up the sort.
Returns the FileID of the sorted file
"""
RealtimeLogger.info("Starting the merge sort")
return job.addChildJobFn(
down,
inputFile,
N,
"root",
downCheckpoints,
options=options,
preemptible=True,
memory=sortMemory,
).rv()
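

# Note: setup() returns .rv(), a Toil promise for the child job's eventual
# return value (here, the FileID of the sorted file); the promise is resolved
# only once the child has actually run.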


def down(job, inputFileStoreID, N, path, downCheckpoints, options, memory=sortMemory):
"""
Input is a file, a subdivision size N, and a path in the hierarchy of jobs.
If the range is larger than a threshold N the range is divided recursively and
a follow on job is then created which merges back the results else
the file is sorted and placed in the output.
"""
RealtimeLogger.info("Down job starting: %s" % path)
# Read the file
inputFile = job.fileStore.readGlobalFile(inputFileStoreID, cache=False)
length = os.path.getsize(inputFile)
if length > N:
# We will subdivide the file
RealtimeLogger.critical(
"Splitting file: %s of size: %s" % (inputFileStoreID, length)
)
        # Split the file into two halves, breaking at a line boundary
midPoint = getMidPoint(inputFile, 0, length)
t1 = job.fileStore.getLocalTempFile()
with open(t1, "w") as fH:
fH.write(copySubRangeOfFile(inputFile, 0, midPoint + 1))
t2 = job.fileStore.getLocalTempFile()
with open(t2, "w") as fH:
fH.write(copySubRangeOfFile(inputFile, midPoint + 1, length))
# Call down recursively. By giving the rv() of the two jobs as inputs to the follow-on job, up,
# we communicate the dependency without hindering concurrency.
result = job.addFollowOnJobFn(
up,
job.addChildJobFn(
down,
job.fileStore.writeGlobalFile(t1),
N,
path + "/0",
downCheckpoints,
checkpoint=downCheckpoints,
options=options,
preemptible=True,
memory=options.sortMemory,
).rv(),
job.addChildJobFn(
down,
job.fileStore.writeGlobalFile(t2),
N,
path + "/1",
downCheckpoints,
checkpoint=downCheckpoints,
options=options,
preemptible=True,
                memory=options.sortMemory,
).rv(),
path + "/up",
preemptible=True,
options=options,
            memory=options.mergeMemory,
).rv()
else:
# We can sort this bit of the file
RealtimeLogger.critical(
"Sorting file: %s of size: %s" % (inputFileStoreID, length)
)
# Sort the copy and write back to the fileStore
shutil.copyfile(inputFile, inputFile + ".sort")
sort(inputFile + ".sort")
result = job.fileStore.writeGlobalFile(inputFile + ".sort")
RealtimeLogger.info("Down job finished: %s" % path)
return result
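

# For a file that gets split once, down() builds the following job graph
# (illustrative). The two child "down" jobs may run concurrently; the
# follow-on "up" job runs only after both children have finished, consuming
# their promised FileIDs:
#
#   down("root")
#   ├── down("root/0")   child: sorts the first half
#   ├── down("root/1")   child: sorts the second half
#   └── up("root/up")    follow-on: merges the two sorted halves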


def up(job, inputFileID1, inputFileID2, path, options, memory=sortMemory):
"""
Merges the two files and places them in the output.
"""
RealtimeLogger.info("Up job starting: %s" % path)
with job.fileStore.writeGlobalFileStream() as (fileHandle, outputFileStoreID):
fileHandle = codecs.getwriter("utf-8")(fileHandle)
with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1:
inputFileHandle1 = codecs.getreader("utf-8")(inputFileHandle1)
with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2:
inputFileHandle2 = codecs.getreader("utf-8")(inputFileHandle2)
RealtimeLogger.info(
"Merging %s and %s to %s"
% (inputFileID1, inputFileID2, outputFileStoreID)
)
merge(inputFileHandle1, inputFileHandle2, fileHandle)
    # Clean up the input files - these deletions only take effect once the job completes successfully.
job.fileStore.deleteGlobalFile(inputFileID1)
job.fileStore.deleteGlobalFile(inputFileID2)
RealtimeLogger.info("Up job finished: %s" % path)
return outputFileStoreID


def sort(file):
"""Sorts the given file."""
with open(file) as f:
lines = f.readlines()
lines.sort()
with open(file, "w") as f:
for line in lines:
f.write(line)


def merge(fileHandle1, fileHandle2, outputFileHandle):
"""
Merges together two files maintaining sorted order.
All handles must be text-mode streams.
"""
line2 = fileHandle2.readline()
for line1 in fileHandle1.readlines():
while len(line2) != 0 and line2 <= line1:
outputFileHandle.write(line2)
line2 = fileHandle2.readline()
outputFileHandle.write(line1)
while len(line2) != 0:
outputFileHandle.write(line2)
line2 = fileHandle2.readline()
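

# Illustrative sanity check for merge(); not part of the original example.
# It merges two in-memory sorted streams and verifies the interleaved result.
def _check_merge():
    import io

    out = io.StringIO()
    merge(io.StringIO("a\nc\n"), io.StringIO("b\nd\n"), out)
    assert out.getvalue() == "a\nb\nc\nd\n"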


def copySubRangeOfFile(inputFile, fileStart, fileEnd):
"""
Copies the range (in bytes) between fileStart and fileEnd to the given
output file handle.
"""
with open(inputFile) as fileHandle:
fileHandle.seek(fileStart)
data = fileHandle.read(fileEnd - fileStart)
assert len(data) == fileEnd - fileStart
return data


def getMidPoint(file, fileStart, fileEnd):
"""
Finds the point in the file to split.
Returns an int i such that fileStart <= i < fileEnd
"""
with open(file) as f:
midPoint = (fileStart + fileEnd) // 2
assert midPoint >= fileStart
f.seek(midPoint)
line = f.readline()
assert len(line) >= 1
if len(line) + midPoint < fileEnd:
return midPoint + len(line) - 1
f.seek(fileStart)
line = f.readline()
assert len(line) >= 1
assert len(line) + fileStart <= fileEnd
return len(line) + fileStart - 1
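

# Illustrative check for getMidPoint(); not part of the original example.
# On a six-byte file of three lines, the returned offset points at a newline,
# so splitting at midPoint + 1 (as down() does) keeps every line intact.
def _check_get_mid_point():
    import tempfile

    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
        f.write("a\nb\nc\n")
    try:
        mid = getMidPoint(f.name, 0, 6)
        assert 0 <= mid < 6
        assert copySubRangeOfFile(f.name, 0, mid + 1).endswith("\n")
        assert copySubRangeOfFile(f.name, mid + 1, 6) == "c\n"
    finally:
        os.remove(f.name)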


def makeFileToSort(fileName, lines=defaultLines, lineLen=defaultLineLen):
with open(fileName, "w") as f:
for _ in range(lines):
line = (
"".join(random.choice("actgACTGNXYZ") for _ in range(lineLen - 1))
+ "\n"
)
f.write(line)


def main(options=None):
if not options:
# deal with command line arguments
parser = ArgumentParser()
Job.Runner.addToilOptions(parser)
parser.add_argument(
"--numLines",
default=defaultLines,
help="Number of lines in file to sort.",
type=int,
)
parser.add_argument(
"--lineLength",
default=defaultLineLen,
help="Length of lines in file to sort.",
type=int,
)
parser.add_argument("--fileToSort", help="The file you wish to sort")
parser.add_argument("--outputFile", help="Where the sorted output will go")
parser.add_argument(
"--overwriteOutput",
help="Write over the output file if it already exists.",
default=True,
)
parser.add_argument(
"--N",
dest="N",
help="The threshold below which a serial sort function is used to sort file. "
"All lines must of length less than or equal to N or program will fail",
default=10000,
)
parser.add_argument(
"--downCheckpoints",
action="store_true",
help="If this option is set, the workflow will make checkpoints on its way through"
'the recursive "down" part of the sort',
)
parser.add_argument(
"--sortMemory",
dest="sortMemory",
help="Memory for jobs that sort chunks of the file.",
default=None,
)
parser.add_argument(
"--mergeMemory",
dest="mergeMemory",
help="Memory for jobs that collate results.",
default=None,
)
options = parser.parse_args()
if not hasattr(options, "sortMemory") or not options.sortMemory:
options.sortMemory = sortMemory
if not hasattr(options, "mergeMemory") or not options.mergeMemory:
options.mergeMemory = sortMemory
# do some input verification
sortedFileName = options.outputFile or "sortedFile.txt"
if not options.overwriteOutput and os.path.exists(sortedFileName):
print(
f"Output file {sortedFileName} already exists. "
f"Delete it to run the sort example again or use --overwriteOutput=True"
)
exit()
fileName = options.fileToSort
if options.fileToSort is None:
# make the file ourselves
fileName = "fileToSort.txt"
if os.path.exists(fileName):
print(f"Sorting existing file: {fileName}")
else:
print(
f"No sort file specified. Generating one automatically called: {fileName}."
)
makeFileToSort(
fileName=fileName, lines=options.numLines, lineLen=options.lineLength
)
else:
if not os.path.exists(options.fileToSort):
raise RuntimeError("File to sort does not exist: %s" % options.fileToSort)
if int(options.N) <= 0:
raise RuntimeError("Invalid value of N: %s" % options.N)
# Now we are ready to run
with Toil(options) as workflow:
sortedFileURL = "file://" + os.path.abspath(sortedFileName)
# raise Exception('test')
if not workflow.options.restart:
sortFileURL = "file://" + os.path.abspath(fileName)
sortFileID = workflow.importFile(sortFileURL)
sortedFileID = workflow.start(
Job.wrapJobFn(
setup,
sortFileID,
int(options.N),
options.downCheckpoints,
options=options,
memory=sortMemory,
)
)
"""
The else block is removed here to test that the job store is not
destroyed when attempting to resume without restart().
"""
if __name__ == "__main__":
main()
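
# Example invocation (illustrative; assumes this module is saved as sort.py):
#
#   python sort.py file:jobStore --numLines 1000 --lineLength 50
#
# "file:jobStore" is the Toil job store locator; a directory named "jobStore"
# is created in the current working directory to hold the workflow's state.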