# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A demonstration of toil. Sorts the lines of a file into ascending order by doing a parallel merge sort.
"""
import codecs
import os
import random
import shutil
from configargparse import ArgumentParser
from toil.common import Toil
from toil.job import Job
from toil.realtimeLogger import RealtimeLogger
defaultLines = 1000
defaultLineLen = 50
sortMemory = "600M"


def setup(job, inputFile, N, downCheckpoints, options):
"""
    Sets up the sort.
    Returns the FileID of the sorted file.
"""
RealtimeLogger.info("Starting the merge sort")
return job.addChildJobFn(
down,
inputFile,
N,
"root",
downCheckpoints,
options=options,
preemptible=True,
memory=sortMemory,
).rv()


def down(job, inputFileStoreID, N, path, downCheckpoints, options, memory=sortMemory):
"""
Input is a file, a subdivision size N, and a path in the hierarchy of jobs.
If the range is larger than a threshold N the range is divided recursively and
a follow on job is then created which merges back the results else
the file is sorted and placed in the output.
"""
RealtimeLogger.info("Down job starting: %s" % path)
# Read the file
inputFile = job.fileStore.readGlobalFile(inputFileStoreID, cache=False)
length = os.path.getsize(inputFile)
if length > N:
# We will subdivide the file
RealtimeLogger.critical(
"Splitting file: %s of size: %s" % (inputFileStoreID, length)
)
# Split the file into two copies
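        # getMidPoint returns the offset of a newline character, so the two
        # halves below split cleanly on a line boundary.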
midPoint = getMidPoint(inputFile, 0, length)
t1 = job.fileStore.getLocalTempFile()
with open(t1, "w") as fH:
fH.write(copySubRangeOfFile(inputFile, 0, midPoint + 1))
t2 = job.fileStore.getLocalTempFile()
with open(t2, "w") as fH:
fH.write(copySubRangeOfFile(inputFile, midPoint + 1, length))
# Call down recursively. By giving the rv() of the two jobs as inputs to the follow-on job, up,
# we communicate the dependency without hindering concurrency.
result = job.addFollowOnJobFn(
up,
job.addChildJobFn(
down,
job.fileStore.writeGlobalFile(t1),
N,
path + "/0",
downCheckpoints,
checkpoint=downCheckpoints,
options=options,
preemptible=True,
memory=options.sortMemory,
).rv(),
job.addChildJobFn(
down,
job.fileStore.writeGlobalFile(t2),
N,
path + "/1",
downCheckpoints,
checkpoint=downCheckpoints,
options=options,
preemptible=True,
                memory=options.sortMemory,
).rv(),
path + "/up",
preemptible=True,
options=options,
        memory=options.mergeMemory,
).rv()
else:
# We can sort this bit of the file
RealtimeLogger.critical(
"Sorting file: %s of size: %s" % (inputFileStoreID, length)
)
# Sort the copy and write back to the fileStore
shutil.copyfile(inputFile, inputFile + ".sort")
sort(inputFile + ".sort")
result = job.fileStore.writeGlobalFile(inputFile + ".sort")
RealtimeLogger.info("Down job finished: %s" % path)
return result


def up(job, inputFileID1, inputFileID2, path, options, memory=sortMemory):
"""
Merges the two files and places them in the output.
"""
RealtimeLogger.info("Up job starting: %s" % path)
with job.fileStore.writeGlobalFileStream() as (fileHandle, outputFileStoreID):
fileHandle = codecs.getwriter("utf-8")(fileHandle)
with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1:
inputFileHandle1 = codecs.getreader("utf-8")(inputFileHandle1)
with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2:
inputFileHandle2 = codecs.getreader("utf-8")(inputFileHandle2)
RealtimeLogger.info(
"Merging %s and %s to %s"
% (inputFileID1, inputFileID2, outputFileStoreID)
)
merge(inputFileHandle1, inputFileHandle2, fileHandle)
    # Clean up the input files - these deletes take effect only once the job completes successfully.
job.fileStore.deleteGlobalFile(inputFileID1)
job.fileStore.deleteGlobalFile(inputFileID2)
RealtimeLogger.info("Up job finished: %s" % path)
return outputFileStoreID


def sort(file):
"""Sorts the given file."""
with open(file) as f:
lines = f.readlines()
lines.sort()
with open(file, "w") as f:
for line in lines:
f.write(line)


def merge(fileHandle1, fileHandle2, outputFileHandle):
"""
Merges together two files maintaining sorted order.
All handles must be text-mode streams.
"""
line2 = fileHandle2.readline()
    for line1 in fileHandle1:
while len(line2) != 0 and line2 <= line1:
outputFileHandle.write(line2)
line2 = fileHandle2.readline()
outputFileHandle.write(line1)
while len(line2) != 0:
outputFileHandle.write(line2)
line2 = fileHandle2.readline()


def copySubRangeOfFile(inputFile, fileStart, fileEnd):
"""
Copies the range (in bytes) between fileStart and fileEnd to the given
output file handle.
"""
with open(inputFile) as fileHandle:
fileHandle.seek(fileStart)
data = fileHandle.read(fileEnd - fileStart)
assert len(data) == fileEnd - fileStart
return data


def getMidPoint(file, fileStart, fileEnd):
"""
Finds the point in the file to split.
Returns an int i such that fileStart <= i < fileEnd
"""
with open(file) as f:
midPoint = (fileStart + fileEnd) // 2
assert midPoint >= fileStart
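        # Seeking to the byte midpoint almost certainly lands mid-line, so read
        # to the end of that line and split there.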
f.seek(midPoint)
line = f.readline()
assert len(line) >= 1
if len(line) + midPoint < fileEnd:
return midPoint + len(line) - 1
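        # The line spanning the midpoint runs to the end of the range, so fall
        # back to splitting at the end of the first line.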
f.seek(fileStart)
line = f.readline()
assert len(line) >= 1
assert len(line) + fileStart <= fileEnd
return len(line) + fileStart - 1


def makeFileToSort(fileName, lines=defaultLines, lineLen=defaultLineLen):
    """Writes 'lines' random lines of 'lineLen' characters (including the newline) to fileName."""
with open(fileName, "w") as f:
for _ in range(lines):
line = (
"".join(random.choice("actgACTGNXYZ") for _ in range(lineLen - 1))
+ "\n"
)
f.write(line)


def main(options=None):
if not options:
# deal with command line arguments
parser = ArgumentParser()
Job.Runner.addToilOptions(parser)
parser.add_argument(
"--numLines",
default=defaultLines,
help="Number of lines in file to sort.",
type=int,
)
parser.add_argument(
"--lineLength",
default=defaultLineLen,
help="Length of lines in file to sort.",
type=int,
)
parser.add_argument("--fileToSort", help="The file you wish to sort")
parser.add_argument("--outputFile", help="Where the sorted output will go")
        parser.add_argument(
            "--overwriteOutput",
            action="store_true",
            help="Write over the output file if it already exists.",
        )
parser.add_argument(
"--N",
dest="N",
help="The threshold below which a serial sort function is used to sort file. "
"All lines must of length less than or equal to N or program will fail",
default=10000,
)
parser.add_argument(
"--downCheckpoints",
action="store_true",
help="If this option is set, the workflow will make checkpoints on its way through"
'the recursive "down" part of the sort',
)
parser.add_argument(
"--sortMemory",
dest="sortMemory",
help="Memory for jobs that sort chunks of the file.",
default=None,
)
parser.add_argument(
"--mergeMemory",
dest="mergeMemory",
help="Memory for jobs that collate results.",
default=None,
)
options = parser.parse_args()
if not hasattr(options, "sortMemory") or not options.sortMemory:
options.sortMemory = sortMemory
if not hasattr(options, "mergeMemory") or not options.mergeMemory:
options.mergeMemory = sortMemory
# do some input verification
sortedFileName = options.outputFile or "sortedFile.txt"
if not options.overwriteOutput and os.path.exists(sortedFileName):
        print(
            f"Output file {sortedFileName} already exists. "
            f"Delete it to run the sort example again or pass --overwriteOutput"
        )
exit()
fileName = options.fileToSort
if options.fileToSort is None:
# make the file ourselves
fileName = "fileToSort.txt"
if os.path.exists(fileName):
print(f"Sorting existing file: {fileName}")
else:
print(
f"No sort file specified. Generating one automatically called: {fileName}."
)
makeFileToSort(
fileName=fileName, lines=options.numLines, lineLen=options.lineLength
)
else:
if not os.path.exists(options.fileToSort):
raise RuntimeError("File to sort does not exist: %s" % options.fileToSort)
if int(options.N) <= 0:
raise RuntimeError("Invalid value of N: %s" % options.N)
# Now we are ready to run
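    # On a fresh run, import the input file and start the root job; with
    # --restart, resume the workflow from the existing job store instead.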
with Toil(options) as workflow:
sortedFileURL = "file://" + os.path.abspath(sortedFileName)
if not workflow.options.restart:
sortFileURL = "file://" + os.path.abspath(fileName)
sortFileID = workflow.importFile(sortFileURL)
sortedFileID = workflow.start(
Job.wrapJobFn(
setup,
sortFileID,
int(options.N),
options.downCheckpoints,
options=options,
memory=sortMemory,
)
)
else:
sortedFileID = workflow.restart()
workflow.exportFile(sortedFileID, sortedFileURL)
if __name__ == "__main__":
main()