# Source code for toil.test.lib.dockerTest

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import signal
import time
import uuid
from threading import Thread

from docker.errors import ContainerError
from toil.common import Toil
from toil.exceptions import FailedJobsException
from toil.job import Job
from toil.lib.docker import (FORGO,
                             RM,
                             STOP,
                             apiDockerCall,
                             containerIsRunning,
                             dockerKill)
from toil.test import ToilTest, needs_docker, slow

logger = logging.getLogger(__name__)


@needs_docker
class DockerTest(ToilTest):
    """
    Tests dockerCall and ensures no containers are left around.

    When running tests you may optionally set the TOIL_TEST_TEMP environment
    variable to the path of a directory where you want temporary test files
    to be placed. The directory will be created if it doesn't exist. The path
    may be relative in which case it will be assumed to be relative to the
    project root. If TOIL_TEST_TEMP is not defined, temporary files and
    directories will be created in the system's default location for such
    files and any temporary files or directories left over from tests will be
    removed automatically during tear down.
    Otherwise, left-over files will not be removed.
    """

    def setUp(self):
        """Create the per-test temp directory and pick the Toil log level."""
        self.tempDir = self._createTempDir(purpose='tempDir')
        self.dockerTestLogLevel = 'INFO'

    def testDockerClean(self,
                        caching=False,
                        detached=True,
                        rm=True,
                        deferParam=None):
        """
        Run the test container that creates a file in the work dir, and sleeps
        for 5 minutes.
        Ensure that the calling job gets SIGKILLed after a minute, leaving
        behind the spooky/ghost/zombie container. Ensure that the container is
        killed on batch system shutdown (through the deferParam mechanism).

        :param bool caching: value assigned to ``options.caching``
        :param bool detached: forwarded to the job as its ``detached`` argument
        :param bool rm: forwarded to the job as its ``rm`` argument
        :param deferParam: one of ``None``, ``FORGO``, ``STOP`` or ``RM``;
               decides which container state is asserted after the failure
        """
        # We need to test the behaviour of `deferParam` with `rm` and
        # `detached`. We do not look at the case where `rm` and `detached` are
        # both True.  This is the truth table for the different combinations at
        # the end of the test. R = Running, X = Does not exist, E = Exists but
        # not running.
        #              None     FORGO     STOP    RM
        #    rm        X         R         X      X
        # detached     X         R         E      X
        #  Neither     X         R         E      X

        data_dir = os.path.join(self.tempDir, 'data')
        working_dir = os.path.join(self.tempDir, 'working')
        test_file = os.path.join(working_dir, 'test.txt')

        os.makedirs(data_dir, exist_ok=True)
        os.makedirs(working_dir, exist_ok=True)

        options = Job.Runner.getDefaultOptions(os.path.join(self.tempDir, 'jobstore'))
        options.logLevel = self.dockerTestLogLevel
        options.workDir = working_dir
        options.clean = 'always'
        options.retryCount = 0  # we're expecting the job to fail so don't retry!
        options.caching = caching

        # No base64 logic since it might create a name starting with a `-`.
        container_name = uuid.uuid4().hex
        A = Job.wrapJobFn(_testDockerCleanFn,
                          working_dir,
                          detached,
                          rm,
                          deferParam,
                          container_name)
        try:
            with Toil(options) as toil:
                toil.start(A)
        except FailedJobsException:
            # NOTE(review): every check below only runs when the workflow
            # fails; if no FailedJobsException is raised the test passes
            # without asserting anything -- confirm that is intended.

            # The file created by spooky_container would remain in the directory
            # and since it was created inside the container, it would have had
            # uid and gid == 0 (root) which may cause problems when docker
            # attempts to clean up the jobstore.
            file_stats = os.stat(test_file)
            assert file_stats.st_gid != 0
            assert file_stats.st_uid != 0

            # containerIsRunning() is compared by identity because the three
            # expected outcomes (None / False / True) are distinct singletons.
            if (rm and (deferParam != FORGO)) or deferParam == RM or deferParam is None:
                # These containers should not exist
                assert containerIsRunning(container_name) is None, \
                    'Container was not removed.'

            elif deferParam == STOP:
                # These containers should exist but be non-running
                assert containerIsRunning(container_name) is False, \
                    'Container was not stopped.'

            else:
                # These containers will be running
                assert containerIsRunning(container_name) is True, \
                    'Container was not running.'
        finally:
            # Best-effort clean up. The two steps are attempted independently
            # so that a failure to kill the container does not leave the test
            # file behind, and only ordinary errors are suppressed (Ctrl-C and
            # interpreter shutdown still propagate).
            try:
                dockerKill(container_name, remove=True)
            except Exception:
                logger.debug('Could not kill/remove container %s during cleanup.',
                             container_name, exc_info=True)
            try:
                os.remove(test_file)
            except OSError:
                logger.debug('Could not remove %s during cleanup.',
                             test_file, exc_info=True)

    # Naming key for the testDockerClean_<abc>_<deferParam> variants below,
    # as used by the calls themselves:
    #   a: 'C' -> caching=False, 'x' -> caching=True
    #   b: 'R' -> rm=True,       'x' -> rm=False
    #   c: 'D' -> detached=True, 'x' -> detached=False

    def testDockerClean_CRx_FORGO(self):
        self.testDockerClean(caching=False, detached=False, rm=True,
                             deferParam=FORGO)

    def testDockerClean_CRx_STOP(self):
        self.testDockerClean(caching=False, detached=False, rm=True,
                             deferParam=STOP)

    def testDockerClean_CRx_RM(self):
        self.testDockerClean(caching=False, detached=False, rm=True,
                             deferParam=RM)

    @slow
    def testDockerClean_CRx_None(self):
        self.testDockerClean(caching=False, detached=False, rm=True,
                             deferParam=None)

    @slow
    def testDockerClean_CxD_FORGO(self):
        self.testDockerClean(caching=False, detached=True, rm=False,
                             deferParam=FORGO)

    @slow
    def testDockerClean_CxD_STOP(self):
        self.testDockerClean(caching=False, detached=True, rm=False,
                             deferParam=STOP)

    @slow
    def testDockerClean_CxD_RM(self):
        self.testDockerClean(caching=False, detached=True, rm=False,
                             deferParam=RM)

    @slow
    def testDockerClean_CxD_None(self):
        self.testDockerClean(caching=False, detached=True, rm=False,
                             deferParam=None)

    @slow
    def testDockerClean_Cxx_FORGO(self):
        self.testDockerClean(caching=False, detached=False, rm=False,
                             deferParam=FORGO)

    @slow
    def testDockerClean_Cxx_STOP(self):
        self.testDockerClean(caching=False, detached=False, rm=False,
                             deferParam=STOP)

    @slow
    def testDockerClean_Cxx_RM(self):
        self.testDockerClean(caching=False, detached=False, rm=False,
                             deferParam=RM)

    @slow
    def testDockerClean_Cxx_None(self):
        self.testDockerClean(caching=False, detached=False, rm=False,
                             deferParam=None)

    @slow
    def testDockerClean_xRx_FORGO(self):
        self.testDockerClean(caching=True, detached=False, rm=True,
                             deferParam=FORGO)

    @slow
    def testDockerClean_xRx_STOP(self):
        self.testDockerClean(caching=True, detached=False, rm=True,
                             deferParam=STOP)

    @slow
    def testDockerClean_xRx_RM(self):
        self.testDockerClean(caching=True, detached=False, rm=True,
                             deferParam=RM)

    @slow
    def testDockerClean_xRx_None(self):
        self.testDockerClean(caching=True, detached=False, rm=True,
                             deferParam=None)

    @slow
    def testDockerClean_xxD_FORGO(self):
        self.testDockerClean(caching=True, detached=True, rm=False,
                             deferParam=FORGO)

    @slow
    def testDockerClean_xxD_STOP(self):
        self.testDockerClean(caching=True, detached=True, rm=False,
                             deferParam=STOP)

    @slow
    def testDockerClean_xxD_RM(self):
        self.testDockerClean(caching=True, detached=True, rm=False,
                             deferParam=RM)

    @slow
    def testDockerClean_xxD_None(self):
        self.testDockerClean(caching=True, detached=True, rm=False,
                             deferParam=None)

    @slow
    def testDockerClean_xxx_FORGO(self):
        self.testDockerClean(caching=True, detached=False, rm=False,
                             deferParam=FORGO)

    @slow
    def testDockerClean_xxx_STOP(self):
        self.testDockerClean(caching=True, detached=False, rm=False,
                             deferParam=STOP)

    @slow
    def testDockerClean_xxx_RM(self):
        self.testDockerClean(caching=True, detached=False, rm=False,
                             deferParam=RM)

    @slow
    def testDockerClean_xxx_None(self):
        self.testDockerClean(caching=True, detached=False, rm=False,
                             deferParam=None)

    def testDockerPipeChain(self, caching=False):
        r"""
        Test for piping API for dockerCall().  Using this API (activated when
        list of argument lists is given as parameters), commands are piped
        together into a chain.
        ex:  ``parameters=[ ['printf', 'x\n y\n'], ['wc', '-l'] ]`` should execute:
        ``printf 'x\n y\n' | wc -l``

        :param bool caching: value assigned to ``options.caching``
        """
        options = Job.Runner.getDefaultOptions(os.path.join(self.tempDir, 'jobstore'))
        options.logLevel = self.dockerTestLogLevel
        options.workDir = self.tempDir
        options.clean = 'always'
        options.caching = caching
        A = Job.wrapJobFn(_testDockerPipeChainFn)
        rv = Job.Runner.startToil(A, options)
        logger.info('Container pipeline result: %s', repr(rv))
        # The job's return value is decoded as UTF-8 before being compared.
        rv = rv.decode('utf-8')
        assert rv.strip() == '2'

    def testDockerPipeChainErrorDetection(self, caching=False):
        """
        By default, executing cmd1 | cmd2 | ... | cmdN, will only return an
        error if cmdN fails.  This can lead to all manner of errors being
        silently missed.  This tests to make sure that the piping API for
        dockerCall() throws an exception if non-last commands in the chain fail.

        :param bool caching: value assigned to ``options.caching``
        """
        options = Job.Runner.getDefaultOptions(os.path.join(self.tempDir, 'jobstore'))
        options.logLevel = self.dockerTestLogLevel
        options.workDir = self.tempDir
        options.clean = 'always'
        options.caching = caching
        A = Job.wrapJobFn(_testDockerPipeChainErrorFn)
        rv = Job.Runner.startToil(A, options)
        assert rv is True

    # NOTE: despite their names, the two "NonCaching" tests below run with
    # caching=True; the caching=False case is the default of the tests they
    # delegate to.

    def testNonCachingDockerChain(self):
        self.testDockerPipeChain(caching=True)

    def testNonCachingDockerChainErrorDetection(self):
        self.testDockerPipeChainErrorDetection(caching=True)

    def testDockerLogs(self, stream=False, demux=False):
        """
        Test for the different log outputs when detach=False.

        :param bool stream: forwarded to the job as its ``stream`` argument
        :param bool demux: forwarded to the job as its ``demux`` argument
        """
        working_dir = os.path.join(self.tempDir, 'working')
        script_file = os.path.join(working_dir, 'script.sh')
        os.makedirs(working_dir, exist_ok=True)

        options = Job.Runner.getDefaultOptions(os.path.join(self.tempDir, 'jobstore'))
        options.logLevel = self.dockerTestLogLevel
        options.workDir = working_dir
        options.clean = 'always'
        A = Job.wrapJobFn(_testDockerLogsFn,
                          working_dir=working_dir,
                          script_file=script_file,
                          stream=stream,
                          demux=demux)

        try:
            rv = Job.Runner.startToil(A, options)
            assert rv is True
        finally:
            # Best-effort removal of the script written by the job; a missing
            # or unremovable file must not mask the real test outcome.
            try:
                os.remove(script_file)
            except OSError:
                logger.debug('Could not remove %s during cleanup.',
                             script_file, exc_info=True)

    def testDockerLogs_Stream(self):
        self.testDockerLogs(stream=True, demux=False)

    def testDockerLogs_Demux(self):
        self.testDockerLogs(stream=False, demux=True)

    def testDockerLogs_Demux_Stream(self):
        self.testDockerLogs(stream=True, demux=True)


