Source code for toil.test

"""Base testing class for Toil."""

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import os
import random
import re
import shutil
import signal
import subprocess
import threading
import time
import unittest
import uuid
import zoneinfo
from abc import ABCMeta, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
from inspect import getsource
from shutil import which
from tempfile import mkstemp
from textwrap import dedent
from typing import Any, Callable, Literal, Optional, TypeVar, Union, cast
from unittest.util import strclass
from urllib.error import HTTPError, URLError
from urllib.request import urlopen

from toil import ApplianceImageNotFound, applianceSelf, toilPackageDirPath
from toil.lib.accelerators import (
    have_working_nvidia_docker_runtime,
    have_working_nvidia_smi,
)
from toil.lib.io import mkdtemp
from toil.lib.iterables import concat
from toil.lib.memoize import memoize
from toil.lib.threading import ExceptionalThread, cpu_count
from toil.version import distVersion

logger = logging.getLogger(__name__)


class ToilTest(unittest.TestCase):
    """
    A common base class for Toil tests.

    Please have every test case directly or indirectly inherit this one.

    When running tests you may optionally set the TOIL_TEST_TEMP environment variable to the path
    of a directory where you want temporary test files be placed. The directory will be created if
    it doesn't exist. The path may be relative in which case it will be assumed to be relative to
    the project root. If TOIL_TEST_TEMP is not defined, temporary files and directories will be
    created in the system's default location for such files and any temporary files or directories
    left over from tests will be removed automatically during tear down.
    Otherwise, left-over files will not be removed.
    """

    # Base directory for temporary test files; None means "use the system default".
    _tempBaseDir: Optional[str] = None
    # Temporary directories handed out by _createTempDirEx, removed in tearDownClass.
    _tempDirs: list[str] = []

    def setup_method(self, method: Any) -> None:
        """Print a banner naming the test about to run, stamped with California time."""
        western = zoneinfo.ZoneInfo("America/Los_Angeles")
        california_time = datetime.datetime.now(tz=western)
        timestamp = california_time.strftime("%b %d %Y %H:%M:%S:%f %Z")
        print(
            f"\n\n[TEST] {strclass(self.__class__)}:{self._testMethodName} ({timestamp})\n\n"
        )

    @classmethod
    def setUpClass(cls) -> None:
        """Resolve TOIL_TEST_TEMP (if set) and make sure the directory exists."""
        super().setUpClass()
        tempBaseDir = os.environ.get("TOIL_TEST_TEMP", None)
        if tempBaseDir is not None:
            if not os.path.isabs(tempBaseDir):
                # Relative paths are interpreted relative to the project root.
                tempBaseDir = os.path.abspath(
                    os.path.join(cls._projectRootPath(), tempBaseDir)
                )
            # Create the directory for absolute paths too, as the class
            # docstring promises; mkdtemp(dir=...) fails on a missing directory.
            os.makedirs(tempBaseDir, exist_ok=True)
        cls._tempBaseDir = tempBaseDir

    @classmethod
    def tearDownClass(cls) -> None:
        """Remove left-over temporary directories unless TOIL_TEST_TEMP was in use."""
        if cls._tempBaseDir is None:
            while cls._tempDirs:
                tempDir = cls._tempDirs.pop()
                if os.path.exists(tempDir):
                    shutil.rmtree(tempDir)
        else:
            # The user asked for a specific location: leave the files, forget the paths.
            cls._tempDirs = []
        super().tearDownClass()

    def setUp(self) -> None:
        logger.info("Setting up %s ...", self.id())
        super().setUp()

    def tearDown(self) -> None:
        super().tearDown()
        logger.info("Tore down %s", self.id())

    @classmethod
    def awsRegion(cls) -> str:
        """
        Pick an appropriate AWS region.

        Use us-west-2 unless running on EC2, in which case use the region in which
        the instance is located
        """
        from toil.lib.aws import running_on_ec2

        return cls._region() if running_on_ec2() else "us-west-2"

    @classmethod
    def _availabilityZone(cls) -> str:
        """
        Query this instance's metadata to determine in which availability zone it is running.

        Used only when running on EC2.
        """
        # Use the response as a context manager so the connection is closed.
        with urlopen(
            "http://169.254.169.254/latest/meta-data/placement/availability-zone"
        ) as response:
            zone = response.read()
        return zone if not isinstance(zone, bytes) else zone.decode("utf-8")

    @classmethod
    @memoize
    def _region(cls) -> str:
        """
        Determine in what region this instance is running.

        Used only when running on EC2.
        The region will not change over the life of the instance so the result
        is memoized to avoid unnecessary work.
        """
        # An availability zone is the region name followed by a single letter.
        region = re.match(
            r"^([a-z]{2}-[a-z]+-[1-9][0-9]*)([a-z])$", cls._availabilityZone()
        )
        assert region
        return region.group(1)

    @classmethod
    def _getUtilScriptPath(cls, script_name: str) -> str:
        """Return the path of the named script inside the package's utils directory."""
        return os.path.join(toilPackageDirPath(), "utils", script_name + ".py")

    @classmethod
    def _projectRootPath(cls) -> str:
        """
        Return the path to the project root.

        i.e. the directory that typically contains the .git and src subdirectories.
        This method has limited utility. It only works if in "develop" mode, since it
        assumes the existence of a src subdirectory which, in a regular install
        wouldn't exist. Then again, in that mode project root has no meaning anyways.
        """
        assert re.search(r"__init__\.pyc?$", __file__)
        projectRootPath = os.path.dirname(os.path.abspath(__file__))
        packageComponents = __name__.split(".")
        expectedSuffix = os.path.join("src", *packageComponents)
        assert projectRootPath.endswith(expectedSuffix)
        projectRootPath = projectRootPath[: -len(expectedSuffix)]
        return projectRootPath

    def _createTempDir(self, purpose: Optional[str] = None) -> str:
        """Create a temporary directory named after the current test method."""
        return self._createTempDirEx(self._testMethodName, purpose)

    @classmethod
    def _createTempDirEx(cls, *names: Optional[str]) -> str:
        """
        Create a temporary directory whose name embeds the test class and *names*.

        The directory is recorded in ``_tempDirs`` so tearDownClass can remove it.
        """
        classname = strclass(cls)
        if classname.startswith("toil.test."):
            classname = classname[len("toil.test.") :]
        prefix = ["toil", "test", classname]
        prefix.extend([_f for _f in names if _f])
        # A trailing empty component makes the joined prefix end with "-".
        prefix.append("")
        temp_dir_path = os.path.realpath(
            mkdtemp(dir=cls._tempBaseDir, prefix="-".join(prefix))
        )
        cls._tempDirs.append(temp_dir_path)
        return temp_dir_path

    @classmethod
    def _getTestJobStorePath(cls) -> str:
        """Return a unique path for a job store; the path itself does not exist."""
        path = cls._createTempDirEx("jobstore")
        # We only need a unique path, directory shouldn't actually exist. This of course is racy
        # and insecure because another thread could now allocate the same path as a temporary
        # directory. However, the built-in tempfile module randomizes the name temp dir suffixes
        # reasonably well (1 in 63 ^ 6 chance of collision), making this an unlikely scenario.
        os.rmdir(path)
        return path

    @classmethod
    def _getSourceDistribution(cls) -> str:
        """
        Find the sdist tarball for this project and return the path to it.

        Also assert that the sdist is up-to date
        """
        sdistPath = os.path.join(
            cls._projectRootPath(), "dist", "toil-%s.tar.gz" % distVersion
        )
        assert os.path.isfile(sdistPath), (
            "Can't find Toil source distribution at %s. Run 'make sdist'." % sdistPath
        )
        # Files ignored by git do not count as "dirty".
        excluded = set(
            cast(
                str,
                cls._run(
                    "git",
                    "ls-files",
                    "--others",
                    "-i",
                    "--exclude-standard",
                    capture=True,
                    cwd=cls._projectRootPath(),
                ),
            ).splitlines()
        )
        # Source files modified after the sdist was built.
        dirty = cast(
            str,
            cls._run(
                "find",
                "src",
                "-type",
                "f",
                "-newer",
                sdistPath,
                capture=True,
                cwd=cls._projectRootPath(),
            ),
        ).splitlines()
        assert all(path.startswith("src") for path in dirty)
        dirty_set = set(dirty)
        dirty_set.difference_update(excluded)
        assert (
            not dirty_set
        ), "Run 'make clean_sdist sdist'. Files newer than {}: {!r}".format(
            sdistPath,
            list(dirty_set),
        )
        return sdistPath

    @classmethod
    def _run(cls, command: str, *args: str, **kwargs: Any) -> Optional[str]:
        """
        Run a command.

        Convenience wrapper for subprocess.check_call and subprocess.check_output.

        :param command: The command to be run.

        :param args: Any arguments to be passed to the command.

        :param kwargs: keyword arguments for subprocess.Popen constructor.
            Pass capture=True to have the process' stdout returned.
            Pass input='some string' to feed input to the process' stdin.

        :return: The output of the process' stdout if capture=True was passed, None otherwise.

        :raises subprocess.CalledProcessError: if the process exits with a non-zero status.
        """
        argl = list(concat(command, args))
        logger.info("Running %r", argl)
        capture = kwargs.pop("capture", False)
        _input = kwargs.pop("input", None)
        if capture:
            kwargs["stdout"] = subprocess.PIPE
        if _input is not None:
            kwargs["stdin"] = subprocess.PIPE
        # Execute the full argument list (command + args); passing only
        # ``args`` would drop the command itself.
        popen = subprocess.Popen(argl, universal_newlines=True, **kwargs)
        stdout, stderr = popen.communicate(input=_input)
        assert stderr is None
        if popen.returncode != 0:
            raise subprocess.CalledProcessError(popen.returncode, argl)
        if capture:
            return cast(Optional[str], stdout)
        return None

    def _getScriptSource(self, callable_: Callable[..., Any]) -> str:
        """
        Return the source code of the body of given callable as a string, dedented.

        This is a naughty but incredibly useful trick that lets you embed user scripts
        as nested functions and expose them to the syntax checker of your IDE.
        """
        # Drop the first line (the ``def`` header) and dedent what remains.
        return dedent("\n".join(getsource(callable_).split("\n")[1:]))
