# Source code for toil.test.sort.sort

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A demonstration of toil. Sorts the lines of a file into ascending order by doing a parallel merge sort.
"""
import codecs
import os
import random
import shutil

from configargparse import ArgumentParser

from toil.common import Toil
from toil.job import Job
from toil.realtimeLogger import RealtimeLogger

defaultLines = 1000
defaultLineLen = 50
sortMemory = '600M'


[docs] def setup(job, inputFile, N, downCheckpoints, options): """ Sets up the sort. Returns the FileID of the sorted file """ RealtimeLogger.info("Starting the merge sort") return job.addChildJobFn(down, inputFile, N, 'root', downCheckpoints, options = options, preemptible=True, memory=sortMemory).rv()
[docs] def down(job, inputFileStoreID, N, path, downCheckpoints, options, memory=sortMemory): """ Input is a file, a subdivision size N, and a path in the hierarchy of jobs. If the range is larger than a threshold N the range is divided recursively and a follow on job is then created which merges back the results else the file is sorted and placed in the output. """ RealtimeLogger.info("Down job starting: %s" % path) # Read the file inputFile = job.fileStore.readGlobalFile(inputFileStoreID, cache=False) length = os.path.getsize(inputFile) if length > N: # We will subdivide the file RealtimeLogger.critical("Splitting file: %s of size: %s" % (inputFileStoreID, length)) # Split the file into two copies midPoint = getMidPoint(inputFile, 0, length) t1 = job.fileStore.getLocalTempFile() with open(t1, 'w') as fH: fH.write(copySubRangeOfFile(inputFile, 0, midPoint+1)) t2 = job.fileStore.getLocalTempFile() with open(t2, 'w') as fH: fH.write(copySubRangeOfFile(inputFile, midPoint+1, length)) # Call down recursively. By giving the rv() of the two jobs as inputs to the follow-on job, up, # we communicate the dependency without hindering concurrency. 
result = job.addFollowOnJobFn(up, job.addChildJobFn(down, job.fileStore.writeGlobalFile(t1), N, path + '/0', downCheckpoints, checkpoint=downCheckpoints, options=options, preemptible=True, memory=options.sortMemory).rv(), job.addChildJobFn(down, job.fileStore.writeGlobalFile(t2), N, path + '/1', downCheckpoints, checkpoint=downCheckpoints, options=options, preemptible=True, memory=options.mergeMemory).rv(), path + '/up', preemptible=True, options=options, memory=options.sortMemory).rv() else: # We can sort this bit of the file RealtimeLogger.critical("Sorting file: %s of size: %s" % (inputFileStoreID, length)) # Sort the copy and write back to the fileStore shutil.copyfile(inputFile, inputFile + '.sort') sort(inputFile + '.sort') result = job.fileStore.writeGlobalFile(inputFile + '.sort') RealtimeLogger.info("Down job finished: %s" % path) return result
[docs] def up(job, inputFileID1, inputFileID2, path, options, memory=sortMemory): """ Merges the two files and places them in the output. """ RealtimeLogger.info("Up job starting: %s" % path) with job.fileStore.writeGlobalFileStream() as (fileHandle, outputFileStoreID): fileHandle = codecs.getwriter('utf-8')(fileHandle) with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1: inputFileHandle1 = codecs.getreader('utf-8')(inputFileHandle1) with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2: inputFileHandle2 = codecs.getreader('utf-8')(inputFileHandle2) RealtimeLogger.info("Merging %s and %s to %s" % (inputFileID1, inputFileID2, outputFileStoreID)) merge(inputFileHandle1, inputFileHandle2, fileHandle) # Cleanup up the input files - these deletes will occur after the completion is successful. job.fileStore.deleteGlobalFile(inputFileID1) job.fileStore.deleteGlobalFile(inputFileID2) RealtimeLogger.info("Up job finished: %s" % path) return outputFileStoreID
[docs] def sort(file): """Sorts the given file.""" with open(file) as f: lines = f.readlines() lines.sort() with open(file, 'w') as f: for line in lines: f.write(line)
[docs] def merge(fileHandle1, fileHandle2, outputFileHandle): """ Merges together two files maintaining sorted order. All handles must be text-mode streams. """ line2 = fileHandle2.readline() for line1 in fileHandle1.readlines(): while len(line2) != 0 and line2 <= line1: outputFileHandle.write(line2) line2 = fileHandle2.readline() outputFileHandle.write(line1) while len(line2) != 0: outputFileHandle.write(line2) line2 = fileHandle2.readline()
[docs] def copySubRangeOfFile(inputFile, fileStart, fileEnd): """ Copies the range (in bytes) between fileStart and fileEnd to the given output file handle. """ with open(inputFile) as fileHandle: fileHandle.seek(fileStart) data = fileHandle.read(fileEnd - fileStart) assert len(data) == fileEnd - fileStart return data
[docs] def getMidPoint(file, fileStart, fileEnd): """ Finds the point in the file to split. Returns an int i such that fileStart <= i < fileEnd """ with open(file) as f: midPoint = (fileStart + fileEnd) // 2 assert midPoint >= fileStart f.seek(midPoint) line = f.readline() assert len(line) >= 1 if len(line) + midPoint < fileEnd: return midPoint + len(line) - 1 f.seek(fileStart) line = f.readline() assert len(line) >= 1 assert len(line) + fileStart <= fileEnd return len(line) + fileStart - 1
[docs] def makeFileToSort(fileName, lines=defaultLines, lineLen=defaultLineLen): with open(fileName, 'w') as f: for _ in range(lines): line = "".join(random.choice('actgACTGNXYZ') for _ in range(lineLen - 1)) + '\n' f.write(line)
[docs] def main(options=None): if not options: # deal with command line arguments parser = ArgumentParser() Job.Runner.addToilOptions(parser) parser.add_argument('--numLines', default=defaultLines, help='Number of lines in file to sort.', type=int) parser.add_argument('--lineLength', default=defaultLineLen, help='Length of lines in file to sort.', type=int) parser.add_argument("--fileToSort", help="The file you wish to sort") parser.add_argument("--outputFile", help="Where the sorted output will go") parser.add_argument("--overwriteOutput", help="Write over the output file if it already exists.", default=True) parser.add_argument("--N", dest="N", help="The threshold below which a serial sort function is used to sort file. " "All lines must of length less than or equal to N or program will fail", default=10000) parser.add_argument('--downCheckpoints', action='store_true', help='If this option is set, the workflow will make checkpoints on its way through' 'the recursive "down" part of the sort') parser.add_argument("--sortMemory", dest="sortMemory", help="Memory for jobs that sort chunks of the file.", default=None) parser.add_argument("--mergeMemory", dest="mergeMemory", help="Memory for jobs that collate results.", default=None) options = parser.parse_args() if not hasattr(options, "sortMemory") or not options.sortMemory: options.sortMemory = sortMemory if not hasattr(options, "mergeMemory") or not options.mergeMemory: options.mergeMemory = sortMemory # do some input verification sortedFileName = options.outputFile or "sortedFile.txt" if not options.overwriteOutput and os.path.exists(sortedFileName): print(f'Output file {sortedFileName} already exists. ' f'Delete it to run the sort example again or use --overwriteOutput=True') exit() fileName = options.fileToSort if options.fileToSort is None: # make the file ourselves fileName = 'fileToSort.txt' if os.path.exists(fileName): print(f'Sorting existing file: {fileName}') else: print(f'No sort file specified. 
Generating one automatically called: {fileName}.') makeFileToSort(fileName=fileName, lines=options.numLines, lineLen=options.lineLength) else: if not os.path.exists(options.fileToSort): raise RuntimeError("File to sort does not exist: %s" % options.fileToSort) if int(options.N) <= 0: raise RuntimeError("Invalid value of N: %s" % options.N) # Now we are ready to run with Toil(options) as workflow: sortedFileURL = 'file://' + os.path.abspath(sortedFileName) if not workflow.options.restart: sortFileURL = 'file://' + os.path.abspath(fileName) sortFileID = workflow.importFile(sortFileURL) sortedFileID = workflow.start(Job.wrapJobFn(setup, sortFileID, int(options.N), options.downCheckpoints, options=options, memory=sortMemory)) else: sortedFileID = workflow.restart() workflow.exportFile(sortedFileID, sortedFileURL)
if __name__ == '__main__': main()