# Source code for toil.test.src.jobFileStoreTest

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import logging
import os
import random

from toil.common import Toil
from toil.fileStores import FileID
from toil.job import Job
from toil.test import ToilTest, slow

# Module-level logger for this test module.
logger = logging.getLogger(name=__name__)

# Number of leading characters of each random test string that is used as the
# string's key in the ``testStrings`` dictionary.
PREFIX_LENGTH = 200


# TODO: This test is ancient and, while similar tests exist in `fileStoreTest.py`, none of them
# look at the contents of read files, so this test remains as-is.
class JobFileStoreTest(ToilTest):
    """
    Tests the methods defined in
    :class:`toil.fileStores.abstractFileStore.AbstractFileStore`.
    """

    def testCachingFileStore(self):
        """Run :func:`simpleFileStoreJob` without overriding ``options.caching``."""
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        with Toil(options) as workflow:
            workflow.start(Job.wrapJobFn(simpleFileStoreJob))

    def testNonCachingFileStore(self):
        """Run :func:`simpleFileStoreJob` with ``options.caching`` set to ``False``."""
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.caching = False
        with Toil(options) as workflow:
            workflow.start(Job.wrapJobFn(simpleFileStoreJob))

    def _testJobFileStore(
        self, retryCount=0, badWorker=0.0, stringNo=1, stringLength=1000000, testNo=2
    ):
        """
        Creates a chain of jobs, each reading and writing files using the
        toil.fileStores.abstractFileStore.AbstractFileStore interface. Verifies the
        files written are always what we expect.

        :param retryCount: value assigned to ``options.retryCount``.
        :param badWorker: value assigned to ``options.badWorker``.
        :param stringNo: number of random strings generated per workflow run.
        :param stringLength: length, in characters, of each random string.
        :param testNo: number of independent workflow runs.
        """
        chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        for _ in range(testNo):
            # Make stringNo random uppercase strings of stringLength characters each,
            # keyed by their first PREFIX_LENGTH characters so that a job which reads
            # a file back can look up the expected full string from its prefix.
            # random.choices draws all characters in one call, which is much faster
            # than one random.choice call per character for million-character strings.
            strings = (
                "".join(random.choices(chars, k=stringLength)) for _ in range(stringNo)
            )
            testStrings = {s[:PREFIX_LENGTH]: s for s in strings}

            options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
            options.logLevel = "DEBUG"
            options.retryCount = retryCount
            options.badWorker = badWorker
            options.badWorkerFailInterval = 1.0
            chainLength = 10

            # Run the workflow. Correctness is checked by the assertions inside
            # fileTestJob; the return value of startToil is not inspected here.
            Job.Runner.startToil(
                Job.wrapJobFn(fileTestJob, [], testStrings, chainLength), options
            )

    def testJobFileStore(self):
        """
        Tests case that about half the files are cached
        """
        self._testJobFileStore(
            retryCount=0, badWorker=0.0, stringNo=5, stringLength=1000000
        )

    @slow
    def testJobFileStoreWithBadWorker(self):
        """
        Tests case that about half the files are cached and the worker is randomly
        failing.
        """
        self._testJobFileStore(
            retryCount=100, badWorker=0.5, stringNo=5, stringLength=1000000
        )
