# Source code for toil.test.src.resumabilityTest
# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from toil.exceptions import FailedJobsException
from toil.job import Job
from toil.jobStores.abstractJobStore import NoSuchFileException
from toil.test import ToilTest, slow
class ResumabilityTest(ToilTest):
    """
    Regression tests for resuming a failed workflow.

    https://github.com/BD2KGenomics/toil/issues/808
    """

    @slow
    def test(self):
        """
        Tests that a toil workflow that fails once can be resumed without a
        NoSuchJobException.
        """
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.logLevel = "INFO"
        # No retries: the first run must fail so we can exercise restart.
        options.retryCount = 0

        root = Job.wrapJobFn(parent)
        with self.assertRaises(FailedJobsException):
            # This one is intended to fail.
            Job.Runner.startToil(root, options)

        # Resume the workflow. Unfortunately, we have to check for
        # this bug using the logging output, since although the
        # NoSuchJobException causes the worker to fail, the batch
        # system code notices that the job has been deleted despite
        # the failure and avoids the failure.
        options.restart = True
        tempDir = self._createTempDir()
        options.logFile = os.path.join(tempDir, "log.txt")
        Job.Runner.startToil(root, options)
        with open(options.logFile) as f:
            logString = f.read()
            # We are looking for e.g. "Batch system is reporting that
            # the jobGraph with batch system ID: 1 and jobGraph
            # store ID: n/t/jobwbijqL failed with exit value 1"
            # assertNotIn (rather than assertTrue(... not in ...)) shows
            # the offending log text when the check fails.
            self.assertNotIn("failed with exit value", logString)

    def test_chaining(self):
        """
        Tests that a job which is chained to and fails can resume and succeed.
        """
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.logLevel = "DEBUG"
        # No retries: the chained-to job must fail on the first run.
        options.retryCount = 0
        tempDir = self._createTempDir()
        options.logFile = os.path.join(tempDir, "log.txt")

        root = Job.wrapJobFn(chaining_parent)
        with self.assertRaises(FailedJobsException):
            # This one is intended to fail.
            Job.Runner.startToil(root, options)

        with open(options.logFile) as f:
            log_content = f.read()
            # Make sure we actually did do chaining. Use a unittest
            # assertion (not a bare assert, which is stripped under -O).
            self.assertIn("Chaining from", log_content)

        # Because of the chaining, the problem we are looking for is the job
        # with the root ID not being able to load the body of a job with a
        # different ID. That doesn't look like a job deleted despite failure.
        options.restart = True
        Job.Runner.startToil(root, options)
def parent(job):
    """
    Attach a handful of no-op children to this job, plus a follow-on that
    deliberately fails once and must be restarted.
    """
    num_dummy_children = 5
    for _child_index in range(num_dummy_children):
        job.addChildJobFn(goodChild)
    job.addFollowOnJobFn(badChild)
def chaining_parent(job):
    """
    Schedule the deliberately failing job as this job's only follow-on,
    so Toil chains straight into it.
    """
    job.addFollowOnJobFn(badChild)
def goodChild(job):
    """
    A deliberate no-op child job; exists only so the parent has
    successful children to schedule.
    """
    return None
def badChild(job):
    """
    Fails the first time it's run, succeeds the second time.

    A shared file in the job store acts as a persistent "already ran"
    marker: if the marker can be read, the job succeeds quietly; if it
    is missing, the job writes it and then fails on purpose.
    """
    job_store = job.fileStore.jobStore
    try:
        with job_store.read_shared_file_stream("alreadyRun") as marker:
            marker.read()
    except NoSuchFileException as ex:
        # First run: leave the marker behind for the retry, then fail.
        with job_store.write_shared_file_stream("alreadyRun", encrypted=False) as marker:
            marker.write(b"failed once\n")
        raise RuntimeError(f"this is an expected error: {str(ex)}")