Source code for toil.test.src.restartDAGTest
# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import shutil
import signal
from toil.common import Toil
from toil.exceptions import FailedJobsException
from toil.job import Job
from toil.test import ToilTest, slow
# Module-level logger; the restart test uses it to record exceptions raised by Toil runs.
logger = logging.getLogger(__name__)
[docs]
class RestartDAGTest(ToilTest):
    """
    Tests that restarted job DAGs don't run children of jobs that failed in the first run till the
    parent completes successfully in the restart.
    """

    def setUp(self):
        super().setUp()
        self.tempDir = self._createTempDir(purpose='tempDir')
        self.testJobStore = self._getTestJobStorePath()

    def tearDown(self):
        super().tearDown()
        # If the test failed before the job store was ever created, an unconditional rmtree
        # would raise FileNotFoundError here and hide the original failure.
        if os.path.exists(self.testJobStore):
            shutil.rmtree(self.testJobStore)

    @slow
    def testRestartedWorkflowSchedulesCorrectJobsOnFailedParent(self):
        self._testRestartedWorkflowSchedulesCorrectJobs('raise')

    @slow
    def testRestartedWorkflowSchedulesCorrectJobsOnKilledParent(self):
        self._testRestartedWorkflowSchedulesCorrectJobs('kill')

    def _testRestartedWorkflowSchedulesCorrectJobs(self, failType):
        r"""
        Creates a diamond DAG

                /->passingParent-\
            root                  |-->child
                \->failingParent-/

        where root and passingParent are guaranteed to pass, while failingParent will fail.
        child should not run on start or restart and we assert that by ensuring that a file
        created iff child is run is not present on the system.

        :param str failType: Does failingParent fail on an AssertionError ('raise'), or is it
            killed ('kill').
        """
        # Specify options
        options = Job.Runner.getDefaultOptions(self.testJobStore)
        options.logLevel = 'DEBUG'
        options.retryCount = 0
        options.clean = "never"

        parentFile = os.path.join(self.tempDir, 'parent')
        childFile = os.path.join(self.tempDir, 'child')

        # Make the jobs
        root = Job.wrapJobFn(passingFn)
        passingParent = Job.wrapJobFn(passingFn)
        failingParent = Job.wrapJobFn(failingFn, failType=failType, fileName=parentFile)
        child = Job.wrapJobFn(passingFn, fileName=childFile)

        # Define the DAG
        root.addChild(passingParent)
        root.addChild(failingParent)
        passingParent.addChild(child)
        failingParent.addChild(child)

        failReasons = []
        assert not os.path.exists(childFile)

        # Run the test
        for runMode in ('start', 'restart'):
            if runMode == 'restart' and os.path.exists(parentFile):
                # parentFile survives from the first run; remove it so that the existence check
                # below actually proves failingParent ran again on restart.
                os.remove(parentFile)

            errorRaised = None
            try:
                with Toil(options) as toil:
                    if runMode == 'start':
                        toil.start(root)
                    else:
                        toil.restart()
            except Exception as e:
                # Catch only Exception (not BaseException) so that KeyboardInterrupt/SystemExit
                # propagate instead of being reported as a test failure.
                logger.exception(e)
                errorRaised = e

            if errorRaised is None:
                self.fail('No errors were raised on toil "%s".' % runMode)

            # An AssertionError and a FailedJobsException share the same file checks.
            if not os.path.exists(parentFile):
                failReasons.append('The failing parent file did not exist on toil "%s".'
                                   % runMode)
            if os.path.exists(childFile):
                failReasons.append('The child file existed. i.e. the child was run on '
                                   'toil "%s".' % runMode)
            if isinstance(errorRaised, FailedJobsException):
                if errorRaised.numberOfFailedJobs != 3:
                    failReasons.append('FailedJobsException was raised on toil "%s" but '
                                       'the number of failed jobs (%s) was not 3.'
                                       % (runMode, errorRaised.numberOfFailedJobs))
            elif isinstance(errorRaised, AssertionError):
                failReasons.append('Toil raised an AssertionError instead of a '
                                   'FailedJobsException on toil "%s".' % runMode)
            else:
                failReasons.append("Toil raised error: %s" % errorRaised)
            # The second iteration must resume the existing workflow.
            options.restart = True

        if failReasons:
            self.fail('Test failed for ({}) reasons:\n\t{}'.format(len(failReasons),
                                                                   '\n\t'.join(failReasons)))
[docs]
def passingFn(job, fileName=None):
    """
    A job function that always succeeds.

    When a file name is supplied, an empty file is written at that path (like ``touch``,
    except an existing file is truncated); otherwise nothing happens.

    :param job: The Toil job this function runs as (unused).
    :param str fileName: Optional path of the file to create.
    """
    if fileName is None:
        return
    # Opening for write and closing immediately leaves an empty file behind.
    with open(fileName, 'w'):
        pass
[docs]
def failingFn(job, failType, fileName):
    """
    This function is guaranteed to fail, either by raising an AssertionError or by sending
    SIGKILL to its own process, after first creating ``fileName``.

    :param job: Job
    :param str failType: 'raise' or 'kill'
    :param str fileName: The name of a file that must be created before failing.
    :raises ValueError: If failType is not 'raise' or 'kill'.
    :raises AssertionError: If failType is 'raise'.
    """
    # Use explicit raises instead of ``assert``: asserts are stripped under ``python -O``, which
    # would make this "guaranteed" failure silently succeed and invalidate the test.
    if failType not in ('raise', 'kill'):
        raise ValueError("failType must be 'raise' or 'kill', got %r" % (failType,))
    # Use that function to avoid code redundancy
    passingFn(job, fileName)
    if failType == 'raise':
        raise AssertionError("failingFn was asked to fail with failType='raise'")
    os.kill(os.getpid(), signal.SIGKILL)