# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
import os
import re
import struct
from shlex import quote
from typing import Optional
import requests
import docker
from docker.errors import (
ContainerError,
ImageNotFound,
NotFound,
create_api_error_from_http_exception,
)
from docker.utils.socket import consume_socket_output, demux_adaptor
from toil.lib.accelerators import get_host_accelerator_numbers
logger = logging.getLogger(__name__)
# Sentinel values for apiDockerCall's ``deferParam``: the action deferred to
# job completion for the launched container.
FORGO = 0  # leave the container untouched and running
STOP = 1  # graceful shutdown (SIGTERM, then SIGKILL if needed)
RM = 2  # immediate SIGKILL and removal; the default when deferParam is None
def dockerCheckOutput(*args, **kwargs):
    """
    Removed stub. Always raises, pointing callers at apiDockerCall().
    """
    message = (
        "dockerCheckOutput() using subprocess.check_output() has been removed, "
        "please switch to apiDockerCall()."
    )
    raise RuntimeError(message)
def dockerCall(*args, **kwargs):
    """
    Removed stub. Always raises, pointing callers at apiDockerCall().
    """
    message = (
        "dockerCall() using subprocess.check_output() has been removed, "
        "please switch to apiDockerCall()."
    )
    raise RuntimeError(message)
def subprocessDockerCall(*args, **kwargs):
    """
    Removed stub. Always raises, pointing callers at apiDockerCall().
    """
    raise RuntimeError(
        "subprocessDockerCall() has been removed, please switch to apiDockerCall()."
    )
