Source code for toil.test.src.autoDeploymentTest

import logging
import subprocess
import time
from contextlib import contextmanager

from toil.exceptions import FailedJobsException
from toil.lib.iterables import concat
from toil.test import (ApplianceTestSupport,
                       needs_local_appliance,
                       needs_mesos,
                       slow)
from toil.version import exactPython

logger = logging.getLogger(__name__)


@needs_mesos
@needs_local_appliance
@slow
class AutoDeploymentTest(ApplianceTestSupport):
    """
    Tests various auto-deployment scenarios. Using the appliance, i.e. a docker container,
    for these tests allows for running worker processes on the same node as the leader
    process while keeping their file systems separate from each other and the leader
    process. Separate file systems are crucial to prove that auto-deployment does its job.
    """
    def setUp(self):
        logging.basicConfig(level=logging.INFO)
        super().setUp()
    @contextmanager
    def _venvApplianceCluster(self):
        """
        Creates an appliance cluster with a virtualenv at './venv' on the leader and a
        temporary directory on the host mounted at /data in the leader and worker
        containers.
        """
        dataDirPath = self._createTempDir(purpose='data')
        with self._applianceCluster(mounts={dataDirPath: '/data'}) as (leader, worker):
            leader.runOnAppliance('virtualenv',
                                  '--system-site-packages',
                                  '--never-download',  # prevent silent upgrades to pip etc.
                                  '--python', exactPython,
                                  'venv')
            leader.runOnAppliance('venv/bin/pip', 'list')  # For diagnostic purposes
            yield leader, worker

    # TODO: Are we sure the python in the appliance we are testing is the same
    #  as the one we are testing from? If not, how can we get the version it is?

    sitePackages = f'venv/lib/{exactPython}/site-packages'
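    # Every test below deploys a user module into the virtualenv above and then runs it
    # on the leader with the same basic command shape. The helper below is a hypothetical
    # sketch added for illustration only; the original tests spell the command out inline
    # rather than calling this.
    def _runDeployedModule(self, leader, packagePath, *extraToilArgs):
        # venv/bin/python -m <package> <Toil args> /data/jobstore, against the Mesos
        # master that the appliance runs on the standard port 5050
        command = concat('venv/bin/python', '-m', packagePath,
                         '--logDebug',
                         '--batchSystem=mesos',
                         '--mesosEndpoint=localhost:5050',
                         '--defaultMemory=10M',
                         extraToilArgs,  # flattened into the command by concat()
                         '/data/jobstore')
        leader.runOnAppliance(*command)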
    def testRestart(self):
        """
        Test whether auto-deployment works on restart.
        """
        with self._venvApplianceCluster() as (leader, worker):

            def userScript():
                from toil.common import Toil
                from toil.job import Job

                # noinspection PyUnusedLocal
                def job(job, disk='10M', cores=1, memory='10M'):
                    assert False

                if __name__ == '__main__':
                    options = Job.Runner.getDefaultArgumentParser().parse_args()
                    with Toil(options) as toil:
                        if toil.config.restart:
                            toil.restart()
                        else:
                            toil.start(Job.wrapJobFn(job))

            userScript = self._getScriptSource(userScript)

            leader.deployScript(path=self.sitePackages,
                                packagePath='foo.bar',
                                script=userScript)

            pythonArgs = ['venv/bin/python', '-m', 'foo.bar']
            toilArgs = ['--logDebug',
                        '--batchSystem=mesos',
                        '--mesosEndpoint=localhost:5050',
                        '--defaultMemory=10M',
                        '/data/jobstore']
            command = concat(pythonArgs, toilArgs)
            self.assertRaises(subprocess.CalledProcessError,
                              leader.runOnAppliance, *command)

            # Deploy an updated version of the script ...
            userScript = userScript.replace('assert False', 'assert True')
            leader.deployScript(path=self.sitePackages,
                                packagePath='foo.bar',
                                script=userScript)
            # ... and restart Toil.
            command = concat(pythonArgs, '--restart', toilArgs)
            leader.runOnAppliance(*command)
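    # For reference: toil.lib.iterables.concat flattens one level of nesting while
    # passing strings through unchanged, which is what lets testRestart splice the
    # '--restart' flag between the two argument lists. A minimal sketch of the behavior
    # assumed above:
    #
    #   list(concat(['a', 'b'], 'c', ['d']))  ==  ['a', 'b', 'c', 'd']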
    def testSplitRootPackages(self):
        """
        Test whether auto-deployment works with a virtualenv in which jobs are defined
        in completely separate branches of the package hierarchy. Initially,
        auto-deployment did deploy the entire virtualenv but jobs could only be defined
        in one branch of the package hierarchy. We define a branch as the maximum set of
        fully qualified package paths that share the same first component. IOW, a.b and
        a.c are in the same branch, while a.b and d.c are not.
        """
        with self._venvApplianceCluster() as (leader, worker):

            # Deploy the library module with job definitions
            def libraryModule():
                # noinspection PyUnusedLocal
                def libraryJob(job):
                    open('/data/foo.txt', 'w').close()

            leader.deployScript(path=self.sitePackages,
                                packagePath='toil_lib.foo',
                                script=libraryModule)

            # Deploy the user script
            def userScript():
                # noinspection PyUnresolvedReferences
                from toil_lib.foo import libraryJob

                from toil.common import Toil
                from toil.job import Job

                # noinspection PyUnusedLocal
                def job(job, disk='10M', cores=1, memory='10M'):
                    # Double the requirements to prevent chaining as chaining might
                    # hide problems in auto-deployment code.
                    job.addChildJobFn(libraryJob, disk='20M', cores=cores, memory=memory)

                if __name__ == '__main__':
                    options = Job.Runner.getDefaultArgumentParser().parse_args()
                    with Toil(options) as toil:
                        if toil.config.restart:
                            toil.restart()
                        else:
                            toil.start(Job.wrapJobFn(job))

            leader.deployScript(path=self.sitePackages,
                                packagePath='toil_script.bar',
                                script=userScript)

            # Assert that the output file isn't there
            worker.runOnAppliance('test', '!', '-f', '/data/foo.txt')
            # Just being paranoid
            self.assertRaises(subprocess.CalledProcessError,
                              worker.runOnAppliance,
                              'test', '-f', '/data/foo.txt')
            leader.runOnAppliance('venv/bin/python',
                                  '-m', 'toil_script.bar',
                                  '--logDebug',
                                  '--batchSystem=mesos',
                                  '--mesosEndpoint=localhost:5050',
                                  '--defaultMemory=10M',
                                  '/data/jobstore')
            # Assert that the output file is there
            worker.runOnAppliance('test', '-f', '/data/foo.txt')
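    # After the two deployScript() calls in testSplitRootPackages, the leader's
    # virtualenv holds two independent top-level packages, i.e. two "branches" in the
    # docstring's sense (paths shown for illustration; the lib directory depends on
    # exactPython):
    #
    #   venv/lib/pythonX.Y/site-packages/toil_lib/foo.py     <- libraryJob lives here
    #   venv/lib/pythonX.Y/site-packages/toil_script/bar.py  <- the user script
    #
    # Auto-deployment has to ship both branches to the workers, because the user script
    # imports across the branch boundary ('from toil_lib.foo import libraryJob').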
    def testUserTypesInJobFunctionArgs(self):
        """
        Test encapsulated, function-wrapping jobs where the function arguments reference
        user-defined types.

        Mainly written to cover https://github.com/BD2KGenomics/toil/issues/1259 but
        then also revealed https://github.com/BD2KGenomics/toil/issues/1278.
        """
        with self._venvApplianceCluster() as (leader, worker):

            def userScript():
                from toil.common import Toil
                from toil.job import Job

                # A user-defined type, i.e. a type defined in the user script
                class X:
                    pass

                # noinspection PyUnusedLocal
                def job(job, x, disk='10M', cores=1, memory='10M'):
                    return x

                if __name__ == '__main__':
                    options = Job.Runner.getDefaultArgumentParser().parse_args()
                    x = X()
                    with Toil(options) as toil:
                        r = toil.start(Job.wrapJobFn(job, x).encapsulate())
                        # Assert that the return value is of type X, but not X from the
                        # __main__ module but X from foo.bar, the canonical name for the
                        # user module. The translation from __main__ to foo.bar is a
                        # side effect of auto-deployment.
                        assert r.__class__ is not X
                        import foo.bar
                        assert r.__class__ is foo.bar.X
                        # Assert that a copy was made. This is a side effect of
                        # pickling/unpickling.
                        assert x is not r

            userScript = self._getScriptSource(userScript)

            leader.deployScript(path=self.sitePackages,
                                packagePath='foo.bar',
                                script=userScript)

            leader.runOnAppliance('venv/bin/python',
                                  '-m', 'foo.bar',
                                  '--logDebug',
                                  '--batchSystem=mesos',
                                  '--mesosEndpoint=localhost:5050',
                                  '--defaultMemory=10M',
                                  '--defaultDisk=10M',
                                  '/data/jobstore')
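    # The identity asserts in testUserTypesInJobFunctionArgs hinge on how
    # auto-deployment renames the user module: on the leader the script runs as
    # __main__, but workers re-import it under its canonical name. A sketch of the
    # observable effect (illustrative, not executable test code):
    #
    #   x = X()                # created in the script run as 'python -m foo.bar'
    #   type(x).__module__     # '__main__'
    #   r = toil.start(...)    # x is pickled to a worker; the result travels back
    #   type(r).__module__     # 'foo.bar', hence r.__class__ is foo.bar.X, not X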
    def testDeferralWithConcurrentEncapsulation(self):
        """
        Ensure that the following DAG succeeds::

                          ┌───────────┐
                          │ Root (W1) │
                          └───────────┘
                                │
                     ┌──────────┴─────────┐
                     ▼                    ▼
            ┌────────────────┐ ┌────────────────────┐
            │ Deferring (W2) │ │ Encapsulating (W3) │═══════════════╗
            └────────────────┘ └────────────────────┘               ║
                                          │                         ║
                                          ▼                         ▼
                              ┌───────────────────┐        ┌────────────────┐
                              │ Encapsulated (W3) │        │ Follow-on (W6) │
                              └───────────────────┘        └────────────────┘
                                          │                         │
                                  ┌───────┴────────┐                │
                                  ▼                ▼                ▼
                          ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
                          │ Dummy 1 (W4) │ │ Dummy 2 (W5) │ │  Last (W6)   │
                          └──────────────┘ └──────────────┘ └──────────────┘

        The Wn numbers denote the worker processes that a particular job is run in.
        `Deferring` adds a deferred function and then runs for a long time. The deferred
        function will be present in the cache state for the duration of `Deferring`.
        `Follow-on` is the generic Job instance that's added by encapsulating a job. It
        runs on the same worker node but in a separate worker process, as the first job
        in that worker. Because …

        1) it is the first job in its worker process (the user script has not been made
           available on the sys.path by a previous job in that worker) and

        2) it shares the cache state with the `Deferring` job and

        3) it is an instance of Job (and so does not introduce the user script to
           sys.path itself),

        … it might cause problems with deserializing a deferred function defined in the
        user script.

        `Encapsulated` has two children to ensure that `Follow-on` is run in a separate
        worker.
        """
        with self._venvApplianceCluster() as (leader, worker):

            def userScript():
                # `time` is used by the job functions below; the deployed module must
                # import it itself since it runs standalone on the workers.
                import time

                from toil.common import Toil
                from toil.job import Job

                def root(rootJob):
                    def nullFile():
                        return rootJob.fileStore.jobStore.import_file('file:///dev/null')

                    startFile = nullFile()
                    endFile = nullFile()
                    rootJob.addChildJobFn(deferring, startFile, endFile)
                    encapsulatedJob = Job.wrapJobFn(encapsulated, startFile)
                    encapsulatedJob.addChildFn(dummy)
                    encapsulatedJob.addChildFn(dummy)
                    encapsulatingJob = encapsulatedJob.encapsulate()
                    rootJob.addChild(encapsulatingJob)
                    encapsulatingJob.addChildJobFn(last, endFile)

                def dummy():
                    pass

                def deferred():
                    pass

                # noinspection PyUnusedLocal
                def deferring(job, startFile, endFile):
                    job.defer(deferred)
                    job.fileStore.jobStore.delete_file(startFile)
                    timeout = time.time() + 10
                    while job.fileStore.jobStore.file_exists(endFile):
                        assert time.time() < timeout
                        time.sleep(1)

                def encapsulated(job, startFile):
                    timeout = time.time() + 10
                    while job.fileStore.jobStore.file_exists(startFile):
                        assert time.time() < timeout
                        time.sleep(1)

                def last(job, endFile):
                    job.fileStore.jobStore.delete_file(endFile)

                if __name__ == '__main__':
                    options = Job.Runner.getDefaultArgumentParser().parse_args()
                    with Toil(options) as toil:
                        rootJob = Job.wrapJobFn(root)
                        toil.start(rootJob)

            userScript = self._getScriptSource(userScript)

            leader.deployScript(path=self.sitePackages,
                                packagePath='foo.bar',
                                script=userScript)

            leader.runOnAppliance('venv/bin/python',
                                  '-m', 'foo.bar',
                                  '--logDebug',
                                  '--batchSystem=mesos',
                                  '--mesosEndpoint=localhost:5050',
                                  '--retryCount=0',
                                  '--defaultMemory=10M',
                                  '--defaultDisk=10M',
                                  '/data/jobstore')
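    # Job.defer() is the mechanism this test exercises: it registers a function, with
    # arguments, to be run by the worker when the job's process ends, normally or not.
    # The deferred function and its arguments are serialized into the node's cache
    # state, which is why another worker process on the same node must be able to
    # deserialize them. A minimal usage sketch with hypothetical resource helpers:
    #
    #   def jobFn(job):
    #       handle = acquireResource()            # hypothetical
    #       job.defer(releaseResource, handle)    # runs even if jobFn crashes
    #       ...                                   # do work that might fail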
    def testDeferralWithFailureAndEncapsulation(self):
        """
        Ensure that the following DAG succeeds::

                          ┌───────────┐
                          │ Root (W1) │
                          └───────────┘
                                │
                     ┌──────────┴─────────┐
                     ▼                    ▼
            ┌────────────────┐ ┌────────────────────┐
            │ Deferring (W2) │ │ Encapsulating (W3) │═══════════════════════╗
            └────────────────┘ └────────────────────┘                       ║
                                          │                                 ║
                                          ▼                                 ▼
                              ┌───────────────────┐                ┌────────────────┐
                              │ Encapsulated (W3) │════════════╗   │ Follow-on (W7) │
                              └───────────────────┘            ║   └────────────────┘
                                          │                    ║
                                   ┌──────┴──────┐             ║
                                   ▼             ▼             ▼
                             ┌────────────┐┌────────────┐ ┌──────────────┐
                             │ Dummy (W4) ││ Dummy (W5) │ │ Trigger (W6) │
                             └────────────┘└────────────┘ └──────────────┘

        `Trigger` causes `Deferring` to crash. `Follow-on` runs next, detects
        `Deferring`'s left-overs and runs the deferred function. `Follow-on` is an
        instance of `Job` and the first job in its worker process. This test ensures
        that despite these circumstances, the user script is loaded before the deferred
        functions defined in it are being run.

        `Encapsulated` has two children to ensure that `Follow-on` is run in a new
        worker. That's the only way to guarantee that the user script has not been
        loaded yet, which would cause the test to succeed coincidentally. We want to
        test that auto-deploying and loading of the user script are done properly
        *before* deferred functions are being run and before any jobs have been
        executed by that worker.
        """
        with self._venvApplianceCluster() as (leader, worker):

            def userScript():
                import os
                import time

                from toil.common import Toil
                # Needed here as well as at module scope: this script runs standalone
                # on the leader and catches the exception itself.
                from toil.exceptions import FailedJobsException
                from toil.job import Job

                TIMEOUT = 10

                def root(rootJob):
                    def nullFile():
                        return rootJob.fileStore.jobStore.import_file('file:///dev/null')

                    startFile = nullFile()
                    endFile = nullFile()
                    rootJob.addChildJobFn(deferring, startFile, endFile)
                    encapsulatedJob = Job.wrapJobFn(encapsulated, startFile)
                    encapsulatedJob.addChildFn(dummy)
                    encapsulatedJob.addChildFn(dummy)
                    encapsulatedJob.addFollowOnJobFn(trigger, endFile)
                    encapsulatingJob = encapsulatedJob.encapsulate()
                    rootJob.addChild(encapsulatingJob)

                def dummy():
                    pass

                def deferredFile(config):
                    """
                    Return the path to a file at the root of the job store, exploiting
                    the fact that the job store is shared between leader and worker
                    container.
                    """
                    prefix = 'file:'
                    locator = config.jobStore
                    assert locator.startswith(prefix)
                    return os.path.join(locator[len(prefix):], 'testDeferredFile')

                def deferred(deferredFilePath):
                    """
                    The deferred function that is supposed to run.
                    """
                    os.unlink(deferredFilePath)

                # noinspection PyUnusedLocal
                def deferring(job, startFile, endFile):
                    """
                    A job that adds the deferred function and then crashes once the
                    `trigger` job tells it to.
                    """
                    job.defer(deferred, deferredFile(job._config))
                    jobStore = job.fileStore.jobStore
                    jobStore.delete_file(startFile)
                    with jobStore.update_file_stream(endFile) as fH:
                        # the job store file stream is binary
                        fH.write(str(os.getpid()).encode())
                    timeout = time.time() + TIMEOUT
                    while jobStore.file_exists(endFile):
                        assert time.time() < timeout
                        time.sleep(1)
                    os.kill(os.getpid(), 9)

                def encapsulated(job, startFile):
                    """
                    A job that waits until the `deferring` job is running and waiting
                    to be crashed.
                    """
                    timeout = time.time() + TIMEOUT
                    while job.fileStore.jobStore.file_exists(startFile):
                        assert time.time() < timeout
                        time.sleep(1)

                def trigger(job, endFile):
                    """
                    A job that determines the PID of the worker running the `deferring`
                    job, tells the `deferring` job to crash and then waits for the
                    corresponding worker process to end. By waiting we can be sure that
                    the `follow-on` job finds the left-overs of the `deferring` job.
                    """
                    import errno
                    jobStore = job.fileStore.jobStore
                    with jobStore.read_file_stream(endFile) as fH:
                        pid = int(fH.read())
                    os.kill(pid, 0)
                    jobStore.delete_file(endFile)
                    timeout = time.time() + TIMEOUT
                    while True:
                        try:
                            os.kill(pid, 0)
                        except OSError as e:
                            if e.errno == errno.ESRCH:
                                break
                            else:
                                raise
                        else:
                            assert time.time() < timeout
                            time.sleep(1)

                def tryUnlink(deferredFilePath):
                    try:
                        os.unlink(deferredFilePath)
                    except OSError as e:
                        if e.errno == errno.ENOENT:
                            pass
                        else:
                            raise

                if __name__ == '__main__':
                    import errno
                    options = Job.Runner.getDefaultArgumentParser().parse_args()
                    with Toil(options) as toil:
                        deferredFilePath = deferredFile(toil.config)
                        open(deferredFilePath, 'w').close()
                        try:
                            assert os.path.exists(deferredFilePath)
                            try:
                                toil.start(Job.wrapJobFn(root))
                            except FailedJobsException as e:
                                assert e.numberOfFailedJobs == 2  # `root` and `deferring`
                                assert not os.path.exists(deferredFilePath), \
                                    'Apparently, the deferred function did not run.'
                            else:
                                assert False, 'Workflow should not have succeeded.'
                        finally:
                            tryUnlink(deferredFilePath)

            userScript = self._getScriptSource(userScript)

            leader.deployScript(path=self.sitePackages,
                                packagePath='foo.bar',
                                script=userScript)

            leader.runOnAppliance('venv/bin/python',
                                  '-m', 'foo.bar',
                                  '--logDebug',
                                  '--batchSystem=mesos',
                                  '--mesosEndpoint=localhost:5050',
                                  '--retryCount=0',
                                  '--defaultMemory=10M',
                                  '--defaultDisk=10M',
                                  '/data/jobstore')