def _testDockerCleanFn(job,
                       working_dir,
                       detached=None,
                       rm=None,
                       deferParam=None,
                       containerName=None):
    """
    Job body for the docker-clean tests: start a container with the requested
    flags, then have the worker SIGKILL itself so the container is orphaned.

    :param toil.job.Job job: job
    :param working_dir: See `work_dir=` in :func:`dockerCall`
    :param bool rm: See `rm=` in :func:`dockerCall`
    :param bool detached: See `detached=` in :func:`dockerCall`
    :param int deferParam: See `deferParam=` in :func:`dockerCall`
    :param str containerName: See `container_name=` in :func:`dockerCall`
    """
    def _kill_worker_after_container_start():
        marker = os.path.join(working_dir, 'test.txt')
        # The container creates this file, so its appearance tells us the
        # container has started; poll once per second until then.
        while True:
            if os.path.exists(marker):
                break
            logger.debug('Waiting on the file created by spooky_container.')
            time.sleep(1)
        # Container is up by now; wait one more second, then kill this process.
        time.sleep(1)
        os.kill(os.getpid(), signal.SIGKILL)

    # Daemon thread, so that a failure inside it cannot hang the tests.
    killer = Thread(target=_kill_worker_after_container_start, daemon=True)
    killer.start()

    apiDockerCall(job,
                  image='quay.io/ucsc_cgl/spooky_test',
                  working_dir=working_dir,
                  deferParam=deferParam,
                  containerName=containerName,
                  detach=detached,
                  remove=rm,
                  privileged=True)