def apiDockerCall(
    job,
    image,
    parameters=None,
    deferParam=None,
    volumes=None,
    working_dir=None,
    containerName=None,
    entrypoint=None,
    detach=False,
    log_config=None,
    auto_remove=None,
    remove=False,
    user=None,
    environment=None,
    stdout=None,
    stderr=False,
    stream=False,
    demux=False,
    streamfile=None,
    accelerators: Optional[list[int]] = None,
    timeout=365 * 24 * 60 * 60,
    **kwargs,
):
    """
    A toil wrapper for the python docker API.

    Docker API Docs: https://docker-py.readthedocs.io/en/stable/index.html
    Docker API Code: https://github.com/docker/docker-py

    This implements docker's python API within toil so that calls are run as
    jobs, with the intention that failed/orphaned docker jobs be handled
    appropriately.

    Example of using dockerCall in toil to index a FASTA file with SAMtools::

        def toil_job(job):
            working_dir = job.fileStore.getLocalTempDir()
            path = job.fileStore.readGlobalFile(ref_id,
                                                os.path.join(working_dir, 'ref.fasta'))
            parameters = ['faidx', path]
            apiDockerCall(job,
                          image='quay.io/ucsc_cgl/samtools:latest',
                          working_dir=working_dir,
                          parameters=parameters)

    Note that when run with detach=False, or with detach=True and stdout=True
    or stderr=True, this is a blocking call. When run with detach=True and
    without output capture, the container is started and returned without
    waiting for it to finish.

    :param toil.Job.job job: The Job instance for the calling function.
    :param str image: Name of the Docker image to be used.
                      (e.g. 'quay.io/ucsc_cgl/samtools:latest')
    :param list[str] parameters: A list of string elements. If there are
                                 multiple elements, these will be joined with
                                 spaces. This handling of multiple elements
                                 provides backwards compatibility with previous
                                 versions which called docker using
                                 subprocess.check_call().
                                 If list of lists: list[list[str]], then treat
                                 as successive commands chained with pipe.
    :param dict volumes: Host-path to mount-spec mapping handed to the docker
                         API. Defaults to mounting working_dir at /data
                         read-write. Missing host paths are created first.
    :param str working_dir: The working directory.
    :param int deferParam: Action to take on the container upon job completion.
           FORGO (0) leaves the container untouched and running.
           STOP (1) Sends SIGTERM, then SIGKILL if necessary to the container.
           RM (2) Immediately send SIGKILL to the container. This is the default
           behavior if deferParam is set to None.
    :param str containerName: The name/ID of the container.
    :param str entrypoint: Prepends commands sent to the container. See:
                           https://docker-py.readthedocs.io/en/stable/containers.html
    :param bool detach: Run the container in detached mode. (equivalent to '-d')
    :param bool stdout: Return logs from STDOUT when detach=False (default: True).
                        Block and capture stdout to a file when detach=True
                        (default: False). Output capture defaults to output.log,
                        and can be specified with the "streamfile" kwarg.
    :param bool stderr: Return logs from STDERR when detach=False (default: False).
                        Block and capture stderr to a file when detach=True
                        (default: False). Output capture defaults to output.log,
                        and can be specified with the "streamfile" kwarg.
    :param bool stream: If True and detach=False, return a log generator instead
                        of a string. Ignored if detach=True. (default: False).
    :param bool demux: Similar to `demux` in container.exec_run(). If True and
                       detach=False, returns a tuple of (stdout, stderr). If
                       stream=True, returns a log generator with tuples of
                       (stdout, stderr). Ignored if detach=True. (default: False).
    :param str streamfile: Collect container output to this file if detach=True and
                           stderr and/or stdout are True. Defaults to "output.log".
    :param dict log_config: Specify the logs to return from the container. See:
                            https://docker-py.readthedocs.io/en/stable/containers.html
    :param bool remove: Remove the container on exit or not.
    :param str user: The container will be run with the privileges of
                     the user specified. Can be an actual name, such
                     as 'root' or 'lifeisaboutfishtacos', or it can be
                     the uid or gid of the user ('0' is root; '1000' is
                     an example of a less privileged uid or gid), or a
                     complement of the uid:gid (RECOMMENDED), such as
                     '0:0' (root user : root group) or '1000:1000'
                     (some other user : some other user group).
    :param environment: Allows one to set environment variables inside of the
                        container, e.g. ``environment={'PATH': '/usr/bin'}``.
    :param int timeout: Use the given timeout in seconds for interactions with
                        the Docker daemon. Note that the underlying docker module is
                        not always able to abort ongoing reads and writes in order
                        to respect the timeout. Defaults to 1 year (i.e. wait
                        essentially indefinitely).
    :param accelerators: Toil accelerator numbers (usually GPUs) to forward to
                         the container. These are interpreted in the current
                         Python process's environment. See
                         toil.lib.accelerators.get_individual_local_accelerators()
                         for the menu of available accelerators.
    :param kwargs: Additional keyword arguments supplied to the docker API's
                   run command. The list is 75 keywords total, for examples
                   and full documentation see:
                   https://docker-py.readthedocs.io/en/stable/containers.html
    :returns: Returns the standard output/standard error text, as requested, when
              detach=False. Returns the underlying
              docker.models.containers.Container object from the Docker API when
              detach=True.
    """
    # make certain that files have the correct permissions
    thisUser = os.getuid()
    thisGroup = os.getgid()
    if user is None:
        # Default to running as the current host user:group so files written
        # to mounted volumes stay owned by us.
        user = str(thisUser) + ":" + str(thisGroup)

    if containerName is None:
        containerName = getContainerName(job)

    if working_dir is None:
        working_dir = os.getcwd()

    if volumes is None:
        volumes = {working_dir: {"bind": "/data", "mode": "rw"}}

    # Docker refuses to mount nonexistent host paths; create them up front.
    for volume in volumes:
        if not os.path.exists(volume):
            os.makedirs(volume, exist_ok=True)

    if parameters is None:
        parameters = []

    # If 'parameters' is a list of lists, treat each list as a separate command
    # and chain with pipes.
    if len(parameters) > 0 and type(parameters[0]) is list:
        if entrypoint is None:
            # A shell is required to interpret the pipe chain.
            entrypoint = ["/bin/bash", "-c"]
        chain_params = [
            " ".join(quote(arg) for arg in command) for command in parameters
        ]
        command = " | ".join(chain_params)
        # Fail the whole pipeline if any stage fails.
        pipe_prefix = "set -eo pipefail && "
        command = [pipe_prefix + command]
        logger.debug("Calling docker with: " + repr(command))

    # If 'parameters' is a normal list, join all elements into a single string
    # element, quoting and escaping each element.
    # Example: ['echo','the Oread'] becomes: ["echo 'the Oread'"]
    # Note that this is still a list, and the docker API prefers this as best
    # practice:
    # http://docker-py.readthedocs.io/en/stable/containers.html
    elif len(parameters) > 0 and type(parameters) is list:
        command = " ".join(quote(arg) for arg in parameters)
        logger.debug("Calling docker with: " + repr(command))

    # If the 'parameters' lists are empty, they are respecified as None, which
    # tells the API to simply create and run the container
    else:
        entrypoint = None
        command = None

    working_dir = os.path.abspath(working_dir)

    # Ensure the user has passed a valid value for deferParam
    if deferParam not in (None, FORGO, STOP, RM):
        raise RuntimeError("Please provide a valid value for deferParam.")

    client = docker.from_env(version="auto", timeout=timeout)

    if deferParam is None:
        deferParam = RM

    if deferParam == STOP:
        job.defer(dockerStop, containerName)
    # NOTE(review): the STOP check above is a separate `if`, so with
    # deferParam == STOP and remove=True both dockerStop and dockerKill get
    # deferred (via the `elif remove` branch below) — confirm this is intended.
    if deferParam == FORGO:
        # Leave the container untouched and running
        pass
    elif deferParam == RM:
        job.defer(dockerKill, containerName, remove=True)
    elif remove:
        job.defer(dockerKill, containerName, remove=True)

    if auto_remove is None:
        auto_remove = remove

    device_requests = []
    if accelerators:
        # Map accelerator numbers to host numbers
        host_accelerators = []
        accelerator_mapping = get_host_accelerator_numbers()
        for our_number in accelerators:
            if our_number >= len(accelerator_mapping):
                raise RuntimeError(
                    f"Cannot forward accelerator {our_number} because only "
                    f"{len(accelerator_mapping)} accelerators are available "
                    f"to this job."
                )
            host_accelerators.append(accelerator_mapping[our_number])
        # TODO: Here we assume that the host accelerators are all GPUs
        device_requests.append(
            docker.types.DeviceRequest(
                device_ids=[",".join(host_accelerators)], capabilities=[["gpu"]]
            )
        )

    try:
        if detach is False:
            # When detach is False, this returns stdout normally:
            # >>> client.containers.run("ubuntu:latest", "echo hello world")
            # 'hello world\n'
            if stdout is None:
                stdout = True
            out = client.containers.run(
                image=image,
                command=command,
                working_dir=working_dir,
                entrypoint=entrypoint,
                name=containerName,
                detach=False,
                volumes=volumes,
                auto_remove=auto_remove,
                stdout=stdout,
                stderr=stderr,
                # to get the generator if demux=True
                stream=stream or demux,
                remove=remove,
                log_config=log_config,
                user=user,
                environment=environment,
                device_requests=device_requests,
                **kwargs,
            )

            if demux is False:
                return out

            # If demux is True (i.e.: we want STDOUT and STDERR separated), we need to decode
            # the raw response from the docker API and preserve the stream type this time.
            response = out._response
            gen = (
                demux_adaptor(*frame)
                for frame in _multiplexed_response_stream_helper(response)
            )

            if stream:
                return gen
            else:
                return consume_socket_output(frames=gen, demux=True)

        else:
            if (stdout or stderr) and log_config is None:
                logger.warning(
                    "stdout or stderr specified, but log_config is not set. "
                    'Defaulting to "journald".'
                )
                log_config = dict(type="journald")

            if stdout is None:
                stdout = False

            # When detach is True, this returns a container object:
            # >>> client.containers.run("bfirsh/reticulate-splines", detach=True)
            # <Container '45e6d2de7c54'>
            container = client.containers.run(
                image=image,
                command=command,
                working_dir=working_dir,
                entrypoint=entrypoint,
                name=containerName,
                detach=True,
                volumes=volumes,
                auto_remove=auto_remove,
                stdout=stdout,
                stderr=stderr,
                stream=stream,
                remove=remove,
                log_config=log_config,
                user=user,
                environment=environment,
                device_requests=device_requests,
                **kwargs,
            )
            if stdout or stderr:
                if streamfile is None:
                    streamfile = "output.log"
                with open(streamfile, "wb") as f:
                    # stream=True makes this loop blocking; we will loop until
                    # the container stops and there is no more output.
                    for line in container.logs(
                        stdout=stdout, stderr=stderr, stream=True
                    ):
                        f.write(line.encode() if isinstance(line, str) else line)

            # If we didn't capture output, the caller will need to .wait() on
            # the container to know when it is done. Even if we did capture
            # output, the caller needs the container to get at the exit code.
            return container

    except ContainerError:
        logger.error("Docker had non-zero exit. Check your command: " + repr(command))
        raise
    except ImageNotFound:
        logger.error("Docker image not found.")
        raise
    except requests.exceptions.HTTPError as e:
        logger.error("The server returned an error.")
        raise create_api_error_from_http_exception(e)
