Source code for toil.test.sort.restart_sort

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
A demonstration of toil. Sorts the lines of a file into ascending order by doing a parallel merge sort.
This is an intentionally buggy version that doesn't include restart() for testing purposes.
"""
import codecs
import os
import random
import shutil

from configargparse import ArgumentParser

from toil.common import Toil
from toil.job import Job
from toil.realtimeLogger import RealtimeLogger

defaultLines = 1000
defaultLineLen = 50
sortMemory = "600M"


def setup(job, inputFile, N, downCheckpoints, options):
    """
    Sets up the sort.
    Returns the FileID of the sorted file.
    """
    RealtimeLogger.info("Starting the merge sort")
    return job.addChildJobFn(
        down,
        inputFile,
        N,
        "root",
        downCheckpoints,
        options=options,
        preemptible=True,
        memory=sortMemory,
    ).rv()
def down(job, inputFileStoreID, N, path, downCheckpoints, options, memory=sortMemory):
    """
    Input is a file, a subdivision size N, and a path in the hierarchy of jobs.
    If the file is larger than the threshold N, it is divided recursively and a
    follow-on job is created to merge the results back together; otherwise the
    file is sorted and placed in the output.
    """
    RealtimeLogger.info("Down job starting: %s" % path)

    # Read the file
    inputFile = job.fileStore.readGlobalFile(inputFileStoreID, cache=False)
    length = os.path.getsize(inputFile)
    if length > N:
        # We will subdivide the file
        RealtimeLogger.critical(
            "Splitting file: %s of size: %s" % (inputFileStoreID, length)
        )
        # Split the file into two copies
        midPoint = getMidPoint(inputFile, 0, length)
        t1 = job.fileStore.getLocalTempFile()
        with open(t1, "w") as fH:
            fH.write(copySubRangeOfFile(inputFile, 0, midPoint + 1))
        t2 = job.fileStore.getLocalTempFile()
        with open(t2, "w") as fH:
            fH.write(copySubRangeOfFile(inputFile, midPoint + 1, length))
        # Call down recursively. By giving the rv() of the two jobs as inputs to the
        # follow-on job, up, we communicate the dependency without hindering concurrency.
        result = job.addFollowOnJobFn(
            up,
            job.addChildJobFn(
                down,
                job.fileStore.writeGlobalFile(t1),
                N,
                path + "/0",
                downCheckpoints,
                checkpoint=downCheckpoints,
                options=options,
                preemptible=True,
                memory=options.sortMemory,
            ).rv(),
            job.addChildJobFn(
                down,
                job.fileStore.writeGlobalFile(t2),
                N,
                path + "/1",
                downCheckpoints,
                checkpoint=downCheckpoints,
                options=options,
                preemptible=True,
                memory=options.mergeMemory,
            ).rv(),
            path + "/up",
            preemptible=True,
            options=options,
            memory=options.sortMemory,
        ).rv()
    else:
        # We can sort this bit of the file
        RealtimeLogger.critical(
            "Sorting file: %s of size: %s" % (inputFileStoreID, length)
        )
        # Sort the copy and write back to the fileStore
        shutil.copyfile(inputFile, inputFile + ".sort")
        sort(inputFile + ".sort")
        result = job.fileStore.writeGlobalFile(inputFile + ".sort")
    RealtimeLogger.info("Down job finished: %s" % path)
    return result
def up(job, inputFileID1, inputFileID2, path, options, memory=sortMemory):
    """
    Merges the two files and places them in the output.
    """
    RealtimeLogger.info("Up job starting: %s" % path)
    with job.fileStore.writeGlobalFileStream() as (fileHandle, outputFileStoreID):
        fileHandle = codecs.getwriter("utf-8")(fileHandle)
        with job.fileStore.readGlobalFileStream(inputFileID1) as inputFileHandle1:
            inputFileHandle1 = codecs.getreader("utf-8")(inputFileHandle1)
            with job.fileStore.readGlobalFileStream(inputFileID2) as inputFileHandle2:
                inputFileHandle2 = codecs.getreader("utf-8")(inputFileHandle2)
                RealtimeLogger.info(
                    "Merging %s and %s to %s"
                    % (inputFileID1, inputFileID2, outputFileStoreID)
                )
                merge(inputFileHandle1, inputFileHandle2, fileHandle)
        # Clean up the input files - these deletes will occur after the job completes successfully.
        job.fileStore.deleteGlobalFile(inputFileID1)
        job.fileStore.deleteGlobalFile(inputFileID2)
        RealtimeLogger.info("Up job finished: %s" % path)
        return outputFileStoreID
def sort(file):
    """Sorts the given file."""
    with open(file) as f:
        lines = f.readlines()

    lines.sort()

    with open(file, "w") as f:
        for line in lines:
            f.write(line)
def merge(fileHandle1, fileHandle2, outputFileHandle):
    """
    Merges together two files maintaining sorted order.

    All handles must be text-mode streams.
    """
    line2 = fileHandle2.readline()
    for line1 in fileHandle1.readlines():
        while len(line2) != 0 and line2 <= line1:
            outputFileHandle.write(line2)
            line2 = fileHandle2.readline()
        outputFileHandle.write(line1)
    while len(line2) != 0:
        outputFileHandle.write(line2)
        line2 = fileHandle2.readline()
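# A minimal, hypothetical demo (not part of the original example) showing the
# expected behavior of merge() on small in-memory streams; both inputs must
# already be sorted:
def _demo_merge():
    import io

    a = io.StringIO("apple\ncherry\n")
    b = io.StringIO("banana\ndate\n")
    out = io.StringIO()
    merge(a, b, out)
    # Lines from both streams are interleaved in ascending order.
    assert out.getvalue() == "apple\nbanana\ncherry\ndate\n"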
def copySubRangeOfFile(inputFile, fileStart, fileEnd):
    """
    Returns the range (in bytes) between fileStart and fileEnd of the given
    input file as a string.
    """
    with open(inputFile) as fileHandle:
        fileHandle.seek(fileStart)
        data = fileHandle.read(fileEnd - fileStart)
        assert len(data) == fileEnd - fileStart
    return data
def getMidPoint(file, fileStart, fileEnd):
    """
    Finds the point in the file to split.
    Returns an int i such that fileStart <= i < fileEnd
    """
    with open(file) as f:
        midPoint = (fileStart + fileEnd) // 2
        assert midPoint >= fileStart
        f.seek(midPoint)
        line = f.readline()
        assert len(line) >= 1
        if len(line) + midPoint < fileEnd:
            return midPoint + len(line) - 1
        f.seek(fileStart)
        line = f.readline()
        assert len(line) >= 1
        assert len(line) + fileStart <= fileEnd
        return len(line) + fileStart - 1
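# A hypothetical sketch (not in the original example) illustrating the
# getMidPoint() contract: the returned offset points at a newline byte, so
# splitting at midPoint + 1 in down() never cuts a line in half:
def _demo_getMidPoint():
    import tempfile

    with tempfile.NamedTemporaryFile("w", delete=False) as f:
        f.write("aaaa\nbbbb\ncccc\ndddd\n")
        name = f.name
    mid = getMidPoint(name, 0, os.path.getsize(name))
    with open(name) as f:
        # The split point falls exactly on a line terminator.
        assert f.read()[mid] == "\n"
    os.remove(name)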
def makeFileToSort(fileName, lines=defaultLines, lineLen=defaultLineLen):
    with open(fileName, "w") as f:
        for _ in range(lines):
            line = (
                "".join(random.choice("actgACTGNXYZ") for _ in range(lineLen - 1))
                + "\n"
            )
            f.write(line)
def main(options=None):
    if not options:
        # deal with command line arguments
        parser = ArgumentParser()
        Job.Runner.addToilOptions(parser)
        parser.add_argument(
            "--numLines",
            default=defaultLines,
            help="Number of lines in file to sort.",
            type=int,
        )
        parser.add_argument(
            "--lineLength",
            default=defaultLineLen,
            help="Length of lines in file to sort.",
            type=int,
        )
        parser.add_argument("--fileToSort", help="The file you wish to sort")
        parser.add_argument("--outputFile", help="Where the sorted output will go")
        parser.add_argument(
            "--overwriteOutput",
            help="Write over the output file if it already exists.",
            default=True,
        )
        parser.add_argument(
            "--N",
            dest="N",
            help="The threshold below which a serial sort function is used to sort the file. "
            "All lines must be of length less than or equal to N or the program will fail",
            default=10000,
        )
        parser.add_argument(
            "--downCheckpoints",
            action="store_true",
            help="If this option is set, the workflow will make checkpoints on its way through "
            'the recursive "down" part of the sort',
        )
        parser.add_argument(
            "--sortMemory",
            dest="sortMemory",
            help="Memory for jobs that sort chunks of the file.",
            default=None,
        )
        parser.add_argument(
            "--mergeMemory",
            dest="mergeMemory",
            help="Memory for jobs that collate results.",
            default=None,
        )

        options = parser.parse_args()
    if not hasattr(options, "sortMemory") or not options.sortMemory:
        options.sortMemory = sortMemory
    if not hasattr(options, "mergeMemory") or not options.mergeMemory:
        options.mergeMemory = sortMemory

    # do some input verification
    sortedFileName = options.outputFile or "sortedFile.txt"
    if not options.overwriteOutput and os.path.exists(sortedFileName):
        print(
            f"Output file {sortedFileName} already exists. "
            f"Delete it to run the sort example again or use --overwriteOutput=True"
        )
        exit()

    fileName = options.fileToSort
    if options.fileToSort is None:
        # make the file ourselves
        fileName = "fileToSort.txt"
        if os.path.exists(fileName):
            print(f"Sorting existing file: {fileName}")
        else:
            print(
                f"No sort file specified. Generating one automatically called: {fileName}."
            )
            makeFileToSort(
                fileName=fileName, lines=options.numLines, lineLen=options.lineLength
            )
    else:
        if not os.path.exists(options.fileToSort):
            raise RuntimeError("File to sort does not exist: %s" % options.fileToSort)

    if int(options.N) <= 0:
        raise RuntimeError("Invalid value of N: %s" % options.N)

    # Now we are ready to run
    with Toil(options) as workflow:
        sortedFileURL = "file://" + os.path.abspath(sortedFileName)
        if not workflow.options.restart:
            sortFileURL = "file://" + os.path.abspath(fileName)
            sortFileID = workflow.importFile(sortFileURL)
            sortedFileID = workflow.start(
                Job.wrapJobFn(
                    setup,
                    sortFileID,
                    int(options.N),
                    options.downCheckpoints,
                    options=options,
                    memory=sortMemory,
                )
            )
        """
        The else block is removed here to test that the job store is not
        destroyed when attempting to resume without restart().
        """
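# For comparison, the non-buggy version of this example (toil.test.sort.sort)
# resumes an interrupted workflow via Toil.restart(). A minimal sketch of the
# branch that is intentionally missing above (illustrative only):
#
#     if not workflow.options.restart:
#         sortFileID = workflow.importFile(sortFileURL)
#         sortedFileID = workflow.start(Job.wrapJobFn(setup, ...))
#     else:
#         sortedFileID = workflow.restart()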
if __name__ == "__main__": main()