# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import logging
import os
import random

from toil.common import Toil
from toil.fileStores import FileID
from toil.job import Job
from toil.test import ToilTest, slow

logger = logging.getLogger(__name__)

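# Length of the prefix of each random test string used as its dictionary key,
# so a file's contents can be matched back to the string it should contain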
PREFIX_LENGTH = 200


# TODO: This test is ancient. Similar tests exist in `fileStoreTest.py`, but
# none of them check the contents of the files they read, so this test remains
# as-is.
class JobFileStoreTest(ToilTest):
    """
    Tests for the methods defined in
    :class:`toil.fileStores.abstractFileStore.AbstractFileStore`.
    """
    def testCachingFileStore(self):
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        with Toil(options) as workflow:
            workflow.start(Job.wrapJobFn(simpleFileStoreJob))
    def testNonCachingFileStore(self):
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.caching = False
        with Toil(options) as workflow:
            workflow.start(Job.wrapJobFn(simpleFileStoreJob))
    def _testJobFileStore(self, retryCount=0, badWorker=0.0, stringNo=1,
                          stringLength=1000000, testNo=2):
        """
        Creates a chain of jobs, each reading and writing files using the
        toil.fileStores.abstractFileStore.AbstractFileStore interface.
        Verifies the files written are always what we expect.
        """
        for test in range(testNo):
            # Make a list of random strings, each of stringLength chars, keyed
            # by the first PREFIX_LENGTH characters of the string
            def randomString():
                chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                s = "".join([random.choice(chars) for i in range(stringLength)])
                return s[:PREFIX_LENGTH], s
            # In total, stringNo strings of stringLength characters each
            testStrings = dict([randomString() for i in range(stringNo)])
            options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
            options.logLevel = "DEBUG"
            options.retryCount = retryCount
            options.badWorker = badWorker
            options.badWorkerFailInterval = 1.0
            chainLength = 10
            # Run the workflow; each job in the chain asserts that the files
            # it reads contain the expected strings
            Job.Runner.startToil(Job.wrapJobFn(fileTestJob, [],
                                               testStrings, chainLength),
                                 options)
    def testJobFileStore(self):
        """
        Tests the case where about half of the files read are cached.
        """
        self._testJobFileStore(retryCount=0, badWorker=0.0, stringNo=5,
                               stringLength=1000000)
    @slow
    def testJobFileStoreWithBadWorker(self):
        """
        Tests the case where about half of the files read are cached and the
        worker is randomly failing.
        """
        self._testJobFileStore(retryCount=100, badWorker=0.5, stringNo=5,
                               stringLength=1000000)
def fileTestJob(job, inputFileStoreIDs, testStrings, chainLength):
    """
    Test job that exercises the
    toil.fileStores.abstractFileStore.AbstractFileStore functions.
    """
    outputFileStoreIds = []  # File store IDs to pass to the next job in the chain

    # Load the input jobStoreFileIDs and check that they map to the same set
    # of random input strings, exercising the different read functions in the
    # fileStore interface
    for fileStoreID in inputFileStoreIDs:
        if random.random() > 0.5:
            # Read the file for the fileStoreID, randomly picking a way to
            # invoke readGlobalFile
            if random.random() > 0.5:
                local_path = job.fileStore.getLocalTempFileName() if random.random() > 0.5 else None
                cache = random.random() > 0.5
                tempFile = job.fileStore.readGlobalFile(fileStoreID, local_path,
                                                        cache=cache)
                with open(tempFile) as fH:
                    string = fH.readline()
                logger.info("Downloaded %s to local path %s with cache %s "
                            "and got %s with %d letters",
                            fileStoreID, local_path, cache, tempFile, len(string))
            else:
                # Check the streamed file is as we expect
                with job.fileStore.readGlobalFileStream(fileStoreID,
                                                        encoding='utf-8') as fH:
                    string = fH.readline()
                logger.info("Streamed %s and got %d letters",
                            fileStoreID, len(string))
            # Check the string we get back is what we expect
            assert string[:PREFIX_LENGTH] in testStrings, \
                f"Could not find string: {string[:PREFIX_LENGTH]}"
            assert testStrings[string[:PREFIX_LENGTH]] == string, \
                f"Mismatch in string: {string[:PREFIX_LENGTH]}"
            # This allows the file to be passed to the next job
            outputFileStoreIds.append(fileStoreID)
        else:
            # This tests deletion
            logger.info("Deleted %s", fileStoreID)
            job.fileStore.deleteGlobalFile(fileStoreID)

    # Fill out the output file store IDs until we have the same number as the
    # input strings, exercising different ways of writing files to the file store
    while len(outputFileStoreIds) < len(testStrings):
        # Pick a string and write it into a file
        testString = random.choice(list(testStrings.values()))
        if random.random() > 0.5:
            # Make a local copy of the file
            tempFile = job.fileStore.getLocalTempFile() if random.random() > 0.5 \
                else os.path.join(job.fileStore.getLocalTempDir(), "temp.txt")
            with open(tempFile, 'w') as fH:
                fH.write(testString)
            # Write the file to the file store using the local copy
            fileStoreID = job.fileStore.writeGlobalFile(tempFile)
            # Make sure it returned a valid and correct FileID with the right size
            assert isinstance(fileStoreID, FileID)
            assert fileStoreID.size == len(testString.encode('utf-8'))
            outputFileStoreIds.append(fileStoreID)
        else:
            # Use the writeGlobalFileStream method to write the file
            with job.fileStore.writeGlobalFileStream() as (fH, fileStoreID):
                fH.write(testString.encode('utf-8'))
                outputFileStoreIds.append(fileStoreID)
            # Make sure it returned a valid and correct FileID with the right size
            assert isinstance(fileStoreID, FileID)
            assert fileStoreID.size == len(testString.encode('utf-8'))

    if chainLength > 0:
        # Make a child that will read these files and check it gets the same results
        job.addChildJobFn(fileTestJob, outputFileStoreIds,
                          testStrings, chainLength - 1)
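# Payload strings written by simpleFileStoreJob and verified by fileStoreChild below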
fileStoreString = "Testing writeGlobalFile"
streamingFileStoreString = "Testing writeGlobalFileStream"
def simpleFileStoreJob(job):
    localFilePath = os.path.join(job.fileStore.getLocalTempDir(), "parentTemp.txt")
    with open(localFilePath, 'w') as f:
        f.write(fileStoreString)
    testID1 = job.fileStore.writeGlobalFile(localFilePath)

    testID2 = None
    with job.fileStore.writeGlobalFileStream() as (f, fileID):
        f.write(streamingFileStoreString.encode('utf-8'))
        testID2 = fileID

    job.addChildJobFn(fileStoreChild, testID1, testID2)
def fileStoreChild(job, testID1, testID2):
    with job.fileStore.readGlobalFileStream(testID1) as f:
        assert f.read().decode('utf-8') == fileStoreString

    localFilePath = os.path.join(job.fileStore.getLocalTempDir(), "childTemp.txt")
    job.fileStore.readGlobalFile(testID2, localFilePath)
    with open(localFilePath) as f:
        assert f.read() == streamingFileStoreString

    job.fileStore.deleteLocalFile(testID2)

    try:
        job.fileStore.deleteLocalFile(testID1)
    except OSError as e:
        if e.errno == errno.ENOENT:
            # The file was not found, as expected: testID1 was never read
            # into this job's local temp directory
            pass
        else:
            raise
    else:
        raise RuntimeError("Deleting a non-existent file did not throw an exception")

    for fileID in (testID1, testID2):
        job.fileStore.deleteGlobalFile(fileID)
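
# A minimal sketch for running this module directly; Toil's test suite is
# normally driven through pytest, so this runner is only a convenience and
# assumes ToilTest behaves as a standard unittest.TestCase.
if __name__ == "__main__":
    import unittest

    unittest.main()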