def dockerKill(
    container_name: str,
    gentleKill: bool = False,
    remove: bool = False,
    timeout: int = 365 * 24 * 60 * 60,
) -> None:
    """
    Immediately kills a container. Equivalent to "docker kill":
    https://docs.docker.com/engine/reference/commandline/kill/

    A container that does not exist is treated as already stopped and is
    ignored (logged at debug level).

    :param container_name: Name of the container being killed.
    :param gentleKill: If True, trigger a graceful shutdown (SIGTERM via
                       "docker stop") instead of an immediate SIGKILL.
    :param remove: If True, remove the container after it exits.
    :param timeout: Use the given timeout in seconds for interactions with
                    the Docker daemon. Note that the underlying docker module is
                    not always able to abort ongoing reads and writes in order
                    to respect the timeout. Defaults to 1 year (i.e. wait
                    essentially indefinitely).
    :raises docker.errors.APIError: If the Docker daemon returns a server error.
    """
    client = docker.from_env(version="auto", timeout=timeout)
    try:
        this_container = client.containers.get(container_name)
        # Keep signaling until the daemon reports the container stopped;
        # re-fetch each iteration because .status is a cached snapshot.
        while this_container.status == "running":
            if gentleKill is False:
                client.containers.get(container_name).kill()
            else:
                client.containers.get(container_name).stop()
            this_container = client.containers.get(container_name)
        if remove:
            this_container.remove()
    except NotFound:
        # Fixed garbled message ("container != exist").
        logger.debug(
            f"Attempted to stop container ({container_name}), but container does not exist."
        )
    except requests.exceptions.HTTPError as e:
        logger.debug(
            f"Attempted to stop container ({container_name}), but server gave an error:"
        )
        raise create_api_error_from_http_exception(e)