def fileTestJob(job, inputFileStoreIDs, testStrings, chainLength):
    """
    Test job exercising toil.fileStores.abstractFileStore.AbstractFileStore functions.

    Each input file is, at random, either read back (with ``readGlobalFile`` or
    ``readGlobalFileStream``) and checked against ``testStrings``, or deleted with
    ``deleteGlobalFile``. New files are then written (with ``writeGlobalFile`` or
    ``writeGlobalFileStream``) until there are as many outputs as entries in
    ``testStrings``. While ``chainLength > 0``, a child job repeats the process on
    the outputs.

    :param job: the running job; only ``job.fileStore`` and ``job.addChildJobFn`` are used.
    :param inputFileStoreIDs: file IDs produced by the parent job (empty for the root).
    :param testStrings: dict mapping each string's first PREFIX_LENGTH characters to
        the full string.
    :param chainLength: number of further child jobs to chain after this one.
    """
    outputFileStoreIds = []  # File IDs passed to the next job in the chain

    # Load the input file IDs and check that they map to the same set of random input
    # strings, exercising the different read functions of the file store interface.
    for fileStoreID in inputFileStoreIDs:
        if random.random() > 0.5:
            if random.random() > 0.5:
                # Read to a local file, randomly choosing an explicit destination
                # (or letting the file store pick one) and whether to cache.
                local_path = (
                    job.fileStore.getLocalTempFileName()
                    if random.random() > 0.5
                    else None
                )
                cache = random.random() > 0.5
                tempFile = job.fileStore.readGlobalFile(
                    fileStoreID, local_path, cache=cache
                )
                with open(tempFile, encoding="utf-8") as fH:
                    string = fH.readline()
                logger.info(
                    "Downloaded %s to local path %s with cache %s and got %s with %d letters",
                    fileStoreID,
                    local_path,
                    cache,
                    tempFile,
                    len(string),
                )
            else:
                # Stream the file contents back as UTF-8 text.
                with job.fileStore.readGlobalFileStream(fileStoreID, "utf-8") as fH:
                    string = fH.readline()
                logger.info("Streamed %s and got %d letters", fileStoreID, len(string))

            # Check the string we get back is what we expect.
            prefix = string[:PREFIX_LENGTH]
            assert prefix in testStrings, f"Could not find string: {prefix}"
            assert testStrings[prefix] == string, f"Mismatch in string: {prefix}"

            # This allows the file to be passed to the next job.
            outputFileStoreIds.append(fileStoreID)
        else:
            # This tests deletion; log only once the deletion has returned.
            job.fileStore.deleteGlobalFile(fileStoreID)
            logger.info("Deleted %s", fileStoreID)

    # Fill out the outputs until there are as many as there are test strings,
    # exercising the different ways of writing files to the file store.
    candidates = list(testStrings.values())  # testStrings is not modified below
    while len(outputFileStoreIds) < len(testStrings):
        # Pick a string and write it into a file.
        testString = random.choice(candidates)
        expectedSize = len(testString.encode("utf-8"))
        if random.random() > 0.5:
            # Write a local copy of the file, then upload it with writeGlobalFile.
            tempFile = (
                job.fileStore.getLocalTempFile()
                if random.random() > 0.5
                else os.path.join(job.fileStore.getLocalTempDir(), "temp.txt")
            )
            # Explicit UTF-8 so the on-disk size matches expectedSize regardless of locale.
            with open(tempFile, "w", encoding="utf-8") as fH:
                fH.write(testString)
            fileStoreID = job.fileStore.writeGlobalFile(tempFile)
        else:
            # Use the writeGlobalFileStream method to write the file.
            with job.fileStore.writeGlobalFileStream() as (fH, fileStoreID):
                fH.write(testString.encode("utf-8"))

        # Make sure a valid FileID with the right size was returned.
        assert isinstance(fileStoreID, FileID)
        assert fileStoreID.size == expectedSize
        outputFileStoreIds.append(fileStoreID)

    if chainLength > 0:
        # Make a child that will read these files and check it gets the same results.
        job.addChildJobFn(fileTestJob, outputFileStoreIds, testStrings, chainLength - 1)
# Payloads written by simpleFileStoreJob and checked by fileStoreChild: the first via
# writeGlobalFile, the second via writeGlobalFileStream.
fileStoreString, streamingFileStoreString = (
    "Testing writeGlobalFile",
    "Testing writeGlobalFileStream",
)
def simpleFileStoreJob(job):
    """
    Write one global file with ``writeGlobalFile`` and one with
    ``writeGlobalFileStream``, then add :func:`fileStoreChild` as a child job that
    reads both back, checks them, and deletes them.
    """
    # First global file: written to a local temp file, then uploaded.
    parentPath = os.path.join(job.fileStore.getLocalTempDir(), "parentTemp.txt")
    with open(parentPath, "w") as out:
        out.write(fileStoreString)
    firstID = job.fileStore.writeGlobalFile(parentPath)

    # Second global file: written directly through a binary stream; the ID yielded
    # by the context manager remains bound after the block exits.
    with job.fileStore.writeGlobalFileStream() as (stream, streamedID):
        stream.write(streamingFileStoreString.encode("utf-8"))

    job.addChildJobFn(fileStoreChild, firstID, streamedID)
def fileStoreChild(job, testID1, testID2):
    """
    Read back the two files written by :func:`simpleFileStoreJob`, verify their
    contents, exercise ``deleteLocalFile`` (including its ENOENT failure path), and
    finally delete both global files.

    :raises RuntimeError: if deleting the local copy of ``testID1`` does not fail.
    """
    # testID1 is only streamed in this job, never downloaded to a local path.
    with job.fileStore.readGlobalFileStream(testID1) as stream:
        assert stream.read().decode("utf-8") == fileStoreString

    # testID2 is downloaded to an explicit local path and checked on disk.
    childPath = os.path.join(job.fileStore.getLocalTempDir(), "childTemp.txt")
    job.fileStore.readGlobalFile(testID2, childPath)
    with open(childPath) as local:
        assert local.read() == streamingFileStoreString

    # Deleting the local copy of testID2 is expected to succeed ...
    job.fileStore.deleteLocalFile(testID2)

    # ... while testID1 is expected to have no local copy, so deleting it must fail
    # with ENOENT; any other OSError propagates.
    try:
        job.fileStore.deleteLocalFile(testID1)
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise
    else:
        raise RuntimeError("Deleting a non-existant file did not throw an exception")

    for fileID in (testID1, testID2):
        job.fileStore.deleteGlobalFile(fileID)