def _testDockerPipeChainFn(job):
    """Run ``printf | wc -l`` as a pipe chain and return its output (expected: 2)."""
    produce_two_lines = ['printf', 'x\n y\n']
    count_lines = ['wc', '-l']
    return apiDockerCall(job,
                         image='quay.io/ucsc_cgl/ubuntu:20.04',
                         parameters=[produce_two_lines, count_lines],
                         privileged=True)


def _testDockerPipeChainErrorFn(job):
    """Return True if ``exit 1 | wc -l`` raises a ContainerError, else False."""
    failing_chain = [['exit', '1'], ['wc', '-l']]
    try:
        apiDockerCall(job,
                      image='quay.io/ucsc_cgl/spooky_test',
                      parameters=failing_chain)
    except ContainerError:
        return True
    else:
        return False


def _testDockerLogsFn(job,
                      working_dir,
                      script_file,
                      stream=False,
                      demux=False):
    """
    Write a small bash script, run it in a container and check the size of the
    captured output for the given ``stream``/``demux`` combination.

    Returns True when every check passes; a failed check raises instead.
    """
    # A script file is used because the redirection operator, '>&2', would be
    # wrapped in quotes if it were passed as a parameter.
    import textwrap

    script_text = textwrap.dedent('''
    #!/bin/bash
    echo hello stdout ;
    echo hello stderr >&2 ;
    echo hello stdout ;
    echo hello stderr >&2 ;
    echo hello stdout ;
    echo hello stdout ;
    ''')

    with open(script_file, 'w') as handle:
        handle.write(script_text)

    mounts = {working_dir: {'bind': working_dir, 'mode': 'rw'}}
    out = apiDockerCall(job,
                        image='quay.io/ucsc_cgl/ubuntu:20.04',
                        working_dir=working_dir,
                        parameters=[script_file],
                        volumes=mounts,
                        entrypoint="/bin/bash",
                        stdout=True,
                        stderr=True,
                        stream=stream,
                        demux=demux)

    # Only lengths are checked because the ordering of the output is not
    # guaranteed.
    if stream and demux:
        # A generator yielding (stdout, stderr) tuples.
        assert hasattr(out, '__iter__')
        for _ in range(6):
            stdout, stderr = next(out)
            if stdout:
                # len('hello stdout\n') == 13
                assert len(stdout) == 13
            elif stderr:
                assert len(stderr) == 13
            else:
                assert False
    elif stream:
        # A generator yielding bytes.
        assert hasattr(out, '__iter__')
        for _ in range(6):
            assert len(next(out)) == 13
    elif demux:
        # A (stdout, stderr) tuple.
        stdout, stderr = out
        # len('hello stdout\n' * 4) == 52
        assert len(stdout) == 52
        # len('hello stderr\n' * 2) == 26
        assert len(stderr) == 26
    else:
        # A single bytes object.
        # len('hello stdout\n' * 4 + 'hello stderr\n' * 2) == 78
        assert len(out) == 78

    return True