Source code for toil.test.src.deferredFunctionTest

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import signal
import time
from abc import ABCMeta
from uuid import uuid4

import psutil

from toil.exceptions import FailedJobsException
from toil.job import Job
from toil.lib.threading import cpu_count
from toil.test import ToilTest, slow

logger = logging.getLogger(__name__)


class DeferredFunctionTest(ToilTest, metaclass=ABCMeta):
    """Test the deferred function system."""

    # This determines what job store type to use.
    jobStoreType = 'file'

    def _getTestJobStore(self):
        """
        Build a job store locator matching ``self.jobStoreType``.

        :return: a job store locator string
        :raises RuntimeError: if ``jobStoreType`` is not 'file', 'aws' or 'google'
        """
        if self.jobStoreType == 'file':
            return self._getTestJobStorePath()
        elif self.jobStoreType == 'aws':
            return f'aws:{self.awsRegion()}:cache-tests-{uuid4()}'
        elif self.jobStoreType == 'google':
            projectID = os.getenv('TOIL_GOOGLE_PROJECTID')
            return f'google:{projectID}:cache-tests-{uuid4()}'
        else:
            raise RuntimeError('Illegal job store type.')

    def setUp(self):
        """Create a fresh work directory and default workflow options for each test."""
        super().setUp()
        testDir = self._createTempDir()
        self.options = Job.Runner.getDefaultOptions(self._getTestJobStore())
        self.options.logLevel = 'INFO'
        self.options.workDir = testDir
        self.options.clean = 'always'
        self.options.logFile = os.path.join(testDir, 'logFile')

    def _createNonLocalFiles(self, firstFileContents=''):
        """
        Create two uniquely named files in a fresh temporary directory.

        :param str firstFileContents: text written to the first file; the
               default leaves it empty. The second file is always empty.
        :return: tuple of (path of first file, path of second file)
        """
        workdir = self._createTempDir(purpose='nonLocalDir')
        nonLocalFile1 = os.path.join(workdir, str(uuid4()))
        nonLocalFile2 = os.path.join(workdir, str(uuid4()))
        with open(nonLocalFile1, 'w') as handle:
            handle.write(firstFileContents)
        with open(nonLocalFile2, 'w'):
            pass
        assert os.path.exists(nonLocalFile1)
        assert os.path.exists(nonLocalFile2)
        return nonLocalFile1, nonLocalFile2

    # Tests for the various defer possibilities
    def testDeferredFunctionRunsWithMethod(self):
        """
        Refer docstring in _testDeferredFunctionRuns.
        Test with Method
        """
        self._testDeferredFunctionRuns(_writeNonLocalFilesMethod)

    def testDeferredFunctionRunsWithClassMethod(self):
        """
        Refer docstring in _testDeferredFunctionRuns.
        Test with Class Method
        """
        self._testDeferredFunctionRuns(_writeNonLocalFilesClassMethod)

    def testDeferredFunctionRunsWithLambda(self):
        """
        Refer docstring in _testDeferredFunctionRuns.
        Test with Lambda
        """
        self._testDeferredFunctionRuns(_writeNonLocalFilesLambda)

    def _testDeferredFunctionRuns(self, callableFn):
        """
        Create 2 files. Make a job that writes data to them. Register a deferred function that
        deletes the two files (one passed as an arg, and one as a kwarg) and later assert that
        the files have been deleted.

        :param function callableFn: The function to use in the test.
        :return: None
        """
        nonLocalFile1, nonLocalFile2 = self._createNonLocalFiles()
        A = Job.wrapJobFn(callableFn, files=(nonLocalFile1, nonLocalFile2))
        Job.Runner.startToil(A, self.options)
        assert not os.path.exists(nonLocalFile1)
        assert not os.path.exists(nonLocalFile2)

    @slow
    def testDeferredFunctionRunsWithFailures(self):
        """
        Create 2 non local files to use as flags. Create a job that registers a function that
        deletes one non local file. If that file exists, the job SIGKILLs itself. If it doesn't
        exist, the job registers a second deferred function to delete the second non local file
        and exits normally.

        Initially the first file exists, so the job should SIGKILL itself and neither deferred
        function will run (in fact, the second should not even be registered). On the restart,
        the first deferred function should run and the first file should not exist, but the
        second one should. We assert the presence of the second, then register the second
        deferred function and exit normally. At the end of the test, neither file should exist.

        Incidentally, this also tests for multiple registered deferred functions, and the case
        where a deferred function fails (since the first file doesn't exist on the retry).
        """
        self.options.retryCount = 1
        nonLocalFile1, nonLocalFile2 = self._createNonLocalFiles()
        A = Job.wrapJobFn(_deferredFunctionRunsWithFailuresFn,
                          files=(nonLocalFile1, nonLocalFile2))
        Job.Runner.startToil(A, self.options)
        assert not os.path.exists(nonLocalFile1)
        assert not os.path.exists(nonLocalFile2)

    @slow
    def testNewJobsCanHandleOtherJobDeaths(self):
        """
        Create 2 non-local files and then create 2 jobs. The first job registers a deferred job
        to delete the second non-local file, deletes the first non-local file and then kills
        itself. The second job waits for the first file to be deleted, then waits for the first
        job's process to die, and then spawns a child. The child of the second job only checks
        that the files are gone. However, starting it should handle the untimely demise of the
        first job and run the registered deferred function that deletes the second file.

        We assert the absence of the two files at the end of the run.
        """
        # Check to make sure we can run two jobs in parallel
        cpus = cpu_count()
        assert cpus >= 2, "Not enough CPUs to run two tasks at once"

        # There can be no retries
        self.options.retryCount = 0
        nonLocalFile1, nonLocalFile2 = self._createNonLocalFiles()
        files = [nonLocalFile1, nonLocalFile2]

        root = Job()
        # A and B here must run in parallel for this to work
        A = Job.wrapJobFn(_testNewJobsCanHandleOtherJobDeaths_A, files=files, cores=1)
        B = Job.wrapJobFn(_testNewJobsCanHandleOtherJobDeaths_B, files=files, cores=1)
        C = Job.wrapJobFn(_testNewJobsCanHandleOtherJobDeaths_C, files=files,
                          expectedResult=False, cores=1)
        root.addChild(A)
        root.addChild(B)
        B.addChild(C)
        try:
            Job.Runner.startToil(root, self.options)
        except FailedJobsException:
            # Expected: job A SIGKILLs itself and there are no retries.
            pass

        # The swallowed exception above would also hide a failure of job C,
        # so verify the outcome here as well.
        assert not os.path.exists(nonLocalFile1)
        assert not os.path.exists(nonLocalFile2)

    def testBatchSystemCleanupCanHandleWorkerDeaths(self):
        """
        Create some non-local files. Create a job that registers a deferred
        function to delete the file and then kills its worker.

        Assert that the file is missing after the pipeline fails, because
        we're using a single-machine batch system and the leader's batch
        system cleanup will find and run the deferred function.
        """
        # There can be no retries
        self.options.retryCount = 0

        # The first file has to be non-empty, or job A will sleep forever
        # waiting for its size to become non-zero.
        nonLocalFile1, nonLocalFile2 = self._createNonLocalFiles(firstFileContents='test')

        # We only use the "A" job here, and we fill in the first file, so all
        # it will do is defer deleting the second file, delete the first file,
        # and die.
        A = Job.wrapJobFn(_testNewJobsCanHandleOtherJobDeaths_A,
                          files=(nonLocalFile1, nonLocalFile2))
        try:
            Job.Runner.startToil(A, self.options)
        except FailedJobsException:
            # Expected: job A SIGKILLs itself and there are no retries.
            pass
        assert not os.path.exists(nonLocalFile1)
        assert not os.path.exists(nonLocalFile2)