MT = TypeVar("MT", bound=Callable[..., Any]) try: # noinspection PyUnresolvedReferences from pytest import mark as pytest_mark except ImportError: # noinspection PyUnusedLocal def _mark_test(name: str, test_item: MT) -> MT: return test_item else: def _mark_test(name: str, test_item: MT) -> MT: return cast(MT, getattr(pytest_mark, name)(test_item))
def get_temp_file(suffix: str = "", rootDir: Optional[str] = None) -> str:
    """
    Create an empty temporary file and return its path.

    The caller is responsible for deleting the file.

    :param suffix: text appended to the end of the generated file name.
    :param rootDir: directory to create the file in. When None the file is made
        with mkstemp in the default temporary location; otherwise a file named
        ``tmp_`` plus ten random alphanumerics plus *suffix* is created there
        and made accessible to everyone.
    """
    if rootDir is not None:
        alphanumerics = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        random_part = "".join([random.choice(alphanumerics) for _ in range(0, 10)])
        tmp_file = os.path.join(rootDir, f"tmp_{random_part}{suffix}")
        # Create (or truncate) the file, then close it right away.
        with open(tmp_file, "w"):
            pass
        os.chmod(tmp_file, 0o777)  # Ensure everyone has access to the file.
        return tmp_file

    handle, tmp_file = mkstemp(suffix)
    os.close(handle)
    return tmp_file
