Source code for toil.test.src.restartDAGTest

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import logging
import os
import shutil
import signal

from toil.common import Toil
from toil.exceptions import FailedJobsException
from toil.job import Job
from toil.test import ToilTest, slow

logger = logging.getLogger(__name__)


[docs] class RestartDAGTest(ToilTest): """ Tests that restarted job DAGs don't run children of jobs that failed in the first run till the parent completes successfully in the restart. """
[docs] def setUp(self): super().setUp() self.tempDir = self._createTempDir(purpose="tempDir") self.testJobStore = self._getTestJobStorePath()
[docs] def tearDown(self): super().tearDown() shutil.rmtree(self.testJobStore)
[docs] @slow def testRestartedWorkflowSchedulesCorrectJobsOnFailedParent(self): self._testRestartedWorkflowSchedulesCorrectJobs("raise")
[docs] @slow def testRestartedWorkflowSchedulesCorrectJobsOnKilledParent(self): self._testRestartedWorkflowSchedulesCorrectJobs("kill")
def _testRestartedWorkflowSchedulesCorrectJobs(self, failType): """ Creates a diamond DAG /->passingParent-\ root |-->child \\->failingParent--/ where root and passingParent are guaranteed to pass, while failingParent will fail. child should not run on start or restart and we assert that by ensuring that a file create iff child is run is not present on the system. :param str failType: Does failingParent fail on an assertionError, or is it killed. """ # Specify options options = Job.Runner.getDefaultOptions(self.testJobStore) options.logLevel = "DEBUG" options.retryCount = 0 options.clean = "never" parentFile = os.path.join(self.tempDir, "parent") childFile = os.path.join(self.tempDir, "child") # Make the first job root = Job.wrapJobFn(passingFn) passingParent = Job.wrapJobFn(passingFn) failingParent = Job.wrapJobFn(failingFn, failType=failType, fileName=parentFile) child = Job.wrapJobFn(passingFn, fileName=childFile) # define the DAG root.addChild(passingParent) root.addChild(failingParent) passingParent.addChild(child) failingParent.addChild(child) failReasons = [] assert not os.path.exists(childFile) # Run the test for runMode in "start", "restart": self.errorRaised = None try: with Toil(options) as toil: if runMode == "start": toil.start(root) else: toil.restart() except Exception as e: logger.exception(e) self.errorRaised = e finally: # The processing of an AssertionError and FailedJobsException is the same so we do # it together in this finally clause. if self.errorRaised is not None: if not os.path.exists(parentFile): failReasons.append( 'The failing parent file did not exist on toil "%s".' % runMode ) if os.path.exists(childFile): failReasons.append( "The child file existed. i.e. the child was run on " 'toil "%s".' % runMode ) if isinstance(self.errorRaised, FailedJobsException): if self.errorRaised.numberOfFailedJobs != 3: failReasons.append( 'FailedJobsException was raised on toil "%s" but ' "the number of failed jobs (%s) was not 3." % (runMode, self.errorRaised.numberOfFailedJobs) ) elif isinstance(self.errorRaised, AssertionError): failReasons.append( "Toil raised an AssertionError instead of a " 'FailedJobsException on toil "%s".' % runMode ) else: failReasons.append("Toil raised error: %s" % self.errorRaised) self.errorRaised = None options.restart = True else: self.fail('No errors were raised on toil "%s".' % runMode) if failReasons: self.fail( "Test failed for ({}) reasons:\n\t{}".format( len(failReasons), "\n\t".join(failReasons) ) )
[docs] def passingFn(job, fileName=None): """ This function is guaranteed to pass as it does nothing out of the ordinary. If fileName is provided, it will be created. :param str fileName: The name of a file that must be created if provided. """ if fileName is not None: # Emulates system touch. open(fileName, "w").close()
[docs] def failingFn(job, failType, fileName): """ This function is guaranteed to fail via a raised assertion, or an os.kill :param job: Job :param str failType: 'raise' or 'kill :param str fileName: The name of a file that must be created. """ assert failType in ("raise", "kill") # Use that function to avoid code redundancy passingFn(job, fileName) if failType == "raise": assert False else: os.kill(os.getpid(), signal.SIGKILL)