def dockerStop(container_name: str, remove: bool = False) -> None:
    """
    Gracefully stops a container. Equivalent to "docker stop":
    https://docs.docker.com/engine/reference/commandline/stop/

    Thin wrapper around :func:`dockerKill` with ``gentleKill=True``.

    :param container_name: Name of the container being stopped.
    :param remove: If True, remove the container after it exits.
    """
    dockerKill(container_name=container_name, gentleKill=True, remove=remove)
def containerIsRunning(container_name: str, timeout: int = 365 * 24 * 60 * 60):
    """
    Check whether the named container is currently running.

    :param container_name: Name of the container being checked.
    :param timeout: Use the given timeout in seconds for interactions with
        the Docker daemon. Note that the underlying docker module is not always
        able to abort ongoing reads and writes in order to respect the timeout.
        Defaults to 1 year (i.e. wait essentially indefinitely).
    :returns: True if status is 'running', False if status is anything else
        (e.g. 'exited', 'restarting', or 'paused'), and None if the container
        does not exist.
    :raises docker.errors.APIError: If the Docker daemon returns a server error.
    """
    client = docker.from_env(version="auto", timeout=timeout)
    try:
        container = client.containers.get(container_name)
        return container.status == "running"
    except NotFound:
        # No such container: report None rather than raising.
        return None
    except requests.exceptions.HTTPError as e:
        logger.debug("Server error attempting to call container: %s", container_name)
        raise create_api_error_from_http_exception(e)
def getContainerName(job):
    """
    Create a random string including the job name, and return it. Name will
    match ``[a-zA-Z0-9][a-zA-Z0-9_.-]``.
    """
    # 9 random bytes -> exactly 12 URL-safe base64 characters, no padding.
    suffix = base64.b64encode(os.urandom(9), b"-_").decode("utf-8")
    raw_name = "--".join(["toil", str(job.description), suffix])
    # Strip anything Docker would reject in a container name.
    return re.sub("[^a-zA-Z0-9_.-]", "", raw_name)
def _multiplexed_response_stream_helper(response):
"""
A generator of multiplexed data blocks coming from a response stream modified from:
https://github.com/docker/docker-py/blob/4.3.1-release/docker/api/client.py#L370
:param response: requests.Response
:return: a generator with tuples of (stream_type, data)
"""
while True:
header = response.raw.read(8)
if not header:
break
# header is 8 bytes with format: {STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}
# protocol: https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
stream_type, length = struct.unpack(">BxxxL", header)
if not length:
continue
data = response.raw.read(length)
if not data:
break
yield stream_type, data