def needs_env_var(var_name: str, comment: Optional[str] = None) -> Callable[[MT], MT]:
    """
    Build a decorator that skips a test class or method unless an environment variable is set.

    :param var_name: name of the environment variable that must be set (and non-empty).
    :param comment: optional hint, shown in the skip message, describing what
        the variable should be set to.
    """

    def decorator(test_item: MT) -> MT:
        if os.getenv(var_name):
            return test_item
        hint = " to " + comment if comment else ""
        return unittest.skip(f"Set {var_name}{hint} to include this test.")(test_item)

    return decorator
def needs_rsync3(test_item: MT) -> MT:
    """
    Decorate classes or methods that depend on any features from rsync version 3.0.0+.

    Necessary because :meth:`utilsTest.testAWSProvisionerUtils` uses option `--protect-args`
    which is only available in rsync 3

    The test is skipped, rather than failing at decoration time, when rsync is
    missing, exits with an error, or prints a version banner that cannot be parsed.
    """
    test_item = _mark_test("rsync", test_item)
    try:
        versionInfo = subprocess.check_output(["rsync", "--version"]).decode("utf-8")
    except (OSError, subprocess.CalledProcessError):
        # A missing executable raises FileNotFoundError (an OSError), not
        # CalledProcessError, so both must be handled here.
        return unittest.skip("rsync needs to be installed to run this test.")(test_item)
    try:
        # output looks like: 'rsync  version 2.6.9 ...'
        major_version = int(versionInfo.split()[2].split(".")[0])
    except (IndexError, ValueError):
        # Some rsync-compatible tools print a differently shaped banner.
        return unittest.skip(
            "Could not determine the installed rsync version; "
            "this test depends on rsync version 3.0.0+."
        )(test_item)
    if major_version < 3:
        return unittest.skip("This test depends on rsync version 3.0.0+.")(test_item)
    return test_item