def _writeRandomData(files):
    """
    Overwrite every file in ``files`` with 1 MiB of random bytes.

    :param tuple files: paths of the files to write
    :return: None
    """
    for path in files:
        with open(path, 'wb') as nonLocalFileHandle:
            nonLocalFileHandle.write(os.urandom(1 * 1024 * 1024))


def _writeNonLocalFilesMethod(job, files):
    """
    Write some data to 2 files.  Pass them to a registered deferred method.

    :param tuple files: the tuple of the two files to work with
    :return: None
    """
    _writeRandomData(files)
    job.defer(_deleteMethods._deleteFileMethod, files[0], nlf=files[1])
    return None


def _writeNonLocalFilesClassMethod(job, files):
    """
    Write some data to 2 files.  Pass them to a registered deferred class method.

    :param tuple files: the tuple of the two files to work with
    :return: None
    """
    _writeRandomData(files)
    job.defer(_deleteMethods._deleteFileClassMethod, files[0], nlf=files[1])
    return None


def _writeNonLocalFilesLambda(job, files):
    """
    Write some data to 2 files.  Pass them to a registered deferred Lambda.

    :param tuple files: the tuple of the two files to work with
    :return: None
    """
    # Deliberately a lambda: deferring a lambda is the case under test.
    lmd = lambda x, nlf: [os.remove(x), os.remove(nlf)]
    _writeRandomData(files)
    job.defer(lmd, files[0], nlf=files[1])
    return None


