Source code for toil.test.batchSystems.batchSystemTest

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fcntl
import itertools
import logging
import os
import subprocess
import sys
import tempfile
import textwrap
import time
from abc import ABCMeta, abstractmethod
from fractions import Fraction
from unittest import skipIf

from toil.batchSystems.abstractBatchSystem import (
    AbstractBatchSystem,
    BatchSystemSupport,
    InsufficientSystemResources,
)

# Don't import any batch systems here that depend on extras
# in order to import properly. Import them later, in tests
# protected by annotations.
from toil.batchSystems.mesos.test import MesosTestSupport
from toil.batchSystems.registry import (
    add_batch_system_factory,
    get_batch_system,
    get_batch_systems,
    restore_batch_system_plugin_state,
    save_batch_system_plugin_state,
)
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.common import Config, Toil
from toil.job import Job, JobDescription, Requirer
from toil.lib.retry import retry_flaky_test
from toil.lib.threading import cpu_count
from toil.test import (
    ToilTest,
    needs_aws_batch,
    needs_aws_s3,
    needs_fetchable_appliance,
    needs_gridengine,
    needs_htcondor,
    needs_kubernetes,
    needs_kubernetes_installed,
    needs_lsf,
    needs_mesos,
    needs_slurm,
    needs_torque,
    slow,
)

logger = logging.getLogger(__name__)

# How many cores should be utilized by this test. The test will fail if the
# running system doesn't have at least that many cores.
numCores = 2

preemptible = False

# Since we aren't always attaching the config to the jobs for these tests, we
# need to use fully specified requirements.
defaultRequirements = dict(
    memory=int(100e6), cores=1, disk=1000, preemptible=preemptible, accelerators=[]
)
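
# For illustration: the tests below feed these requirements to mock
# JobDescriptions roughly like this (a hypothetical sketch, not part of any
# test):
#
#   desc = JobDescription(requirements=defaultRequirements, jobName="example")
#   batch_system.issueBatchJob("sleep 1", desc)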


class BatchSystemPluginTest(ToilTest):
    """
    Class for testing batch system plugin functionality.
    """

    def setUp(self):
        # Save plugin state so our plugin doesn't stick around after the test
        # (and create duplicate options)
        self.__state = save_batch_system_plugin_state()
        super().setUp()

    def tearDown(self):
        # Restore plugin state
        restore_batch_system_plugin_state(self.__state)
        super().tearDown()

    def test_add_batch_system_factory(self):
        def test_batch_system_factory():
            # TODO: Adding the same batch system under multiple names means we
            # can't actually create Toil options, because each version tries to
            # add its arguments.
            return SingleMachineBatchSystem

        add_batch_system_factory("testBatchSystem", test_batch_system_factory)
        assert "testBatchSystem" in get_batch_systems()
        assert get_batch_system("testBatchSystem") == SingleMachineBatchSystem
class hidden:
    """
    Hide abstract base classes from unittest's test case loader.

    http://stackoverflow.com/questions/1323455/python-unit-test-with-base-and-sub-class#answer-25695512
    """
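
    # Usage pattern (mirrored by the concrete tests later in this module):
    # unittest only collects TestCase classes visible at module level, so the
    # abstract bases nested under this class are never collected directly,
    # while real tests opt in by subclassing them, e.g.:
    #
    #   class SingleMachineBatchSystemTest(hidden.AbstractBatchSystemTest):
    #       ...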
    class AbstractBatchSystemTest(ToilTest, metaclass=ABCMeta):
        """
        A base test case with generic tests that every batch system should pass.

        Cannot assume that the batch system actually executes commands on the
        local machine/filesystem.
        """

        @abstractmethod
        def createBatchSystem(self) -> AbstractBatchSystem:
            raise NotImplementedError

        def supportsWallTime(self):
            return False

        @classmethod
        def createConfig(cls):
            """
            Returns a dummy config for the batch system tests. We need a
            workflowID to be set up since we are running tests without setting
            up a jobstore. This is the class version to be used when an
            instance is not available.

            :rtype: toil.common.Config
            """
            config = Config()
            from uuid import uuid4

            config.workflowID = str(uuid4())
            config.cleanWorkDir = "always"
            return config

        def _createConfig(self):
            """
            Returns a dummy config for the batch system tests. We need a
            workflowID to be set up since we are running tests without setting
            up a jobstore.

            :rtype: toil.common.Config
            """
            return self.createConfig()

        def _mockJobDescription(self, jobStoreID=None, **kwargs):
            """
            Create a mock-up JobDescription with the given ID and other parameters.
            """
            # TODO: Use a real unittest.Mock? For now we make a real instance and just hack it up.
            desc = JobDescription(**kwargs)
            # Normally we can't pass in an ID, and the job serialization logic
            # takes care of filling it in. We set it here.
            if jobStoreID is not None:
                desc.jobStoreID = jobStoreID
            return desc
        @classmethod
        def setUpClass(cls):
            super().setUpClass()
            logging.basicConfig(level=logging.DEBUG)

        def setUp(self):
            super().setUp()
            self.config = self._createConfig()
            self.batchSystem = self.createBatchSystem()
            self.tempDir = self._createTempDir("testFiles")

        def tearDown(self):
            self.batchSystem.shutdown()
            super().tearDown()

        def get_max_startup_seconds(self) -> int:
            """
            Get the number of seconds this test ought to wait for the first
            job to run. Some batch systems may need time to scale up.
            """
            return 120

        def test_available_cores(self):
            self.assertTrue(cpu_count() >= numCores)
        @retry_flaky_test(prepare=[tearDown, setUp])
        def test_run_jobs(self):
            jobDesc1 = self._mockJobDescription(
                jobName="test1",
                unitName=None,
                jobStoreID="1",
                requirements=defaultRequirements,
            )
            jobDesc2 = self._mockJobDescription(
                jobName="test2",
                unitName=None,
                jobStoreID="2",
                requirements=defaultRequirements,
            )
            job1 = self.batchSystem.issueBatchJob("sleep 1000", jobDesc1)
            job2 = self.batchSystem.issueBatchJob("sleep 1000", jobDesc2)

            issuedIDs = self._waitForJobsToIssue(2)
            self.assertEqual(set(issuedIDs), {job1, job2})

            # Now at some point we want these jobs to become running.
            # But since we may be testing against a live cluster (Kubernetes),
            # we want to handle weird cases and high cluster load as much as we can.

            # Wait a bit for any Dockers to download and for the
            # jobs to have a chance to start.
            # TODO: We insist on neither of these ever finishing when we test
            # getUpdatedBatchJob, and the sleep time is longer than the time we
            # should spend waiting for both to start, so if our cluster can
            # only run one job at a time, we will fail the test.
            runningJobIDs = self._waitForJobsToStart(
                2, tries=self.get_max_startup_seconds()
            )
            self.assertEqual(set(runningJobIDs), {job1, job2})

            # Killing the jobs instead of allowing them to complete means this test can run very
            # quickly if the batch system issues and starts the jobs quickly.
            self.batchSystem.killBatchJobs([job1, job2])
            self.assertEqual({}, self.batchSystem.getRunningBatchJobIDs())

            # Issue a job and then allow it to finish by itself, causing it to be added to the
            # updated jobs queue.
            # We would like to have this touch something on the filesystem and
            # then check for it having happened, but we can't guarantee that
            # the batch system will run against the same filesystem we are
            # looking at.
            jobDesc3 = self._mockJobDescription(
                jobName="test3",
                unitName=None,
                jobStoreID="3",
                requirements=defaultRequirements,
            )
            job3 = self.batchSystem.issueBatchJob("mktemp -d", jobDesc3)

            jobUpdateInfo = self.batchSystem.getUpdatedBatchJob(maxWait=1000)
            jobID, exitStatus, wallTime = (
                jobUpdateInfo.jobID,
                jobUpdateInfo.exitStatus,
                jobUpdateInfo.wallTime,
            )
            logger.info(f"Third job completed: {jobID} {exitStatus} {wallTime}")

            # Since the first two jobs were killed, the only job in the updated jobs queue should
            # be job 3. If the first two jobs were (incorrectly) added to the queue, this will
            # fail with jobID being equal to job1 or job2.
            self.assertEqual(jobID, job3)
            self.assertEqual(exitStatus, 0)
            if self.supportsWallTime():
                self.assertTrue(wallTime > 0)
            else:
                self.assertIsNone(wallTime)

            # TODO: Work out a way to check if the job we asked to run actually ran.
            # Don't just believe the batch system, but don't assume it ran on this machine either.

            self.assertFalse(self.batchSystem.getUpdatedBatchJob(0))

            # Make sure killBatchJobs can handle jobs that don't exist
            self.batchSystem.killBatchJobs([10])
        def test_set_env(self):
            # Start with a relatively safe script
            script_shell = (
                'if [ "x${FOO}" == "xbar" ] ; then exit 23 ; else exit 42 ; fi'
            )

            # Escape the semicolons
            script_protected = script_shell.replace(";", r"\;")

            # Turn into a string which convinces bash to take all args and
            # paste them back together and run them
            command = 'bash -c "\\${@}" bash eval ' + script_protected

            jobDesc4 = self._mockJobDescription(
                jobName="test4",
                unitName=None,
                jobStoreID="4",
                requirements=defaultRequirements,
            )
            job4 = self.batchSystem.issueBatchJob(command, jobDesc4)
            jobUpdateInfo = self.batchSystem.getUpdatedBatchJob(maxWait=1000)
            jobID, exitStatus, wallTime = (
                jobUpdateInfo.jobID,
                jobUpdateInfo.exitStatus,
                jobUpdateInfo.wallTime,
            )
            self.assertEqual(exitStatus, 42)
            self.assertEqual(jobID, job4)

            # Now set the variable and ensure that it is present
            self.batchSystem.setEnv("FOO", "bar")
            jobDesc5 = self._mockJobDescription(
                jobName="test5",
                unitName=None,
                jobStoreID="5",
                requirements=defaultRequirements,
            )
            job5 = self.batchSystem.issueBatchJob(command, jobDesc5)
            jobUpdateInfo = self.batchSystem.getUpdatedBatchJob(maxWait=1000)
            self.assertEqual(jobUpdateInfo.exitStatus, 23)
            self.assertEqual(jobUpdateInfo.jobID, job5)
        def test_set_job_env(self):
            """Test the mechanism for setting per-job environment variables to batch system jobs."""
            script = 'if [ "x${FOO}" == "xbar" ] ; then exit 23 ; else exit 42 ; fi'
            command = 'bash -c "\\${@}" bash eval ' + script.replace(";", r"\;")

            # Issue a job with a job environment variable
            job_desc_6 = self._mockJobDescription(
                jobName="test6",
                unitName=None,
                jobStoreID="6",
                requirements=defaultRequirements,
            )
            job6 = self.batchSystem.issueBatchJob(
                command, job_desc_6, job_environment={"FOO": "bar"}
            )
            job_update_info = self.batchSystem.getUpdatedBatchJob(maxWait=1000)
            self.assertEqual(job_update_info.exitStatus, 23)  # this should succeed
            self.assertEqual(job_update_info.jobID, job6)
            # Now check that the environment variable doesn't exist for other jobs
            job_desc_7 = self._mockJobDescription(
                jobName="test7",
                unitName=None,
                jobStoreID="7",
                requirements=defaultRequirements,
            )
            job7 = self.batchSystem.issueBatchJob(command, job_desc_7)
            job_update_info = self.batchSystem.getUpdatedBatchJob(maxWait=1000)
            self.assertEqual(job_update_info.exitStatus, 42)
            self.assertEqual(job_update_info.jobID, job7)
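
        # How the command built in the two tests above works (an informal
        # sketch, not asserted by any test): the batch system hands the whole
        # line to a shell, so the script's semicolons are escaped to stop that
        # shell from executing the `if` statement itself. The outer shell then
        # runs `bash -c "${@}" bash eval <script words...>`; inside that bash,
        # $0 is "bash" and the positional args are `eval` plus the script
        # words, so "${@}" expands to them and `eval` stitches the words back
        # together and runs the original script in the job's environment,
        # where FOO may or may not have been set.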
        def testCheckResourceRequest(self):
            if isinstance(self.batchSystem, BatchSystemSupport):
                check_resource_request = self.batchSystem.check_resource_request
                # Assuming we have <2000 cores, this should be too many cores
                self.assertRaises(
                    InsufficientSystemResources,
                    check_resource_request,
                    Requirer(dict(memory=1000, cores=2000, disk="1G", accelerators=[])),
                )
                self.assertRaises(
                    InsufficientSystemResources,
                    check_resource_request,
                    Requirer(dict(memory=5, cores=2000, disk="1G", accelerators=[])),
                )
                # This should be too much memory
                self.assertRaises(
                    InsufficientSystemResources,
                    check_resource_request,
                    Requirer(dict(memory="5000G", cores=1, disk="1G", accelerators=[])),
                )
                # This should be too much disk
                self.assertRaises(
                    InsufficientSystemResources,
                    check_resource_request,
                    Requirer(dict(memory=5, cores=1, disk="2G", accelerators=[])),
                )
                # This should be an accelerator we don't have.
                # All the batch systems need code to know they don't have these accelerators.
                self.assertRaises(
                    InsufficientSystemResources,
                    check_resource_request,
                    Requirer(
                        dict(
                            memory=5,
                            cores=1,
                            disk=100,
                            accelerators=[{"kind": "turbo-encabulator", "count": 1}],
                        )
                    ),
                )
                # These should be missing attributes
                self.assertRaises(
                    AttributeError,
                    check_resource_request,
                    Requirer(dict(memory=5, cores=1, disk=1000)),
                )
                self.assertRaises(
                    AttributeError,
                    check_resource_request,
                    Requirer(dict(cores=1, disk=1000, accelerators=[])),
                )
                self.assertRaises(
                    AttributeError,
                    check_resource_request,
                    Requirer(dict(memory=10, disk=1000, accelerators=[])),
                )

                # This should actually work
                check_resource_request(
                    Requirer(dict(memory=10, cores=1, disk=100, accelerators=[]))
                )

        def testScalableBatchSystem(self):
            # If instance of scalable batch system
            pass

        def _waitForJobsToIssue(self, numJobs):
            issuedIDs = []
            for it in range(20):
                issuedIDs = self.batchSystem.getIssuedBatchJobIDs()
                if len(issuedIDs) == numJobs:
                    break
                time.sleep(1)
            return issuedIDs

        def _waitForJobsToStart(self, numJobs, tries=20):
            """
            Loop until the given number of distinct jobs are in the running
            state, or until the given number of tries is exhausted (with 1
            second polling period).

            Returns the list of IDs that are running.
            """
            runningIDs = []
            # prevent an endless loop, give it a few tries
            for it in range(tries):
                running = self.batchSystem.getRunningBatchJobIDs()
                logger.info(f"Running jobs now: {running}")
                runningIDs = list(running.keys())
                if len(runningIDs) == numJobs:
                    break
                time.sleep(1)
            return runningIDs
    class AbstractBatchSystemJobTest(ToilTest, metaclass=ABCMeta):
        """
        An abstract base class for batch system tests that use a full Toil
        workflow rather than using the batch system directly.
        """

        cpuCount = cpu_count()
        allocatedCores = sorted({1, 2, cpuCount})
        sleepTime = 30

        @abstractmethod
        def getBatchSystemName(self):
            """
            :rtype: (str, AbstractBatchSystem)
            """
            raise NotImplementedError

        def getOptions(self, tempDir):
            """
            Configures options for Toil workflow and makes job store.

            :param str tempDir: path to test directory
            :return: Toil options object
            """
            options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
            options.logLevel = "DEBUG"
            options.batchSystem = self.batchSystemName
            options.workDir = tempDir
            options.maxCores = self.cpuCount
            return options

        def setUp(self):
            self.batchSystemName = self.getBatchSystemName()
            super().setUp()

        def tearDown(self):
            super().tearDown()

        @slow
        def testJobConcurrency(self):
            """
            Tests that the batch system is allocating core resources properly
            for concurrent tasks.
            """
            for coresPerJob in self.allocatedCores:
                tempDir = self._createTempDir("testFiles")
                options = self.getOptions(tempDir)

                counterPath = os.path.join(tempDir, "counter")
                resetCounters(counterPath)
                value, maxValue = getCounters(counterPath)
                assert (value, maxValue) == (0, 0)

                root = Job()
                for _ in range(self.cpuCount):
                    root.addFollowOn(
                        Job.wrapFn(
                            measureConcurrency,
                            counterPath,
                            self.sleepTime,
                            cores=coresPerJob,
                            memory="1M",
                            disk="1Mi",
                        )
                    )
                with Toil(options) as toil:
                    toil.start(root)
                _, maxValue = getCounters(counterPath)
                self.assertEqual(maxValue, self.cpuCount // coresPerJob)

        def test_omp_threads(self):
            """
            Test if the OMP_NUM_THREADS env var is set correctly based on jobs.cores.
            """
            test_cases = {
                # mapping of the number of cores to the OMP_NUM_THREADS value
                0.1: "1",
                1: "1",
                2: "2",
            }

            temp_dir = self._createTempDir()
            options = self.getOptions(temp_dir)

            for cores, expected_omp_threads in test_cases.items():
                if os.environ.get("OMP_NUM_THREADS"):
                    expected_omp_threads = os.environ.get("OMP_NUM_THREADS")
                    logger.info(
                        f"OMP_NUM_THREADS is set. Using OMP_NUM_THREADS={expected_omp_threads} instead."
                    )
                with Toil(options) as toil:
                    output = toil.start(
                        Job.wrapFn(
                            get_omp_threads, memory="1Mi", cores=cores, disk="1Mi"
                        )
                    )
                self.assertEqual(output, expected_omp_threads)
    class AbstractGridEngineBatchSystemTest(AbstractBatchSystemTest):
        """
        An abstract class to reduce redundancy between Grid Engine, Slurm, and
        other similar batch systems
        """

        def _createConfig(self):
            config = super()._createConfig()
            config.statePollingWait = 0.5  # Reduce polling wait so tests run faster
            # can't use _getTestJobStorePath since that method removes the directory
            config.jobStore = "file:" + self._createTempDir("jobStore")
            return config
@needs_kubernetes
@needs_aws_s3
@needs_fetchable_appliance
class KubernetesBatchSystemTest(hidden.AbstractBatchSystemTest):
    """
    Tests against the Kubernetes batch system
    """

    def supportsWallTime(self):
        return True

    def createBatchSystem(self):
        # We know we have Kubernetes so we can import the batch system
        from toil.batchSystems.kubernetes import KubernetesBatchSystem

        return KubernetesBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=2001
        )
@needs_kubernetes_installed
class KubernetesBatchSystemBenchTest(ToilTest):
    """
    Kubernetes batch system unit tests that don't need to actually talk to a cluster.
    """
    def test_preemptability_constraints(self):
        """
        Make sure we generate the right preemptibility constraints.
        """
        # Make sure we can print diffs of these long strings
        self.maxDiff = 10000

        from kubernetes.client import V1PodSpec

        from toil.batchSystems.kubernetes import KubernetesBatchSystem

        normal_spec = V1PodSpec(containers=[])
        constraints = KubernetesBatchSystem.Placement()
        constraints.set_preemptible(False)
        constraints.apply(normal_spec)
        self.assertEqual(
            textwrap.dedent(
                """
            {'node_affinity': {'preferred_during_scheduling_ignored_during_execution': None,
                               'required_during_scheduling_ignored_during_execution': {'node_selector_terms': [{'match_expressions': [{'key': 'eks.amazonaws.com/capacityType',
                                                                                                                                       'operator': 'NotIn',
                                                                                                                                       'values': ['SPOT']},
                                                                                                                                      {'key': 'cloud.google.com/gke-preemptible',
                                                                                                                                       'operator': 'NotIn',
                                                                                                                                       'values': ['true']}],
                                                                                                                'match_fields': None}]}},
             'pod_affinity': None,
             'pod_anti_affinity': None}
            """
            ).strip(),
            str(normal_spec.affinity),
        )
        self.assertEqual(str(normal_spec.tolerations), "None")

        spot_spec = V1PodSpec(containers=[])
        constraints = KubernetesBatchSystem.Placement()
        constraints.set_preemptible(True)
        constraints.apply(spot_spec)
        self.assertEqual(
            textwrap.dedent(
                """
            {'node_affinity': {'preferred_during_scheduling_ignored_during_execution': [{'preference': {'match_expressions': [{'key': 'eks.amazonaws.com/capacityType',
                                                                                                                               'operator': 'In',
                                                                                                                               'values': ['SPOT']}],
                                                                                                        'match_fields': None},
                                                                                         'weight': 1},
                                                                                        {'preference': {'match_expressions': [{'key': 'cloud.google.com/gke-preemptible',
                                                                                                                               'operator': 'In',
                                                                                                                               'values': ['true']}],
                                                                                                        'match_fields': None},
                                                                                         'weight': 1}],
                               'required_during_scheduling_ignored_during_execution': None},
             'pod_affinity': None,
             'pod_anti_affinity': None}
            """
            ).strip(),
            str(spot_spec.affinity),
        )
        self.assertEqual(
            textwrap.dedent(
                """
            [{'effect': None,
              'key': 'cloud.google.com/gke-preemptible',
              'operator': None,
              'toleration_seconds': None,
              'value': 'true'}]
            """
            ).strip(),
            str(spot_spec.tolerations),
        )
    def test_label_constraints(self):
        """
        Make sure we generate the right label constraints.
        """
        # Make sure we can print diffs of these long strings
        self.maxDiff = 10000

        from kubernetes.client import V1PodSpec

        from toil.batchSystems.kubernetes import KubernetesBatchSystem

        spec = V1PodSpec(containers=[])
        constraints = KubernetesBatchSystem.Placement()
        constraints.required_labels = [("GottaBeSetTo", ["This"])]
        constraints.desired_labels = [("OutghtToBeSetTo", ["That"])]
        constraints.prohibited_labels = [("CannotBe", ["ABadThing"])]
        constraints.apply(spec)
        self.assertEqual(
            textwrap.dedent(
                """
            {'node_affinity': {'preferred_during_scheduling_ignored_during_execution': [{'preference': {'match_expressions': [{'key': 'OutghtToBeSetTo',
                                                                                                                               'operator': 'In',
                                                                                                                               'values': ['That']}],
                                                                                                        'match_fields': None},
                                                                                         'weight': 1}],
                               'required_during_scheduling_ignored_during_execution': {'node_selector_terms': [{'match_expressions': [{'key': 'GottaBeSetTo',
                                                                                                                                       'operator': 'In',
                                                                                                                                       'values': ['This']},
                                                                                                                                      {'key': 'CannotBe',
                                                                                                                                       'operator': 'NotIn',
                                                                                                                                       'values': ['ABadThing']}],
                                                                                                                'match_fields': None}]}},
             'pod_affinity': None,
             'pod_anti_affinity': None}
            """
            ).strip(),
            str(spec.affinity),
        )
        self.assertEqual(str(spec.tolerations), "None")
@needs_aws_batch
@needs_fetchable_appliance
class AWSBatchBatchSystemTest(hidden.AbstractBatchSystemTest):
    """
    Tests against the AWS Batch batch system
    """

    def supportsWallTime(self):
        return True

    def createBatchSystem(self):
        from toil.batchSystems.awsBatch import AWSBatchBatchSystem

        return AWSBatchBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=2001
        )

    def get_max_startup_seconds(self) -> int:
        # AWS Batch may need to scale out the compute environment.
        return 300
@slow
@needs_mesos
class MesosBatchSystemTest(hidden.AbstractBatchSystemTest, MesosTestSupport):
    """
    Tests against the Mesos batch system
    """

    @classmethod
    def createConfig(cls):
        """
        Needs to set mesos_endpoint to localhost for testing since the
        default is now the private IP address.
        """
        config = super().createConfig()
        config.mesos_endpoint = "localhost:5050"
        return config

    def supportsWallTime(self):
        return True

    def createBatchSystem(self):
        # We know we have Mesos so we can import the batch system
        from toil.batchSystems.mesos.batchSystem import MesosBatchSystem

        self._startMesos(numCores)
        return MesosBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=1001
        )

    def tearDown(self):
        self._stopMesos()
        super().tearDown()

    def testIgnoreNode(self):
        self.batchSystem.ignoreNode("localhost")
        jobDesc = self._mockJobDescription(
            jobName="test2",
            unitName=None,
            jobStoreID="1",
            requirements=defaultRequirements,
        )
        job = self.batchSystem.issueBatchJob("sleep 1000", jobDesc)

        issuedID = self._waitForJobsToIssue(1)
        self.assertEqual(set(issuedID), {job})

        # Wait until a job starts or we go a while without that happening
        runningJobIDs = self._waitForJobsToStart(1, tries=20)
        # Make sure job is NOT running
        self.assertEqual(set(runningJobIDs), set({}))
def write_temp_file(s: str, temp_dir: str) -> str:
    """
    Dump a string into a temp file and return its path.
    """
    fd, path = tempfile.mkstemp(dir=temp_dir)
    try:
        encoded = s.encode("utf-8")
        assert os.write(fd, encoded) == len(encoded)
    except:
        os.unlink(path)
        raise
    else:
        return path
    finally:
        os.close(fd)
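
# A minimal usage sketch for write_temp_file (hypothetical values, mirroring
# how the tests below use it):
#
#   temp_dir = self._createTempDir()         # inside a ToilTest
#   path = write_temp_file("0,0", temp_dir)  # e.g. an initial counter file
#   with open(path) as f:
#       assert f.read() == "0,0"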
class SingleMachineBatchSystemTest(hidden.AbstractBatchSystemTest):
    """
    Tests against the single-machine batch system
    """

    def supportsWallTime(self) -> bool:
        return True

    def createBatchSystem(self) -> AbstractBatchSystem:
        return SingleMachineBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1e9, maxDisk=2001
        )

    def testProcessEscape(self, hide: bool = False) -> None:
        """
        Test to make sure that child processes and their descendants go away
        when the Toil workflow stops.

        If hide is true, will try and hide the child processes to make them
        hard to stop.
        """

        def script() -> None:
            #!/usr/bin/env python3
            import fcntl
            import os
            import signal
            import sys
            import time
            from typing import Any

            def handle_signal(sig: Any, frame: Any) -> None:
                sys.stderr.write(f"{os.getpid()} ignoring signal {sig}\n")

            if hasattr(signal, "valid_signals"):
                # We can just ask about the signals
                all_signals = signal.valid_signals()
            else:
                # Fish them out by name
                all_signals = [
                    getattr(signal, n)
                    for n in dir(signal)
                    if n.startswith("SIG") and not n.startswith("SIG_")
                ]

            for sig in all_signals:
                # Set up to ignore all signals we can and generally be obstinate
                if sig != signal.SIGKILL and sig != signal.SIGSTOP:
                    signal.signal(sig, handle_signal)

            if len(sys.argv) > 2:
                # Instructed to hide
                if os.fork():
                    # Try and hide the first process immediately so getting its
                    # pgid won't work.
                    sys.exit(0)

            for depth in range(3):
                # Bush out into a tree of processes
                os.fork()

            if len(sys.argv) > 1:
                fd = os.open(sys.argv[1], os.O_RDONLY)
                fcntl.lockf(fd, fcntl.LOCK_SH)

            sys.stderr.write(f"{os.getpid()} waiting...\n")

            while True:
                # Wait around forever
                time.sleep(60)

        # Get a directory where we can safely dump files.
        temp_dir = self._createTempDir()

        script_path = write_temp_file(self._getScriptSource(script), temp_dir)

        # We will have all the job processes try and lock this file shared
        # while they are alive.
        lockable_path = write_temp_file("", temp_dir)

        try:
            command = f"{sys.executable} {script_path} {lockable_path}"
            if hide:
                # Tell the children to stop the first child and hide out in the
                # process group it made.
                command += " hide"

            # Start the job
            self.batchSystem.issueBatchJob(
                command,
                self._mockJobDescription(
                    jobName="fork", jobStoreID="1", requirements=defaultRequirements
                ),
            )

            # Wait
            time.sleep(10)

            lockfile = open(lockable_path, "w")

            if not hide:
                # In hiding mode the job will finish, and the batch system will
                # clean up after it promptly. In non-hiding mode the job will
                # stick around until shutdown, so make sure we can see it.

                # Try to lock the file and make sure it fails
                try:
                    fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    assert False, "Should not be able to lock file while job is running"
                except OSError:
                    pass

            # Shut down the batch system
            self.batchSystem.shutdown()

            # After the batch system shuts down, we should be able to get the
            # lock immediately, because all the children should be gone.
            fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)

            # Then we can release it
            fcntl.lockf(lockfile, fcntl.LOCK_UN)
        finally:
            os.unlink(script_path)
            os.unlink(lockable_path)

    def testHidingProcessEscape(self):
        """
        Test to make sure that child processes and their descendants go away
        when the Toil workflow stops, even if the job process stops and leaves
        children.
        """
        self.testProcessEscape(hide=True)
@slow
class MaxCoresSingleMachineBatchSystemTest(ToilTest):
    """
    This test ensures that the single-machine batch system doesn't exceed the
    configured number of cores.
    """

    @classmethod
    def setUpClass(cls) -> None:
        super().setUpClass()
        logging.basicConfig(level=logging.DEBUG)

    def setUp(self) -> None:
        super().setUp()

        temp_dir = self._createTempDir()

        # Write initial value of counter file containing a tuple of two
        # integers (i, n) where i is the number of currently executing tasks
        # and n the maximum observed value of i
        self.counterPath = write_temp_file("0,0", temp_dir)

        def script() -> None:
            import fcntl
            import os
            import sys
            import time

            def count(delta: int) -> None:
                """
                Adjust the first integer value in a file by the given amount.
                If the result exceeds the second integer value, set the second
                one to the first.
                """
                fd = os.open(sys.argv[1], os.O_RDWR)
                try:
                    fcntl.flock(fd, fcntl.LOCK_EX)
                    try:
                        s = os.read(fd, 10).decode("utf-8")
                        value, maxValue = list(map(int, s.split(",")))
                        value += delta
                        if value > maxValue:
                            maxValue = value
                        os.lseek(fd, 0, 0)
                        os.ftruncate(fd, 0)
                        os.write(fd, f"{value},{maxValue}".encode())
                    finally:
                        fcntl.flock(fd, fcntl.LOCK_UN)
                finally:
                    os.close(fd)

            # Without the second argument, increment the counter, sleep half a
            # second and decrement. Otherwise, adjust the counter by the given
            # delta, which can be useful for services.
            if len(sys.argv) < 3:
                count(1)
                try:
                    time.sleep(0.5)
                finally:
                    count(-1)
            else:
                count(int(sys.argv[2]))

        self.scriptPath = write_temp_file(self._getScriptSource(script), temp_dir)

    def tearDown(self) -> None:
        os.unlink(self.scriptPath)
        os.unlink(self.counterPath)

    def scriptCommand(self) -> str:
        return " ".join([sys.executable, self.scriptPath, self.counterPath])

    @retry_flaky_test(prepare=[tearDown, setUp])
    def test(self):
        # We'll use fractions to avoid rounding errors. Remember that not
        # every fraction can be represented as a floating point number.
        F = Fraction

        # This test isn't general enough to cover every possible value of
        # minCores in SingleMachineBatchSystem. Instead we hard-code a value
        # and assert it.
        minCores = F(1, 10)
        self.assertEqual(float(minCores), SingleMachineBatchSystem.minCores)

        for maxCores in {F(minCores), minCores * 10, F(1), F(numCores, 2), F(numCores)}:
            for coresPerJob in {
                F(minCores),
                F(minCores * 10),
                F(1),
                F(maxCores, 2),
                F(maxCores),
            }:
                for load in (F(1, 10), F(1), F(10)):
                    jobs = int(maxCores / coresPerJob * load)
                    if jobs >= 1 and minCores <= coresPerJob < maxCores:
                        self.assertEqual(maxCores, float(maxCores))
                        bs = SingleMachineBatchSystem(
                            config=hidden.AbstractBatchSystemTest.createConfig(),
                            maxCores=float(maxCores),
                            # Ensure that memory or disk requirements don't get in the way.
                            maxMemory=jobs * 10,
                            maxDisk=jobs * 10,
                        )
                        try:
                            jobIds = set()
                            for i in range(0, int(jobs)):
                                desc = JobDescription(
                                    requirements=dict(
                                        cores=float(coresPerJob),
                                        memory=1,
                                        disk=1,
                                        accelerators=[],
                                        preemptible=preemptible,
                                    ),
                                    jobName=str(i),
                                    unitName="",
                                )
                                jobIds.add(bs.issueBatchJob(self.scriptCommand(), desc))
                            self.assertEqual(len(jobIds), jobs)
                            while jobIds:
                                job = bs.getUpdatedBatchJob(maxWait=10)
                                self.assertIsNotNone(job)
                                jobId, status, wallTime = (
                                    job.jobID,
                                    job.exitStatus,
                                    job.wallTime,
                                )
                                self.assertEqual(status, 0)
                                # would raise KeyError on absence
                                jobIds.remove(jobId)
                        finally:
                            bs.shutdown()
                        concurrentTasks, maxConcurrentTasks = getCounters(
                            self.counterPath
                        )
                        self.assertEqual(concurrentTasks, 0)
                        logger.info(
                            f"maxCores: {maxCores}, "
                            f"coresPerJob: {coresPerJob}, "
                            f"load: {load}"
                        )
                        # This is the key assertion: we shouldn't run too many jobs.
                        # Because of nondeterminism we can't guarantee hitting the limit.
                        expectedMaxConcurrentTasks = min(maxCores // coresPerJob, jobs)
                        self.assertLessEqual(
                            maxConcurrentTasks, expectedMaxConcurrentTasks
                        )
                        resetCounters(self.counterPath)

    @skipIf(
        SingleMachineBatchSystem.numCores < 3,
        "Need at least three cores to run this test",
    )
    def testServices(self):
        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.logLevel = "DEBUG"
        options.maxCores = 3
        self.assertTrue(options.maxCores <= SingleMachineBatchSystem.numCores)
        Job.Runner.startToil(Job.wrapJobFn(parentJob, self.scriptCommand()), options)
        with open(self.counterPath, "r+") as f:
            s = f.read()
        logger.info("Counter is %s", s)
        self.assertEqual(getCounters(self.counterPath), (0, 3))
# Toil can use only top-level functions so we have to add them here:
def parentJob(job, cmd):
    job.addChildJobFn(childJob, cmd)


def childJob(job, cmd):
    job.addService(Service(cmd))
    job.addChildJobFn(grandChildJob, cmd)
    subprocess.check_call(cmd, shell=True)


def grandChildJob(job, cmd):
    job.addService(Service(cmd))
    job.addChildFn(greatGrandChild, cmd)
    subprocess.check_call(cmd, shell=True)


def greatGrandChild(cmd):
    subprocess.check_call(cmd, shell=True)


class Service(Job.Service):
    def __init__(self, cmd):
        super().__init__()
        self.cmd = cmd

    def start(self, fileStore):
        subprocess.check_call(self.cmd + " 1", shell=True)

    def check(self):
        return True

    def stop(self, fileStore):
        subprocess.check_call(self.cmd + " -1", shell=True)
@slow
@needs_gridengine
class GridEngineBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest):
    """
    Tests against the GridEngine batch system
    """

    def createBatchSystem(self) -> AbstractBatchSystem:
        from toil.batchSystems.gridengine import GridEngineBatchSystem

        return GridEngineBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1000e9, maxDisk=1e9
        )

    def tearDown(self):
        super().tearDown()
        # Clean up the GridEngine output log files from qsub
        from glob import glob

        for f in glob("toil_job*.o*"):
            os.unlink(f)
@slow
@needs_slurm
class SlurmBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest):
    """
    Tests against the Slurm batch system
    """

    def createBatchSystem(self) -> AbstractBatchSystem:
        from toil.batchSystems.slurm import SlurmBatchSystem

        return SlurmBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1000e9, maxDisk=1e9
        )

    def tearDown(self):
        super().tearDown()
        # Clean up the 'slurm-%j.out' files produced by sbatch
        from glob import glob

        for f in glob("slurm-*.out"):
            os.unlink(f)
@slow
@needs_lsf
class LSFBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest):
    """
    Tests against the LSF batch system
    """

    def createBatchSystem(self) -> AbstractBatchSystem:
        from toil.batchSystems.lsf import LSFBatchSystem

        return LSFBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1000e9, maxDisk=1e9
        )
@slow
@needs_torque
class TorqueBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest):
    """
    Tests against the Torque batch system
    """

    def _createDummyConfig(self):
        config = super()._createDummyConfig()
        # can't use _getTestJobStorePath since that method removes the directory
        config.jobStore = self._createTempDir("jobStore")
        return config

    def createBatchSystem(self) -> AbstractBatchSystem:
        from toil.batchSystems.torque import TorqueBatchSystem

        return TorqueBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1000e9, maxDisk=1e9
        )

    def tearDown(self):
        super().tearDown()
        # Clean up the 'toil_job_*.o*' and 'toil_job_*.e*' files produced by qsub
        from glob import glob

        for f in glob("toil_job_*.[oe]*"):
            os.unlink(f)
@slow
@needs_htcondor
class HTCondorBatchSystemTest(hidden.AbstractGridEngineBatchSystemTest):
    """
    Tests against the HTCondor batch system
    """

    def createBatchSystem(self) -> AbstractBatchSystem:
        from toil.batchSystems.htcondor import HTCondorBatchSystem

        return HTCondorBatchSystem(
            config=self.config, maxCores=numCores, maxMemory=1000e9, maxDisk=1e9
        )

    def tearDown(self):
        super().tearDown()
class SingleMachineBatchSystemJobTest(hidden.AbstractBatchSystemJobTest):
    """
    Tests Toil workflow against the SingleMachine batch system
    """

    def getBatchSystemName(self):
        return "single_machine"
    @slow
    @retry_flaky_test(
        prepare=[
            hidden.AbstractBatchSystemJobTest.tearDown,
            hidden.AbstractBatchSystemJobTest.setUp,
        ]
    )
    def testConcurrencyWithDisk(self):
        """
        Tests that the batch system is allocating disk resources properly
        """
        tempDir = self._createTempDir("testFiles")

        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.workDir = tempDir
        from toil import physicalDisk

        availableDisk = physicalDisk(options.workDir)
        logger.info("Testing disk concurrency limits with %s disk space", availableDisk)
        # More disk might become available by the time Toil starts, so we limit it here
        options.maxDisk = availableDisk
        options.batchSystem = self.batchSystemName

        counterPath = os.path.join(tempDir, "counter")
        resetCounters(counterPath)
        value, maxValue = getCounters(counterPath)
        assert (value, maxValue) == (0, 0)

        half_disk = availableDisk // 2
        more_than_half_disk = half_disk + 500
        logger.info("Dividing into parts of %s and %s", half_disk, more_than_half_disk)

        root = Job()
        # Physically, we're asking for 50% of disk and 50% of disk + 500 bytes
        # in the two jobs. The batch system should not allow the 2 child jobs
        # to run concurrently.
        root.addChild(
            Job.wrapFn(
                measureConcurrency,
                counterPath,
                self.sleepTime,
                cores=1,
                memory="1M",
                disk=half_disk,
            )
        )
        root.addChild(
            Job.wrapFn(
                measureConcurrency,
                counterPath,
                self.sleepTime,
                cores=1,
                memory="1M",
                disk=more_than_half_disk,
            )
        )
        Job.Runner.startToil(root, options)
        _, maxValue = getCounters(counterPath)

        logger.info("After run: %s disk space", physicalDisk(options.workDir))

        self.assertEqual(maxValue, 1)
    @skipIf(
        SingleMachineBatchSystem.numCores < 4,
        "Need at least four cores to run this test",
    )
    @slow
    def testNestedResourcesDoNotBlock(self):
        """
        Resources are requested in the order Memory > Cpu > Disk.
        Test that unavailability of cpus for one job that is scheduled does
        not block another job that can run.
        """
        tempDir = self._createTempDir("testFiles")

        options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
        options.workDir = tempDir
        options.maxCores = 4
        from toil import physicalMemory

        availableMemory = physicalMemory()
        options.batchSystem = self.batchSystemName

        outFile = os.path.join(tempDir, "counter")
        open(outFile, "w").close()

        root = Job()

        blocker = Job.wrapFn(
            _resourceBlockTestAuxFn,
            outFile=outFile,
            sleepTime=30,
            writeVal="b",
            cores=2,
            memory="1M",
            disk="1M",
        )
        firstJob = Job.wrapFn(
            _resourceBlockTestAuxFn,
            outFile=outFile,
            sleepTime=5,
            writeVal="fJ",
            cores=1,
            memory="1M",
            disk="1M",
        )
        secondJob = Job.wrapFn(
            _resourceBlockTestAuxFn,
            outFile=outFile,
            sleepTime=10,
            writeVal="sJ",
            cores=1,
            memory="1M",
            disk="1M",
        )

        # Should block off 50% of memory while waiting for its 3 cores
        firstJobChild = Job.wrapFn(
            _resourceBlockTestAuxFn,
            outFile=outFile,
            sleepTime=0,
            writeVal="fJC",
            cores=3,
            memory=int(availableMemory // 2),
            disk="1M",
        )

        # These two shouldn't be able to run before B because there should be
        # only (50% of memory - 1M) available (firstJobChild should be
        # blocking 50%)
        secondJobChild = Job.wrapFn(
            _resourceBlockTestAuxFn,
            outFile=outFile,
            sleepTime=5,
            writeVal="sJC",
            cores=2,
            memory=int(availableMemory // 1.5),
            disk="1M",
        )
        secondJobGrandChild = Job.wrapFn(
            _resourceBlockTestAuxFn,
            outFile=outFile,
            sleepTime=5,
            writeVal="sJGC",
            cores=2,
            memory=int(availableMemory // 1.5),
            disk="1M",
        )

        root.addChild(blocker)
        root.addChild(firstJob)
        root.addChild(secondJob)
        firstJob.addChild(firstJobChild)
        secondJob.addChild(secondJobChild)
        secondJobChild.addChild(secondJobGrandChild)
        """
        The tree is:
                    root
                   /  |  \
                  b   fJ  sJ
                      |    |
                     fJC  sJC
                           |
                          sJGC
        But the order of execution should be
        root > b, fJ, sJ > sJC > sJGC > fJC
        since fJC cannot run till b finishes, but sJC and sJGC can (fJC is
        blocked by cores). If the resource acquisition is written properly,
        then fJC, which is scheduled before sJC and sJGC, should not block
        them, and should only run after they finish.
        """
        Job.Runner.startToil(root, options)
        with open(outFile) as oFH:
            outString = oFH.read()
        # The ordering of b, fJ and sJ is non-deterministic since they are
        # scheduled at the same time. We look for all possible permutations.
        possibleStarts = tuple(
            "".join(x) for x in itertools.permutations(["b", "fJ", "sJ"])
        )
        assert outString.startswith(possibleStarts)
        assert outString.endswith("sJCsJGCfJC")
def _resourceBlockTestAuxFn(outFile, sleepTime, writeVal):
    """
    Write a value to the out file and then sleep for requested seconds.

    :param str outFile: File to write to
    :param int sleepTime: Time to sleep for
    :param str writeVal: Character to write
    """
    with open(outFile, "a") as oFH:
        fcntl.flock(oFH, fcntl.LOCK_EX)
        oFH.write(writeVal)
    time.sleep(sleepTime)
@slow
@needs_mesos
class MesosBatchSystemJobTest(hidden.AbstractBatchSystemJobTest, MesosTestSupport):
    """
    Tests Toil workflow against the Mesos batch system
    """

    def getOptions(self, tempDir):
        options = super().getOptions(tempDir)
        options.mesos_endpoint = "localhost:5050"
        return options

    def getBatchSystemName(self):
        self._startMesos(self.cpuCount)
        return "mesos"

    def tearDown(self):
        self._stopMesos()
def measureConcurrency(filepath, sleep_time=10):
    """
    Run in parallel to determine the number of concurrent tasks.

    This code was copied from
    toil.test.batchSystems.batchSystemTest.MaxCoresSingleMachineBatchSystemTest.

    :param str filepath: path to counter file
    :param int sleep_time: number of seconds to sleep before counting down
    :return int max concurrency value:
    """
    count(1, filepath)
    try:
        time.sleep(sleep_time)
    finally:
        return count(-1, filepath)
def count(delta, file_path):
    """
    Adjusts the counter file by the given delta and returns the maximum number
    of concurrent tasks observed so far.

    Counter data must be in the form: concurrent tasks, max concurrent tasks
    (the counter should be initialized to "0,0").

    :param int delta: increment value
    :param str file_path: path to shared counter file
    :return int max concurrent tasks:
    """
    fd = os.open(file_path, os.O_RDWR)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        try:
            s = os.read(fd, 10)
            value, maxValue = (int(i) for i in s.decode("utf-8").split(","))
            value += delta
            if value > maxValue:
                maxValue = value
            os.lseek(fd, 0, 0)
            os.ftruncate(fd, 0)
            os.write(fd, f"{value},{maxValue}".encode())
        finally:
            fcntl.flock(fd, fcntl.LOCK_UN)
    finally:
        os.close(fd)
    return maxValue
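
# Worked example of the counter protocol implemented by count() above: with
# the file holding "2,3", count(1, path) rewrites it to "3,3" and returns 3;
# a following count(-1, path) rewrites it to "2,3" and still returns the
# high-water mark, 3.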
def getCounters(path):
    with open(path, "r+") as f:
        concurrentTasks, maxConcurrentTasks = (int(i) for i in f.read().split(","))
    return concurrentTasks, maxConcurrentTasks


def resetCounters(path):
    with open(path, "w") as f:
        f.write("0,0")


def get_omp_threads() -> str:
    return os.environ["OMP_NUM_THREADS"]