Source code for toil.test.src.checkpointTest

# Copyright (C) 2015-2021 Regents of the University of California
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from toil.exceptions import FailedJobsException
from toil.job import Job
from toil.jobStores.abstractJobStore import NoSuchFileException
from toil.test import ToilTest, slow

class CheckpointTest(ToilTest):
    """Tests for how checkpointed jobs are retried and restarted."""

    def testCheckpointNotRetried(self):
        """A checkpoint job should not be retried if the workflow has a retryCount of 0."""
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.retryCount = 0

        # We set the workflow to *succeed* if the checkpointed job is
        # retried despite the retry count of 0. So, if the workflow
        # fails, this test succeeds.
        # This ends up being a bit confusing, but it's the easiest way
        # to check that the job wasn't retried short of parsing the
        # log.
        with self.assertRaises(FailedJobsException):
            Job.Runner.startToil(CheckRetryCount(numFailuresBeforeSuccess=1), options)

    @slow
    def testCheckpointRetriedOnce(self):
        """A checkpoint job should be retried exactly once if the workflow has a retryCount of 1."""
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.retryCount = 1

        # This should succeed (the checkpoint should be retried once, then succeed)
        try:
            Job.Runner.startToil(CheckRetryCount(numFailuresBeforeSuccess=1), options)
        except FailedJobsException:
            # Report a test failure (not an error) with an explanatory message.
            self.fail("The checkpoint job wasn't retried enough times.")

        # This should fail (the checkpoint should be retried once, then fail again)
        with self.assertRaises(FailedJobsException):
            Job.Runner.startToil(CheckRetryCount(numFailuresBeforeSuccess=2), options)

    @slow
    def testCheckpointedRestartSucceeds(self):
        """A checkpointed job should succeed on restart of a failed run if its child job succeeds."""
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())

        # The child job should fail the first time
        with self.assertRaises(FailedJobsException):
            Job.Runner.startToil(CheckpointFailsFirstTime(), options)

        # The second time, everything should work
        options.restart = True
        try:
            Job.Runner.startToil(CheckpointFailsFirstTime(), options)
        except FailedJobsException:
            # Report a test failure (not an error) with an explanatory message.
            self.fail("Checkpointed workflow restart doesn't clean failures.")
class CheckRetryCount(Job):
    """Fail N times, succeed on the next try."""

    def __init__(self, numFailuresBeforeSuccess):
        """
        Create the job as a checkpoint.

        :param numFailuresBeforeSuccess: number of runs that should add a
            failing child before a run adds no child at all.
        """
        super().__init__(checkpoint=True)
        self.numFailuresBeforeSuccess = numFailuresBeforeSuccess

    def getNumRetries(self, fileStore):
        """
        Mark a retry in the fileStore, and return the number of retries so far.

        The count is kept in the job store's shared file "checkpointRun" as a
        UTF-8 encoded decimal string.

        :param fileStore: file store whose ``jobStore`` holds the counter.
        :return: the count read before this call incremented it (0 if the
            shared file did not exist yet).
        """
        try:
            with fileStore.jobStore.read_shared_file_stream("checkpointRun") as f:
                # Decode what the write below stored: bytes of str(count).
                timesRun = int(f.read().decode('utf-8'))
        except NoSuchFileException:
            # No counter file yet, so this is the first run.
            timesRun = 0
        with fileStore.jobStore.write_shared_file_stream("checkpointRun") as f:
            f.write(str(timesRun + 1).encode('utf-8'))
        return timesRun

    def run(self, fileStore):
        """Log the retry count and add a failing child until enough runs have failed."""
        retryCount = self.getNumRetries(fileStore)
        fileStore.logToMaster(str(retryCount))
        if retryCount < self.numFailuresBeforeSuccess:
            self.addChild(AlwaysFail())
class AlwaysFail(Job):
    """Job whose run step raises unconditionally."""

    def run(self, fileStore):
        """Raise ``RuntimeError`` on every invocation; ``fileStore`` is unused."""
        raise RuntimeError(":(")
class CheckpointFailsFirstTime(Job):
    """Checkpointed job that adds a single ``FailOnce`` child when run."""

    def __init__(self):
        # Construct the underlying Job with the checkpoint flag set.
        super().__init__(checkpoint=True)

    def run(self, fileStore):
        """Attach one ``FailOnce`` child; ``fileStore`` is unused."""
        child = FailOnce()
        self.addChild(child)
class FailOnce(Job):
    """Fail the first time the workflow is run, but succeed thereafter."""

    def run(self, fileStore):
        """Raise ``RuntimeError`` while the workflow attempt number is below 1."""
        attempt = fileStore.jobStore.config.workflowAttemptNumber
        is_first_attempt = attempt < 1
        if is_first_attempt:
            raise RuntimeError("first time around")