def _deferredFunctionRunsWithFailuresFn(job, files):
    """
    Refer testDeferredFunctionRunsWithFailures

    :param tuple files: the tuple of the two files to work with
    :return: None
    """
    job.defer(_deleteFile, files[0])
    if os.path.exists(files[0]):
        # First attempt: die before the deferred function gets a chance to run.
        os.kill(os.getpid(), signal.SIGKILL)
    else:
        assert os.path.exists(files[1])
        job.defer(_deleteFile, files[1])


def _deleteFile(nonLocalFile, nlf=None):
    """
    Delete nonLocalFile and nlf

    :param str nonLocalFile: path of the file to remove
    :param str nlf: optional path of a second file to remove
    :return: None
    """
    logger.debug("Removing file: %s", nonLocalFile)
    os.remove(nonLocalFile)
    logger.debug("Successfully removed file: %s", nonLocalFile)
    if nlf is not None:
        logger.debug("Removing file: %s", nlf)
        os.remove(nlf)
        logger.debug("Successfully removed file: %s", nlf)


def _testNewJobsCanHandleOtherJobDeaths_A(job, files):
    """
    Defer deletion of files[1], then wait for _testNewJobsCanHandleOtherJobDeaths_B to
    start up, and finally delete files[0] before sigkilling self.

    :param tuple files: the tuple of the two files to work with
    :return: None
    """

    # Write the pid to files[1] such that we can be sure that this process has died before
    # we spawn the next job that will do the cleanup.
    with open(files[1], 'w') as fileHandle:
        fileHandle.write(str(os.getpid()))
    job.defer(_deleteFile, files[1])
    logger.info("Deferred delete of %s", files[1])
    # files[0] becoming non-empty is the signal that the other side has started.
    while os.stat(files[0]).st_size == 0:
        time.sleep(0.5)
    os.remove(files[0])
    os.kill(os.getpid(), signal.SIGKILL)


def _testNewJobsCanHandleOtherJobDeaths_B(job, files):
    """
    Signal startup by writing to files[0], wait for files[0] to be removed, then wait until
    the process whose pid is stored in files[1] no longer exists.

    :param tuple files: the tuple of the two files to work with
    :return: None
    """
    # Write something to files[0] such that we can be sure that this process has started
    # before _testNewJobsCanHandleOtherJobDeaths_A kills itself.
    with open(files[0], 'w') as fileHandle:
        fileHandle.write(str(os.getpid()))
    while os.path.exists(files[0]):
        time.sleep(0.5)
    # Get the pid of _testNewJobsCanHandleOtherJobDeaths_A and wait for it to truly be dead.
    with open(files[1]) as fileHandle:
        pid = int(fileHandle.read())
    assert pid > 0
    while psutil.pid_exists(pid):
        time.sleep(0.5)
    # Now that we are convinced that _testNewJobsCanHandleOtherJobDeaths_A has died, we can
    # return and let the next job be spawned.


def _testNewJobsCanHandleOtherJobDeaths_C(job, files, expectedResult):
    """
    Asserts whether the files exist or not.

    :param Job job: Job
    :param list files: list of files to test
    :param bool expectedResult: Are we expecting the files to exist or not?
    """
    for testFile in files:
        assert os.path.exists(testFile) is expectedResult


class _deleteMethods:
    """Static-method and class-method deletion callables used as deferred functions."""

    @staticmethod
    def _deleteFileMethod(nonLocalFile, nlf=None):
        """
        Delete nonLocalFile and nlf

        :return: None
        """
        os.remove(nonLocalFile)
        if nlf is not None:
            os.remove(nlf)

    @classmethod
    def _deleteFileClassMethod(cls, nonLocalFile, nlf=None):
        """
        Delete nonLocalFile and nlf

        :return: None
        """
        os.remove(nonLocalFile)
        if nlf is not None:
            os.remove(nlf)