# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import logging
import os
import random
from toil.common import Toil
from toil.fileStores import FileID
from toil.job import Job
from toil.test import ToilTest, slow
logger = logging.getLogger(__name__)
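# Each random test string is keyed by its first PREFIX_LENGTH characters, which
# are later checked against the content read back from the file store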
PREFIX_LENGTH = 200
# TODO: This test is ancient and while similar tests exist in `fileStoreTest.py`, none of them look
# at the contents of read files and thus we will let this test remain as-is.
class JobFileStoreTest(ToilTest):
"""
Tests testing the methods defined in :class:toil.fileStores.abstractFileStore.AbstractFileStore.
"""
def testCachingFileStore(self):
options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
with Toil(options) as workflow:
workflow.start(Job.wrapJobFn(simpleFileStoreJob))
def testNonCachingFileStore(self):
options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
options.caching = False
with Toil(options) as workflow:
workflow.start(Job.wrapJobFn(simpleFileStoreJob))
def _testJobFileStore(
self, retryCount=0, badWorker=0.0, stringNo=1, stringLength=1000000, testNo=2
):
"""
Creates a chain of jobs, each reading and writing files using the
toil.fileStores.abstractFileStore.AbstractFileStore interface. Verifies the files written are always what we
expect.
"""
for test in range(testNo):
            # Build a dict of random strings, keyed by the first PREFIX_LENGTH
            # characters of each string
def randomString():
chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
s = "".join([random.choice(chars) for i in range(stringLength)])
return s[:PREFIX_LENGTH], s
            # stringNo strings of stringLength characters each, keyed by prefix
            testStrings = dict(randomString() for _ in range(stringNo))
options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
options.logLevel = "DEBUG"
options.retryCount = retryCount
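            # badWorker randomly kills that fraction of workers (with SIGKILL),
            # within badWorkerFailInterval seconds of a worker starting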
options.badWorker = badWorker
options.badWorkerFailInterval = 1.0
chainLength = 10
            # Run the workflow; a failure will raise an exception
Job.Runner.startToil(
Job.wrapJobFn(fileTestJob, [], testStrings, chainLength), options
)
def testJobFileStore(self):
"""
Tests case that about half the files are cached
"""
self._testJobFileStore(
retryCount=0, badWorker=0.0, stringNo=5, stringLength=1000000
)
@slow
def testJobFileStoreWithBadWorker(self):
"""
Tests case that about half the files are cached and the worker is randomly
failing.
"""
self._testJobFileStore(
retryCount=100, badWorker=0.5, stringNo=5, stringLength=1000000
)
def fileTestJob(job, inputFileStoreIDs, testStrings, chainLength):
"""
Test job exercises toil.fileStores.abstractFileStore.AbstractFileStore functions
"""
outputFileStoreIds = [] # Strings passed to the next job in the chain
# Load the input jobStoreFileIDs and check that they map to the
# same set of random input strings, exercising the different functions in the fileStore interface
for fileStoreID in inputFileStoreIDs:
if random.random() > 0.5:
# Read the file for the fileStoreID, randomly picking a way to invoke readGlobalFile
if random.random() > 0.5:
local_path = (
job.fileStore.getLocalTempFileName()
if random.random() > 0.5
else None
)
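                # Randomly decide whether readGlobalFile may keep a cached copy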
cache = random.random() > 0.5
tempFile = job.fileStore.readGlobalFile(
fileStoreID, local_path, cache=cache
)
with open(tempFile) as fH:
string = fH.readline()
                logger.info(
"Downloaded %s to local path %s with cache %s and got %s with %d letters",
fileStoreID,
local_path,
cache,
tempFile,
len(string),
)
else:
                # Stream the file contents instead of reading a local copy
                with job.fileStore.readGlobalFileStream(fileStoreID, encoding="utf-8") as fH:
string = fH.readline()
logging.info("Streamed %s and got %d letters", fileStoreID, len(string))
# Check the string we get back is what we expect
assert (
string[:PREFIX_LENGTH] in testStrings
), f"Could not find string: {string[:PREFIX_LENGTH]}"
assert (
testStrings[string[:PREFIX_LENGTH]] == string
), f"Mismatch in string: {string[:PREFIX_LENGTH]}"
# This allows the file to be passed to the next job
outputFileStoreIds.append(fileStoreID)
        else:
            # Exercise deletion of the global file
            job.fileStore.deleteGlobalFile(fileStoreID)
            logger.info("Deleted %s", fileStoreID)
    # Write new files until we have as many output file IDs as test strings,
    # exercising the different ways of writing files to the file store
while len(outputFileStoreIds) < len(testStrings):
# Pick a string and write it into a file
testString = random.choice(list(testStrings.values()))
if random.random() > 0.5:
            # Write the string to a local temporary file
tempFile = (
job.fileStore.getLocalTempFile()
if random.random() > 0.5
else os.path.join(job.fileStore.getLocalTempDir(), "temp.txt")
)
with open(tempFile, "w") as fH:
fH.write(testString)
            # Write the local file into the global file store
fileStoreID = job.fileStore.writeGlobalFile(tempFile)
# Make sure it returned a valid and correct FileID with the right size
assert isinstance(fileStoreID, FileID)
assert fileStoreID.size == len(testString.encode("utf-8"))
outputFileStoreIds.append(fileStoreID)
else:
# Use the writeGlobalFileStream method to write the file
with job.fileStore.writeGlobalFileStream() as (fH, fileStoreID):
fH.write(testString.encode("utf-8"))
outputFileStoreIds.append(fileStoreID)
# Make sure it returned a valid and correct FileID with the right size
assert isinstance(fileStoreID, FileID)
assert fileStoreID.size == len(testString.encode("utf-8"))
if chainLength > 0:
# Make a child that will read these files and check it gets the same results
job.addChildJobFn(fileTestJob, outputFileStoreIds, testStrings, chainLength - 1)
fileStoreString = "Testing writeGlobalFile"
streamingFileStoreString = "Testing writeGlobalFileStream"
def simpleFileStoreJob(job):
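    """
    Parent job: writes one file with writeGlobalFile and another with
    writeGlobalFileStream, then chains fileStoreChild to read them back.
    """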
localFilePath = os.path.join(job.fileStore.getLocalTempDir(), "parentTemp.txt")
with open(localFilePath, "w") as f:
f.write(fileStoreString)
testID1 = job.fileStore.writeGlobalFile(localFilePath)
testID2 = None
with job.fileStore.writeGlobalFileStream() as (f, fileID):
f.write(streamingFileStoreString.encode("utf-8"))
testID2 = fileID
job.addChildJobFn(fileStoreChild, testID1, testID2)
def fileStoreChild(job, testID1, testID2):
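    """
    Child job: reads back the files written by simpleFileStoreJob, checks
    their contents, and exercises local and global deletion.
    """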
with job.fileStore.readGlobalFileStream(testID1) as f:
assert f.read().decode("utf-8") == fileStoreString
localFilePath = os.path.join(job.fileStore.getLocalTempDir(), "childTemp.txt")
job.fileStore.readGlobalFile(testID2, localFilePath)
with open(localFilePath) as f:
assert f.read() == streamingFileStoreString
job.fileStore.deleteLocalFile(testID2)
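    # testID1 was only read via a stream in this job, so no local copy exists;
    # deleting it locally should fail with ENOENT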
try:
job.fileStore.deleteLocalFile(testID1)
except OSError as e:
if e.errno == errno.ENOENT: # indicates that the file was not found
pass
else:
raise
else:
raise RuntimeError("Deleting a non-existant file did not throw an exception")
for fileID in (testID1, testID2):
job.fileStore.deleteGlobalFile(fileID)