def needs_online(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need to talk to the Internet.

    They are skipped when TOIL_SKIP_ONLINE is set to "true" (case-insensitive).
    """
    marked = _mark_test("online", test_item)
    skip_requested = os.getenv("TOIL_SKIP_ONLINE", "").lower() == "true"
    if not skip_requested:
        return marked
    return unittest.skip("Skipping online test.")(marked)
def needs_aws_s3(test_item: MT) -> MT:
    """
    Decorate test classes or methods that should run only if AWS S3 is usable.

    Implies ``needs_online``.
    """
    # TODO: we just check for generic access to the AWS account
    marked = _mark_test("aws-s3", needs_online(test_item))
    try:
        from boto3 import Session

        credentials = Session().get_credentials()
    except ImportError:
        return unittest.skip("Install Toil with the 'aws' extra to include this test.")(
            marked
        )
    from toil.lib.aws import running_on_ec2

    # Any one of these is taken as evidence that credentials are available.
    if (
        credentials
        or os.path.exists(os.path.expanduser("~/.aws/credentials"))
        or running_on_ec2()
    ):
        return marked
    return unittest.skip("Configure AWS credentials to include this test.")(marked)
def needs_aws_ec2(test_item: MT) -> MT:
    """
    Decorate test classes or methods that should run only if AWS EC2 is usable.

    Implies ``needs_aws_s3`` and requires TOIL_AWS_KEYNAME to be set.
    """
    # Assume we need S3 as well as EC2
    with_s3 = needs_aws_s3(test_item)
    marked = _mark_test("aws-ec2", with_s3)
    # In addition to S3 we also need an SSH key to deploy with.
    # TODO: We assume that if this is set we have EC2 access.
    require_keyname = needs_env_var("TOIL_AWS_KEYNAME", "an AWS-stored SSH key")
    return require_keyname(marked)
def needs_aws_batch(test_item: MT) -> MT:
    """
    Decorate test classes or methods that should run only if AWS Batch is usable.

    Implies ``needs_aws_s3`` and requires the Batch queue and job role
    environment variables to be set.
    """
    # Assume we need S3 as well as Batch
    marked = _mark_test("aws-batch", needs_aws_s3(test_item))
    # Assume we have Batch if the user has set these variables.
    required_variables = (
        ("TOIL_AWS_BATCH_QUEUE", "an AWS Batch queue name or ARN"),
        (
            "TOIL_AWS_BATCH_JOB_ROLE_ARN",
            "an IAM role ARN that grants S3 and SDB access",
        ),
    )
    for variable, hint in required_variables:
        marked = needs_env_var(variable, hint)(marked)
    try:
        from toil.lib.aws import get_current_aws_region

        if get_current_aws_region() is None:
            # We don't know a region so we need one set.
            # TODO: It always won't be set if we get here.
            marked = needs_env_var(
                "TOIL_AWS_REGION", "an AWS region to use with AWS batch"
            )(marked)
    except ImportError:
        return unittest.skip("Install Toil with the 'aws' extra to include this test.")(
            marked
        )
    return marked
def needs_google_storage(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need the Google Cloud storage client.

    They run only if ``google.cloud.storage`` can be imported, so that public
    Google Storage URIs ought to be accessible. Implies ``needs_online``.
    """
    marked = _mark_test("google-storage", needs_online(test_item))
    try:
        from google.cloud import storage  # noqa
    except ImportError:
        skip_missing_extra = unittest.skip(
            "Install Toil with the 'google' extra to include this test."
        )
        return skip_missing_extra(marked)
    return marked
def needs_google_project(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need a Google Cloud project.

    They run only if TOIL_GOOGLE_PROJECTID is set. Implies ``needs_online``.
    """
    marked = _mark_test("google-project", needs_online(test_item))
    require_project = needs_env_var("TOIL_GOOGLE_PROJECTID", "a Google project ID")
    return require_project(marked)
def needs_gridengine(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if GridEngine (``qhost``) is installed."""
    marked = _mark_test("gridengine", test_item)
    if not which("qhost"):
        return unittest.skip("Install GridEngine to include this test.")(marked)
    return marked
def needs_torque(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if PBS/Torque (``pbsnodes``) is installed."""
    marked = _mark_test("torque", test_item)
    if not which("pbsnodes"):
        return unittest.skip("Install PBS/Torque to include this test.")(marked)
    return marked
def needs_kubernetes_installed(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if the ``kubernetes`` module is importable."""
    marked = _mark_test("kubernetes", test_item)
    try:
        import kubernetes
    except ImportError:
        return unittest.skip(
            "Install Toil with the 'kubernetes' extra to include this test."
        )(marked)
    else:
        str(kubernetes)  # to prevent removal of this import
    return marked
def needs_kubernetes(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need a configured Kubernetes.

    The ``kubernetes`` module must be importable and a configuration must load,
    either from a kube config or from the current pod. Implies ``needs_online``.
    """
    marked = needs_kubernetes_installed(needs_online(test_item))
    try:
        import kubernetes

        # Try the user's kube config first, then the in-cluster configuration.
        for loader_name in ("load_kube_config", "load_incluster_config"):
            try:
                getattr(kubernetes.config, loader_name)()
            except kubernetes.config.ConfigException:
                continue
            break
        else:
            # Neither configuration source could be loaded.
            return unittest.skip(
                "Configure Kubernetes (~/.kube/config, $KUBECONFIG, "
                "or current pod) to include this test."
            )(marked)
    except ImportError:
        # We should already be skipping this test
        pass
    return marked
def needs_mesos(test_item: MT) -> MT:
    """
    Decorate test classes or methods that should run only if Mesos is installed.

    Requires a ``mesos-master`` or ``mesos-agent`` executable on the PATH and
    the ``psutil`` and ``pymesos`` modules.
    """
    marked = _mark_test("mesos", test_item)
    skip_reason = (
        "Install Mesos (and Toil with the 'mesos' extra) to include this test."
    )
    have_executable = which("mesos-master") or which("mesos-agent")
    if not have_executable:
        return unittest.skip(skip_reason)(marked)
    try:
        import psutil  # noqa
        import pymesos  # noqa
    except ImportError:
        return unittest.skip(skip_reason)(marked)
    return marked
def needs_slurm(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if Slurm (``squeue``) is installed."""
    marked = _mark_test("slurm", test_item)
    if not which("squeue"):
        return unittest.skip("Install Slurm to include this test.")(marked)
    return marked
def needs_htcondor(test_item: MT) -> MT:
    """
    Decorate test classes or methods that should run only if HTCondor is usable.

    The Python bindings must be importable and a trivial query against the
    collector named by TOIL_HTCONDOR_COLLECTOR must succeed.
    """
    marked = _mark_test("htcondor", test_item)
    try:
        import htcondor

        collector = htcondor.Collector(os.getenv("TOIL_HTCONDOR_COLLECTOR"))
        # A constraint that matches nothing: we only care that the query works.
        collector.query(constraint="False")
    except ImportError:
        reason = "Install the HTCondor Python bindings to include this test."
    except OSError:
        reason = "HTCondor must be running to include this test."
    except RuntimeError:
        reason = "HTCondor must be installed and configured to include this test."
    else:
        return marked
    return unittest.skip(reason)(marked)
def needs_lsf(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if LSF (``bsub``) is installed."""
    marked = _mark_test("lsf", test_item)
    if not which("bsub"):
        return unittest.skip("Install LSF to include this test.")(marked)
    return marked
def needs_java(test_item: MT) -> MT:
    """Decorate tests that should run only if a ``java`` executable is installed."""
    marked = _mark_test("java", test_item)
    if not which("java"):
        return unittest.skip("Install java to include this test.")(marked)
    return marked
def needs_docker(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need Docker.

    They run only if ``docker`` is installed and TOIL_SKIP_DOCKER is not "true".
    Implies ``needs_online``.
    """
    marked = _mark_test("docker", needs_online(test_item))
    if os.getenv("TOIL_SKIP_DOCKER", "").lower() == "true":
        reason = "Skipping docker test."
    elif not which("docker"):
        reason = "Install docker to include this test."
    else:
        return marked
    return unittest.skip(reason)(marked)
def needs_singularity(test_item: MT) -> MT:
    """
    Decorate test classes or methods that should run only if ``singularity`` is installed.

    Implies ``needs_online``.
    """
    marked = _mark_test("singularity", needs_online(test_item))
    if not which("singularity"):
        return unittest.skip("Install singularity to include this test.")(marked)
    return marked
def needs_singularity_or_docker(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need either Singularity or Docker.

    When ``singularity`` is on the PATH the item is treated as a Singularity
    test; otherwise it is treated as a Docker test (and so is subject to the
    Docker checks).
    """
    # TODO: Is there a good way to OR decorators?
    # Prefer Singularity when it is present, otherwise fall back to Docker.
    chosen = needs_singularity if which("singularity") else needs_docker
    return chosen(test_item)
def needs_local_cuda(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need a local CUDA setup.

    They run only if a CUDA setup legible to cwltool (i.e. providing
    userspace nvidia-smi) is present.
    """
    marked = _mark_test("local_cuda", test_item)
    if not have_working_nvidia_smi():
        return unittest.skip(
            "Install nvidia-smi, an nvidia proprietary driver, and a CUDA-capable nvidia GPU to include this test."
        )(marked)
    return marked
def needs_docker_cuda(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need CUDA through Docker.

    They run only if a working nvidia Docker runtime is detected.
    Implies ``needs_online``.
    """
    marked = _mark_test("docker_cuda", needs_online(test_item))
    if not have_working_nvidia_docker_runtime():
        return unittest.skip(
            "Install nvidia-container-runtime on your Docker server and configure an 'nvidia' runtime to include this test."
        )(marked)
    return marked
def needs_encryption(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if PyNaCl (``nacl``) is importable."""
    marked = _mark_test("encryption", test_item)
    try:
        # noinspection PyUnresolvedReferences
        import nacl  # noqa
    except ImportError:
        return unittest.skip(
            "Install Toil with the 'encryption' extra to include this test."
        )(marked)
    return marked
def needs_cwl(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if ``cwltool`` is importable."""
    marked = _mark_test("cwl", test_item)
    try:
        # noinspection PyUnresolvedReferences
        import cwltool  # noqa
    except ImportError:
        skip_missing_extra = unittest.skip(
            "Install Toil with the 'cwl' extra to include this test."
        )
        return skip_missing_extra(marked)
    return marked
def needs_wdl(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if miniwdl (``WDL``) is importable."""
    marked = _mark_test("wdl", test_item)
    try:
        # noinspection PyUnresolvedReferences
        import WDL  # noqa
    except ImportError:
        skip_missing_extra = unittest.skip(
            "Install Toil with the 'wdl' extra to include this test."
        )
        return skip_missing_extra(marked)
    return marked
def needs_server(test_item: MT) -> MT:
    """Decorate test classes or methods that should run only if ``connexion`` is importable."""
    marked = _mark_test("server_mode", test_item)
    try:
        # noinspection PyUnresolvedReferences
        import connexion
    except ImportError:
        return unittest.skip(
            "Install Toil with the 'server' extra to include this test."
        )(marked)
    else:
        print(connexion.__file__)  # keep this import from being removed.
    return marked
def needs_celery_broker(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need a RabbitMQ broker for Celery.

    They run only if TOIL_WES_BROKER_URL is set. Implies ``needs_online``.
    """
    marked = _mark_test("celery", needs_online(test_item))
    require_broker = needs_env_var(
        "TOIL_WES_BROKER_URL", "a URL to a RabbitMQ broker for Celery"
    )
    return require_broker(marked)
def needs_wes_server(test_item: MT) -> MT:
    """
    Use as a decorator before test classes or methods to run only if a WES
    server is available to run against.

    The server is located through the TOIL_WES_ENDPOINT environment variable
    and probed by fetching its ``service-info`` document. The test is skipped
    if the variable is unset, the request fails, or the configured value is
    not a usable URL. Implies ``needs_online``.
    """
    test_item = _mark_test("wes_server", needs_online(test_item))

    wes_url = os.environ.get("TOIL_WES_ENDPOINT")
    if not wes_url:
        return unittest.skip("Set TOIL_WES_ENDPOINT to include this test")(test_item)

    try:
        # Only reachability matters; close the response right away.
        with urlopen(f"{wes_url}/ga4gh/wes/v1/service-info"):
            pass
    except (HTTPError, URLError, ValueError):
        # urlopen raises ValueError for a malformed URL (e.g. a missing scheme).
        return unittest.skip(f"Run a WES server on {wes_url} to include this test")(
            test_item
        )

    return test_item
def needs_local_appliance(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need the Toil appliance image locally.

    They run only if Docker tests are enabled, ``docker`` is installed, and
    ``docker inspect`` reports the appliance image.
    """
    marked = _mark_test("appliance", test_item)
    if os.getenv("TOIL_SKIP_DOCKER", "").lower() == "true":
        return unittest.skip("Skipping docker test.")(marked)
    if not which("docker"):
        return unittest.skip("Install docker to include this test.")(marked)
    try:
        image = applianceSelf()
    except ApplianceImageNotFound:
        return unittest.skip(
            "Appliance image is not published. Use 'make test' target to automatically "
            "build appliance, or just run 'make push_docker' prior to running this "
            "test."
        )(marked)

    inspect_command = ["docker", "inspect", '--format="{{json .RepoTags}}"', image]
    try:
        inspector = subprocess.Popen(
            inspect_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        repo_tags, _errors = inspector.communicate()
        found_locally = image in repo_tags.decode("utf-8")
    except Exception:
        # Best effort: any failure to inspect is treated as "image not present".
        found_locally = False

    if found_locally:
        return marked
    return unittest.skip(
        f"Cannot find appliance {image} locally. Use 'make test' target to automatically "
        "build appliance, or just run 'make push_docker' prior to running this "
        "test."
    )(marked)
def needs_fetchable_appliance(test_item: MT) -> MT:
    """
    Decorate test classes or methods that need a downloadable Toil appliance image.

    They run only if Docker tests are enabled and ``applianceSelf()`` does not
    report the image as missing. Implies ``needs_online``.
    """
    marked = _mark_test("fetchable_appliance", needs_online(test_item))
    if os.getenv("TOIL_SKIP_DOCKER", "").lower() == "true":
        return unittest.skip("Skipping docker test.")(marked)
    try:
        applianceSelf()
    except ApplianceImageNotFound:
        # Not downloadable
        return unittest.skip(
            "Cannot see appliance in registry. Use 'make test' target to automatically "
            "build appliance, or just run 'make push_docker' prior to running this "
            "test."
        )(marked)
    return marked
def integrative(test_item: MT) -> MT:
    """
    Decorate integration tests so they are skipped during regular builds.

    We define integration tests as A) involving other, non-Toil software components
    that we develop and/or B) having a higher cost (time or money).

    The test runs only when TOIL_TEST_INTEGRATIVE is "true" (case-insensitive).
    """
    marked = _mark_test("integrative", test_item)
    enabled = os.getenv("TOIL_TEST_INTEGRATIVE", "").lower() == "true"
    if not enabled:
        return unittest.skip(
            "Set TOIL_TEST_INTEGRATIVE=True to include this integration test, "
            "or run `make integration_test_local` to run all integration tests."
        )(marked)
    return marked
def slow(test_item: MT) -> MT:
    """
    Decorate tests that are slow and not critical.

    They are skipped when TOIL_TEST_QUICK is "true" (case-insensitive).
    """
    marked = _mark_test("slow", test_item)
    quick_mode = os.environ.get("TOIL_TEST_QUICK", "").lower() == "true"
    if quick_mode:
        return unittest.skip('Skipped because TOIL_TEST_QUICK is "True"')(marked)
    return marked
# Accepts only ASCII letters, digits and underscores; make_tests uses it to
# validate each generated piece of a test method name.
methodNamePartRegex = re.compile("^[a-zA-Z_0-9]+$")
@contextmanager
def timeLimit(seconds: int) -> Generator[None, None, None]:
    """
    Use to limit the execution time of a function.

    Raises an exception if the execution of the function takes more than the
    specified amount of time. See http://stackoverflow.com/a/601168.

    The limit is implemented with ``SIGALRM``. The handler that was installed
    before entering the block is put back on exit.

    :param seconds: maximum allowable time, in seconds

    :raises RuntimeError: inside the block, once the time limit has elapsed.

    >>> import time
    >>> with timeLimit(2):
    ...    time.sleep(1)
    >>> import time
    >>> with timeLimit(1):
    ...    time.sleep(2)
    Traceback (most recent call last):
        ...
    RuntimeError: Timed out
    """

    # noinspection PyUnusedLocal
    def signal_handler(signum: int, frame: Any) -> None:
        raise RuntimeError("Timed out")

    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel any pending alarm before touching the handler.
        signal.alarm(0)
        # signal.signal() returns None when the old handler was not installed
        # from Python; there is nothing that can be restored in that case.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
def make_tests(generalMethod, targetClass, **kwargs):
    """
    This method dynamically generates test methods using the generalMethod as a template. Each
    generated function is the result of a unique combination of parameters applied to the
    generalMethod. Each of the parameters has a corresponding string that will be used to name
    the method. These generated functions are named in the scheme: test_[generalMethodName]___[
    firstParamaterName]_[someValueName]__[secondParamaterName]_...

    The arguments following the generalMethodName should be a series of one or more
    dictionaries of the form {str : type, ...} where the key represents the name of the value.
    The names will be used to represent the permutation of values passed for each parameter in
    the generalMethod.

    The generated method names will list the parameters in lexicographic order by parameter name.

    :param generalMethod: A method that will be parameterized with values passed as kwargs. Note
           that the generalMethod must be a regular method.

    :param targetClass: This represents the class to which the generated test methods will be
           bound. If no targetClass is specified the class of the generalMethod is assumed the
           target.

    :param kwargs: a series of dictionaries defining values, and their respective names where
           each keyword is the name of a parameter in generalMethod.

    :raises RuntimeError: if a parameter or value name cannot be used in a method name.

    >>> class Foo:
    ...     def has(self, num, letter):
    ...         return num, letter
    ...
    ...     def hasOne(self, num):
    ...         return num

    >>> class Bar(Foo):
    ...     pass

    >>> make_tests(Foo.has, Bar, num={'one':1, 'two':2}, letter={'a':'a', 'b':'b'})

    >>> b = Bar()

    Note that letter comes lexicographically before num and so appears first in
    the generated method names.

    >>> assert b.test_has__letter_a__num_one() == b.has(1, 'a')

    >>> assert b.test_has__letter_b__num_one() == b.has(1, 'b')

    >>> assert b.test_has__letter_a__num_two() == b.has(2, 'a')

    >>> assert b.test_has__letter_b__num_two() == b.has(2, 'b')

    >>> f = Foo()

    >>> hasattr(f, 'test_has__num_one__letter_a')  # should be false because Foo has no test methods
    False

    """

    def permuteIntoLeft(left, rParamName, right):
        """
        Permutes values in right dictionary into each parameter: value dict pair in the left
        dictionary. Such that the left dictionary will contain a new set of keys each of which is
        a combination of one of its original parameter-value names appended with some
        parameter-value name from the right dictionary. Each original key in the left is deleted
        from the left dictionary after the permutation of the key and every parameter-value name
        from the right has been added to the left dictionary.

        For example if left is  {'__PrmOne_ValName':{'ValName':Val}} and right is
        {'rValName1':rVal1, 'rValName2':rVal2} then left will become
        {'__PrmOne_ValName__rParamName_rValName1':{'ValName':Val. 'rValName1':rVal1},
        '__PrmOne_ValName__rParamName_rValName2':{'ValName':Val. 'rValName2':rVal2}}

        :param left: A dictionary pairing each paramNameValue to a nested dictionary that
               contains each ValueName and value pair described in the outer dict's paramNameValue
               key.

        :param rParamName: The name of the parameter that each value in the right dict represents.

        :param right: A dict that pairs 1 or more valueNames and values for the rParamName
               parameter.
        """
        # Iterate over snapshots because ``left`` is mutated inside the loop.
        for prmValName, lDict in list(left.items()):
            for rValName, rVal in list(right.items()):
                nextPrmVal = f"__{rParamName}_{rValName.lower()}"
                if methodNamePartRegex.match(nextPrmVal) is None:
                    # Report the name that actually failed validation.
                    raise RuntimeError(
                        "The name '%s' cannot be used in a method name" % nextPrmVal
                    )
                aggDict = dict(lDict)
                aggDict[rParamName] = rVal
                left[prmValName + nextPrmVal] = aggDict
            left.pop(prmValName)

    def insertMethodToClass():
        """Generate and insert test methods."""

        # ``prms`` is bound as a default so each generated method keeps its own
        # parameter set rather than the last one assigned in the loop below.
        def fx(self, prms=prms):
            if prms is not None:
                return generalMethod(self, **prms)
            else:
                return generalMethod(self)

        methodName = f"test_{generalMethod.__name__}{prmNames}"

        setattr(targetClass, methodName, fx)

    if len(kwargs) > 0:
        # Define order of kwargs.
        # We keep them in reverse order of how we use them for efficient pop.
        sortedKwargs = sorted(list(kwargs.items()), reverse=True)

        # create first left dict
        left = {}
        prmName, vals = sortedKwargs.pop()
        for valName, val in list(vals.items()):
            pvName = f"__{prmName}_{valName.lower()}"
            if methodNamePartRegex.match(pvName) is None:
                raise RuntimeError(
                    "The name '%s' cannot be used in a method name" % pvName
                )
            left[pvName] = {prmName: val}

        # get cartesian product
        while len(sortedKwargs) > 0:
            permuteIntoLeft(left, *sortedKwargs.pop())

        # set class attributes
        targetClass = targetClass or generalMethod.__class__
        for prmNames, prms in list(left.items()):
            insertMethodToClass()
    else:
        prms = None
        prmNames = ""
        insertMethodToClass()
[docs] class ApplianceTestSupport(ToilTest): """ A Toil test that runs a user script on a minimal cluster of appliance containers. i.e. one leader container and one worker container. """ @contextmanager def _applianceCluster( self, mounts: dict[str, str], numCores: Optional[int] = None ) -> Generator[ tuple["ApplianceTestSupport.LeaderThread", "ApplianceTestSupport.WorkerThread"], None, None, ]: """ Context manager for creating and tearing down an appliance cluster. :param mounts: Dictionary mapping host paths to container paths. Both the leader and the worker container will be started with one -v argument per dictionary entry, as in -v KEY:VALUE. Beware that if KEY is a path to a directory, its entire content will be deleted when the cluster is torn down. :param numCores: The number of cores to be offered by the Mesos agent process running in the worker container. :return: A tuple of the form `(leader, worker)` containing the Appliance instances representing the respective appliance containers """ if numCores is None: numCores = cpu_count() # The last container to stop (and the first to start) should clean the mounts. with self.LeaderThread(self, mounts, cleanMounts=True) as leader: with self.WorkerThread(self, mounts, numCores) as worker: yield leader, worker
[docs] class Appliance(ExceptionalThread, metaclass=ABCMeta): @abstractmethod def _getRole(self) -> str: return "leader" @abstractmethod def _containerCommand(self) -> list[str]: pass @abstractmethod def _entryPoint(self) -> str: pass # Lock is used because subprocess is NOT thread safe: http://tinyurl.com/pkp5pgq lock = threading.Lock() def __init__( self, outer: "ApplianceTestSupport", mounts: dict[str, str], cleanMounts: bool = False, ) -> None: assert all( " " not in v for v in mounts.values() ), "No spaces allowed in mounts" super().__init__() self.outer = outer self.mounts = mounts self.cleanMounts = cleanMounts self.containerName = str(uuid.uuid4()) self.popen: Optional[subprocess.Popen[bytes]] = None
[docs] def __enter__(self) -> "Appliance": with self.lock: image = applianceSelf() # Omitting --rm, it's unreliable, see https://github.com/docker/docker/issues/16575 args = list( concat( "docker", "run", "--entrypoint=" + self._entryPoint(), "--net=host", "-i", "--name=" + self.containerName, ["--volume=%s:%s" % mount for mount in self.mounts.items()], image, self._containerCommand(), ) ) logger.info("Running %r", args) self.popen = subprocess.Popen(args) self.start() self.__wait_running() return self
# noinspection PyUnusedLocal
[docs] def __exit__( self, exc_type: type[BaseException], exc_val: Exception, exc_tb: Any ) -> Literal[False]: try: try: self.outer._run("docker", "stop", self.containerName) self.join() finally: if self.cleanMounts: self.__cleanMounts() finally: self.outer._run("docker", "rm", "-f", self.containerName) return False # don't swallow exception
def __wait_running(self) -> None:
    """
    Block until docker reports the appliance container as running.

    ``docker inspect`` is polled once per second. Polling also ends, without an error
    being raised here, as soon as this thread is no longer alive.
    """
    logger.info(
        "Waiting for %s container process to appear. "
        "Expect to see 'Error: No such image or container'.",
        self._getRole(),
    )
    # Thread.isAlive() no longer exists on the Python versions this module supports,
    # so is_alive() is called directly.
    while self.is_alive():
        try:
            running = cast(
                str,
                self.outer._run(
                    "docker",
                    "inspect",
                    "--format={{ .State.Running }}",
                    self.containerName,
                    capture=True,
                ),
            ).strip()
        except subprocess.CalledProcessError:
            # The container is not known to docker yet; try again after the pause.
            pass
        else:
            if running == "true":
                break
        time.sleep(1)


def __cleanMounts(self) -> None:
    """
    Delete all files in every mounted directory. Without this step, we risk leaking
    files owned by root on the host. To avoid races, this method should be called after
    the appliance container was stopped, otherwise the running container might still be
    writing files.

    The deletion runs inside a throw-away appliance container so that root-owned files
    can be removed. Only mounts whose host path is a directory are considered.
    """
    directory_mounts = [
        (host, target)
        for host, target in self.mounts.items()
        if os.path.isdir(host)
    ]
    if not directory_mounts:
        # Nothing to delete, so there is no point in starting a container.
        return
    # Delete all files within each mounted directory, but not the directory itself.
    cmd = "shopt -s dotglob && rm -rf " + " ".join(
        target + "/*" for _, target in directory_mounts
    )
    self.outer._run(
        "docker",
        "run",
        "--rm",
        "--entrypoint=/bin/bash",
        # The cleanup container must see the host directories; without these volume
        # flags the rm would only touch the container's own file system.
        *("--volume=%s:%s" % mount for mount in directory_mounts),
        applianceSelf(),
        "-c",
        cmd,
    )
def tryRun(self) -> None:
    """Wait for the spawned ``docker run`` process to exit, then log the exit."""
    process = self.popen
    # The process must have been spawned before this runs.
    assert process
    process.wait()
    logger.info("Exiting %s", self.__class__.__name__)
def runOnAppliance(self, *args: str, **kwargs: Any) -> None:
    """
    Run a command inside the appliance container via ``docker exec -i``.

    :param args: The command and its arguments.
    :param kwargs: Passed through unchanged to the outer test's ``_run`` helper.
    """
    # Check if thread is still alive. Note that ExceptionalThread.join raises the
    # exception that occurred in the thread.
    self.join(timeout=0)
    command = ("docker", "exec", "-i", self.containerName, *args)
    # noinspection PyProtectedMember
    self.outer._run(*command, **kwargs)
def writeToAppliance(self, path: str, contents: Any) -> None:
    """
    Write ``contents`` to the file at ``path`` inside the appliance container.

    The data is fed as input to ``tee``, which is run on the appliance.
    """
    tee_command = ("tee", path)
    self.runOnAppliance(*tee_command, input=contents)
def deployScript(
    self, path: str, packagePath: str, script: Union[str, Callable[..., Any]]
) -> None:
    """
    Deploy a Python module on the appliance.

    Every package named in ``packagePath`` gets its directory created and an empty
    ``__init__.py`` written before the module file itself is written.

    :param path: the path (absolute or relative to the WORKDIR of the appliance container)
           to the root of the package hierarchy where the given module should be placed.
           The given directory should be on the Python path.

    :param packagePath: the desired fully qualified module name (dotted form) of the module

    :param str|callable script: the contents of the Python module. If a callable is given,
           its source code will be extracted. This is a convenience that lets you embed
           user scripts into test code as nested function.
    """
    if callable(script):
        script = self.outer._getScriptSource(script)
    # Everything before the last dot names packages, the last component is the module.
    *packages, module = packagePath.split(".")
    directory = path
    for package in packages:
        directory = "/".join([directory, package])
        self.runOnAppliance("mkdir", "-p", directory)
        self.writeToAppliance("/".join([directory, "__init__.py"]), "")
    self.writeToAppliance("/".join([directory, module + ".py"]), script)
class LeaderThread(Appliance):
    """The appliance whose container runs ``mesos-master``."""

    def _getRole(self) -> str:
        return "leader"

    def _entryPoint(self) -> str:
        return "mesos-master"

    def _containerCommand(self) -> list[str]:
        """Return the command line options handed to ``mesos-master``."""
        master_options = [
            "--registry=in_memory",
            "--ip=127.0.0.1",
            "--port=5050",
            "--allocation_interval=500ms",
        ]
        return master_options
class WorkerThread(Appliance):
    """The appliance whose container runs ``mesos-agent``."""

    def __init__(
        self, outer: "ApplianceTestSupport", mounts: dict[str, str], numCores: int
    ) -> None:
        """
        :param outer: The test instance this appliance belongs to.
        :param mounts: Dictionary mapping host paths to container paths.
        :param numCores: The number of cpus advertised through ``--resources``.
        """
        self.numCores = numCores
        super().__init__(outer, mounts)

    def _getRole(self) -> str:
        return "worker"

    def _entryPoint(self) -> str:
        return "mesos-agent"

    def _containerCommand(self) -> list[str]:
        """Return the command line options handed to ``mesos-agent``."""
        cpu_resources = "--resources=cpus(*):%i" % self.numCores
        agent_options = [
            "--work_dir=/var/lib/mesos",
            "--ip=127.0.0.1",
            "--master=127.0.0.1:5050",
            "--attributes=preemptible:False",
            cpu_resources,
            "--no-hostname_lookup",
            "--no-systemd_enable_support",
        ]
        return agent_options