import os
from argparse import ArgumentParser, Action, _AppendAction
from typing import Any, Optional, Union, Type, Callable, List, Dict, TYPE_CHECKING
from configargparse import SUPPRESS
import logging
from ruamel.yaml import YAML
from toil.lib.conversions import bytes2human, human2bytes, strtobool, opt_strtobool
from toil.batchSystems.options import add_all_batchsystem_options
from toil.provisioners import parse_node_types
from toil.statsAndLogging import add_logging_options
if TYPE_CHECKING:
from toil.job import AcceleratorRequirement
logger = logging.getLogger(__name__)
# aim to pack autoscaling jobs within a 30 minute block before provisioning a new node
defaultTargetTime = 1800
SYS_MAX_SIZE = 9223372036854775807
# sys.maxsize on 64-bit systems is 9223372036854775807; we hard-code that value
# so that 32-bit systems use the same number
def parse_set_env(l: List[str]) -> Dict[str, Optional[str]]:
"""
Parse a list of strings of the form "NAME=VALUE" or just "NAME" into a dictionary.
Strings of the latter form will result in dictionary entries whose value is None.
>>> parse_set_env([])
{}
>>> parse_set_env(['a'])
{'a': None}
>>> parse_set_env(['a='])
{'a': ''}
>>> parse_set_env(['a=b'])
{'a': 'b'}
>>> parse_set_env(['a=a', 'a=b'])
{'a': 'b'}
>>> parse_set_env(['a=b', 'c=d'])
{'a': 'b', 'c': 'd'}
>>> parse_set_env(['a=b=c'])
{'a': 'b=c'}
>>> parse_set_env([''])
Traceback (most recent call last):
...
ValueError: Empty name
>>> parse_set_env(['=1'])
Traceback (most recent call last):
...
ValueError: Empty name
"""
d = {}
v: Optional[str] = None
for i in l:
try:
k, v = i.split('=', 1)
except ValueError:
k, v = i, None
if not k:
raise ValueError('Empty name')
d[k] = v
return d
def parse_str_list(s: str) -> List[str]:
return [str(x) for x in s.split(",")]
def parse_int_list(s: str) -> List[int]:
return [int(x) for x in s.split(",")]
def iC(min_value: int, max_value: Optional[int] = None) -> Callable[[int], bool]:
"""Returns a function that checks if a given int is in the given half-open interval."""
assert isinstance(min_value, int)
if max_value is None:
return lambda x: min_value <= x
assert isinstance(max_value, int)
return lambda x: min_value <= x < max_value
def fC(minValue: float, maxValue: Optional[float] = None) -> Callable[[float], bool]:
"""Returns a function that checks if a given float is in the given half-open interval."""
assert isinstance(minValue, float)
if maxValue is None:
return lambda x: minValue <= x
assert isinstance(maxValue, float)
return lambda x: minValue <= x < maxValue
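# For example, iC(1, 10)(10) is False because the upper bound is exclusive,
# while fC(0.0)(5.0) is True because no upper bound was given.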
def parse_accelerator_list(specs: Optional[str]) -> List['AcceleratorRequirement']:
"""
Parse a string description of one or more accelerator requirements.
"""
if specs is None or len(specs) == 0:
# Not specified, so the default default is to not need any.
return []
# Otherwise parse each requirement.
from toil.job import parse_accelerator
return [parse_accelerator(r) for r in specs.split(',')]
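# Per the --defaultAccelerators help text below, each comma-separated spec may
# combine a type/model and a count separated by a colon; parsing of each
# individual spec is delegated to toil.job.parse_accelerator.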
def parseBool(val: str) -> bool:
if val.lower() in ['true', 't', 'yes', 'y', 'on', '1']:
return True
elif val.lower() in ['false', 'f', 'no', 'n', 'off', '0']:
return False
else:
raise RuntimeError("Could not interpret \"%s\" as a boolean value" % val)
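# For example, parseBool("Yes") and parseBool("1") return True, parseBool("off")
# returns False, and anything else raises a RuntimeError.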
# This is kept in the outer scope as multiple batchsystem files use this
def make_open_interval_action(min: Union[int, float], max: Optional[Union[int, float]] = None) -> Type[Action]:
"""
Returns an argparse action class to check if the input is within the given half-open interval.
ex:
Provided value to argparse must be within the half-open interval [min, max);
if max is None, only the lower bound is checked
Types of min and max must be the same (max may be None)
:param min: float/int
:param max: optional float/int
:return: argparse action class
"""
class IntOrFloatOpenAction(Action):
def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None:
if isinstance(min, int):
if max is not None: # for mypy
assert isinstance(max, int)
func = iC(min, max)
else:
func = fC(min, max)
try:
if not func(values):
if max is None:
raise parser.error(
f"{option_string} ({values}) must be at least {min}"
)
else:
raise parser.error(
f"{option_string} ({values}) must be at least {min} and strictly less than {max})"
)
except AssertionError:
raise RuntimeError(f"The {option_string} option has an invalid value: {values}")
setattr(namespace, self.dest, values)
return IntOrFloatOpenAction
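# Usage sketch (with a hypothetical option name): the factory result is passed as
# the action, while type conversion still happens via the normal argparse type=
# argument, e.g.
#   parser.add_argument("--retries", type=int, default=0,
#                       action=make_open_interval_action(0))
# rejects negative values while allowing any non-negative integer.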
def parse_jobstore(jobstore_uri: str) -> str:
"""
Turn the jobstore string into its corresponding URI
ex:
/path/to/jobstore -> file:/path/to/jobstore
If the jobstore string is already a URI, return it unchanged:
aws:/path/to/jobstore -> aws:/path/to/jobstore
:param jobstore_uri: string of the jobstore
:return: URI of the jobstore
"""
from toil.common import Toil
name, rest = Toil.parseLocator(jobstore_uri)
if name == 'file':
# We need to resolve relative paths early, on the leader, because the worker process
# may have a different working directory than the leader, e.g. under Mesos.
return Toil.buildLocator(name, os.path.abspath(rest))
else:
return jobstore_uri
JOBSTORE_HELP = ("The location of the job store for the workflow. "
"A job store holds persistent information about the jobs, stats, and files in a "
"workflow. If the workflow is run with a distributed batch system, the job "
"store must be accessible by all worker nodes. Depending on the desired "
"job store implementation, the location should be formatted according to "
"one of the following schemes:\n\n"
"file:<path> where <path> points to a directory on the file system\n\n"
"aws:<region>:<prefix> where <region> is the name of an AWS region like "
"us-west-2 and <prefix> will be prepended to the names of any top-level "
"AWS resources in use by job store, e.g. S3 buckets.\n\n "
"google:<project_id>:<prefix> TODO: explain\n\n"
"For backwards compatibility, you may also specify ./foo (equivalent to "
"file:./foo or just file:foo) or /bar (equivalent to file:/bar).")
def add_base_toil_options(parser: ArgumentParser, jobstore_as_flag: bool = False, cwl: bool = False) -> None:
"""
Add base Toil command line options to the parser.
:param parser: Argument parser to add options to
:param jobstore_as_flag: make the job store option a --jobStore flag instead of a required jobStore positional argument.
:param cwl: whether the options are being added for the CWL runner (some help text is suppressed when set)
"""
# This is necessary as the config file must have at least one valid key to parse properly and we want to use a
# dummy key
config = parser.add_argument_group()
config.add_argument("--config_version", default=None, help=SUPPRESS)
# If using argparse instead of configargparse, this should just not parse when calling parse_args()
# default config value is set to none as defaults should already be populated at config init
config.add_argument('--config', dest='config', is_config_file_arg=True, default=None, metavar="PATH",
help="Get options from a config file.")
add_logging_options(parser)
parser.register("type", "bool", parseBool) # Custom type for arg=True/False.
# Core options
core_options = parser.add_argument_group(
title="Toil core options.",
description="Options to specify the location of the Toil workflow and "
"turn on stats collation about the performance of jobs."
)
if jobstore_as_flag:
core_options.add_argument('--jobstore', '--jobStore', dest='jobStore', type=parse_jobstore, default=None,
help=JOBSTORE_HELP)
else:
core_options.add_argument('jobStore', type=parse_jobstore, help=JOBSTORE_HELP)
class WorkDirAction(Action):
"""
Argparse action class to check that the provided --workDir exists
"""
def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None:
workDir = values
if workDir is not None:
workDir = os.path.abspath(workDir)
if not os.path.exists(workDir):
raise RuntimeError(f"The path provided to --workDir ({workDir}) does not exist.")
if len(workDir) > 80:
logger.warning(f'Length of workDir path "{workDir}" is {len(workDir)} characters. '
f'Consider setting a shorter path with --workDir or setting TMPDIR to something '
f'like "/tmp" to avoid overly long paths.')
setattr(namespace, self.dest, workDir)
class CoordinationDirAction(Action):
"""
Argparse action class to check that the provided --coordinationDir exists
"""
def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None:
coordination_dir = values
if coordination_dir is not None:
coordination_dir = os.path.abspath(coordination_dir)
if not os.path.exists(coordination_dir):
raise RuntimeError(
f"The path provided to --coordinationDir ({coordination_dir}) does not exist.")
setattr(namespace, self.dest, coordination_dir)
def make_closed_interval_action(min: Union[int, float], max: Optional[Union[int, float]] = None) -> Type[Action]:
"""
Returns an argparse action class to check if the input is within the given closed interval.
ex:
Provided value to argparse must be within the interval [min, max]
:param min: int/float
:param max: optional int/float
:return: argparse action
"""
class ClosedIntOrFloatAction(Action):
def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None:
def is_within(x: Union[int, float]) -> bool:
if max is None:
return min <= x
else:
return min <= x <= max
try:
if not is_within(values):
raise parser.error(
f"{option_string} ({values}) must be within the range: [{min}, {'infinity' if max is None else max}]")
except AssertionError:
raise RuntimeError(f"The {option_string} option has an invalid value: {values}")
setattr(namespace, self.dest, values)
return ClosedIntOrFloatAction
core_options.add_argument("--workDir", dest="workDir", default=None, env_var="TOIL_WORKDIR", action=WorkDirAction,
metavar="PATH",
help="Absolute path to directory where temporary files generated during the Toil "
"run should be placed. Standard output and error from batch system jobs "
"(unless --noStdOutErr is set) will be placed in this directory. A cache directory "
"may be placed in this directory. Temp files and folders will be placed in a "
"directory toil-<workflowID> within workDir. The workflowID is generated by "
"Toil and will be reported in the workflow logs. Default is determined by the "
"environment variables (TMPDIR, TEMP, TMP) via mkdtemp. This directory needs to exist on "
"all machines running jobs; if capturing standard output and error from batch "
"system jobs is desired, it will generally need to be on a shared file system. "
"When sharing a cache between containers on a host, this directory must be "
"shared between the containers.")
core_options.add_argument("--coordinationDir", dest="coordination_dir", default=None,
env_var="TOIL_COORDINATION_DIR", action=CoordinationDirAction, metavar="PATH",
help="Absolute path to directory where Toil will keep state and lock files."
"When sharing a cache between containers on a host, this directory must be "
"shared between the containers.")
core_options.add_argument("--noStdOutErr", dest="noStdOutErr", default=False, action="store_true",
help="Do not capture standard output and error from batch system jobs.")
core_options.add_argument("--stats", dest="stats", default=False, action="store_true",
help="Records statistics about the toil workflow to be used by 'toil stats'.")
clean_choices = ['always', 'onError', 'never', 'onSuccess']
core_options.add_argument("--clean", dest="clean", choices=clean_choices, default="onSuccess",
help=f"Determines the deletion of the jobStore upon completion of the program. "
f"Choices: {clean_choices}. The --stats option requires information from the "
f"jobStore upon completion so the jobStore will never be deleted with that flag. "
f"If you wish to be able to restart the run, choose \'never\' or \'onSuccess\'. "
f"Default is \'never\' if stats is enabled, and \'onSuccess\' otherwise.")
core_options.add_argument("--cleanWorkDir", dest="cleanWorkDir", choices=clean_choices, default='always',
help=f"Determines deletion of temporary worker directory upon completion of a job. "
f"Choices: {clean_choices}. Default = always. WARNING: This option should be "
f"changed for debugging only. Running a full pipeline with this option could "
f"fill your disk with excessive intermediate data.")
core_options.add_argument("--clusterStats", dest="clusterStats", nargs='?', action='store', default=None,
metavar="OPT_PATH", const=os.getcwd(),
help="If enabled, writes out JSON resource usage statistics to a file. "
"The default location for this file is the current working directory, but an "
"absolute path can also be passed to specify where this file should be written. "
"This options only applies when using scalable batch systems.")
# Restarting the workflow options
restart_options = parser.add_argument_group(
title="Toil options for restarting an existing workflow.",
description="Allows the restart of an existing workflow"
)
restart_options.add_argument("--restart", dest="restart", default=False, action="store_true",
help="If --restart is specified then will attempt to restart existing workflow "
"at the location pointed to by the --jobStore option. Will raise an exception "
"if the workflow does not exist")
# Batch system options
batchsystem_options = parser.add_argument_group(
title="Toil options for specifying the batch system.",
description="Allows the specification of the batch system."
)
add_all_batchsystem_options(batchsystem_options)
# File store options
file_store_options = parser.add_argument_group(
title="Toil options for configuring storage.",
description="Allows configuring Toil's data storage."
)
link_imports = file_store_options.add_mutually_exclusive_group()
link_imports_help = ("When using a filesystem based job store, CWL input files are by default symlinked in. "
"Setting this option to True instead copies the files into the job store, which may protect "
"them from being modified externally. When set to False, as long as caching is enabled, "
"Toil will protect the file automatically by changing the permissions to read-only."
"default=%(default)s")
link_imports.add_argument("--symlinkImports", dest="symlinkImports", type=strtobool, default=True,
metavar="BOOL", help=link_imports_help)
move_exports = file_store_options.add_mutually_exclusive_group()
move_exports_help = ('When using a filesystem based job store, output files are by default moved to the '
'output directory, and a symlink to the moved exported file is created at the initial '
'location. Setting this option to True instead copies the files into the output directory. '
'Applies to filesystem-based job stores only. '
'default=%(default)s')
move_exports.add_argument("--moveOutputs", dest="moveOutputs", type=strtobool, default=False, metavar="BOOL",
help=move_exports_help)
caching = file_store_options.add_mutually_exclusive_group()
caching_help = "Enable or disable caching for your workflow, specifying this overrides default from job store"
caching.add_argument('--caching', dest='caching', type=opt_strtobool, default=None, metavar="BOOL",
help=caching_help)
# default is None according to PR 4299, seems to be generated at runtime
# Auto scaling options
autoscaling_options = parser.add_argument_group(
title="Toil options for autoscaling the cluster of worker nodes.",
description="Allows the specification of the minimum and maximum number of nodes in an autoscaled cluster, "
"as well as parameters to control the level of provisioning."
)
provisioner_choices = ['aws', 'gce', None]
# TODO: Better consolidate this provisioner arg and the one in provisioners/__init__.py?
autoscaling_options.add_argument('--provisioner', '-p', dest="provisioner", choices=provisioner_choices,
default=None,
help=f"The provisioner for cluster auto-scaling. This is the main Toil "
f"'--provisioner' option, and defaults to None for running on single "
f"machine and non-auto-scaling batch systems. The currently supported "
f"choices are {provisioner_choices}. The default is %(default)s.")
autoscaling_options.add_argument('--nodeTypes', default=[], dest="nodeTypes", type=parse_node_types,
action="extend",
help="Specifies a list of comma-separated node types, each of which is "
"composed of slash-separated instance types, and an optional spot "
"bid set off by a colon, making the node type preemptible. Instance "
"types may appear in multiple node types, and the same node type "
"may appear as both preemptible and non-preemptible.\n"
"Valid argument specifying two node types:\n"
"\tc5.4xlarge/c5a.4xlarge:0.42,t2.large\n"
"Node types:\n"
"\tc5.4xlarge/c5a.4xlarge:0.42 and t2.large\n"
"Instance types:\n"
"\tc5.4xlarge, c5a.4xlarge, and t2.large\n"
"Semantics:\n"
"\tBid $0.42/hour for either c5.4xlarge or c5a.4xlarge instances,\n"
"\ttreated interchangeably, while they are available at that price,\n"
"\tand buy t2.large instances at full price.\n"
"default=%(default)s")
class NodeExtendAction(_AppendAction):
"""
argparse Action class that removes the default value on the first call and acts as an extend action afterwards
"""
# with action=append/extend, the argparse default is always prepended to the option
# so make the CLI have priority by rewriting the option on the first run
def __init__(self, option_strings: Any, dest: Any, **kwargs: Any):
super().__init__(option_strings, dest, **kwargs)
self.is_default = True
def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None:
if self.is_default:
setattr(namespace, self.dest, values)
self.is_default = False
else:
super().__call__(parser, namespace, values, option_string)
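# For example, with default=[10], a single "--maxNodes 5" on the command line
# results in [5] on the namespace rather than the default list being kept in front.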
autoscaling_options.add_argument('--maxNodes', default=[10], dest="maxNodes", type=parse_int_list,
action=NodeExtendAction, metavar="INT[,INT...]",
help=f"Maximum number of nodes of each type in the cluster, if using autoscaling, "
f"provided as a comma-separated list. The first value is used as a default "
f"if the list length is less than the number of nodeTypes. "
f"default=%(default)s")
autoscaling_options.add_argument('--minNodes', default=[0], dest="minNodes", type=parse_int_list,
action=NodeExtendAction, metavar="INT[,INT...]",
help="Mininum number of nodes of each type in the cluster, if using "
"auto-scaling. This should be provided as a comma-separated list of the "
"same length as the list of node types. default=%(default)s")
autoscaling_options.add_argument("--targetTime", dest="targetTime", default=defaultTargetTime, type=int,
action=make_closed_interval_action(0), metavar="INT",
help=f"Sets how rapidly you aim to complete jobs in seconds. Shorter times mean "
f"more aggressive parallelization. The autoscaler attempts to scale up/down "
f"so that it expects all queued jobs will complete within targetTime "
f"seconds. default=%(default)s")
autoscaling_options.add_argument("--betaInertia", dest="betaInertia", default=0.1, type=float,
action=make_closed_interval_action(0.0, 0.9), metavar="FLOAT",
help=f"A smoothing parameter to prevent unnecessary oscillations in the number "
f"of provisioned nodes. This controls an exponentially weighted moving "
f"average of the estimated number of nodes. A value of 0.0 disables any "
f"smoothing, and a value of 0.9 will smooth so much that few changes will "
f"ever be made. Must be between 0.0 and 0.9. default=%(default)s")
autoscaling_options.add_argument("--scaleInterval", dest="scaleInterval", default=60, type=int, metavar="INT",
help=f"The interval (seconds) between assessing if the scale of "
f"the cluster needs to change. default=%(default)s")
autoscaling_options.add_argument("--preemptibleCompensation", "--preemptableCompensation",
dest="preemptibleCompensation", default=0.0, type=float,
action=make_closed_interval_action(0.0, 1.0), metavar="FLOAT",
help=f"The preference of the autoscaler to replace preemptible nodes with "
f"non-preemptible nodes, when preemptible nodes cannot be started for some "
f"reason. This value must be between 0.0 and 1.0, inclusive. "
f"A value of 0.0 disables such "
f"compensation, a value of 0.5 compensates two missing preemptible nodes "
f"with a non-preemptible one. A value of 1.0 replaces every missing "
f"pre-emptable node with a non-preemptible one. default=%(default)s")
autoscaling_options.add_argument("--nodeStorage", dest="nodeStorage", default=50, type=int, metavar="INT",
help="Specify the size of the root volume of worker nodes when they are launched "
"in gigabytes. You may want to set this if your jobs require a lot of disk "
f"space. (default=%(default)s).")
autoscaling_options.add_argument('--nodeStorageOverrides', dest="nodeStorageOverrides", default=[],
type=parse_str_list, action="extend",
metavar="NODETYPE:NODESTORAGE[,NODETYPE:NODESTORAGE...]",
help="Comma-separated list of nodeType:nodeStorage that are used to override "
"the default value from --nodeStorage for the specified nodeType(s). "
"This is useful for heterogeneous jobs where some tasks require much more "
"disk than others.")
autoscaling_options.add_argument("--metrics", dest="metrics", default=False, type=strtobool, metavar="BOOL",
help="Enable the prometheus/grafana dashboard for monitoring CPU/RAM usage, "
"queue size, and issued jobs.")
autoscaling_options.add_argument("--assumeZeroOverhead", dest="assume_zero_overhead", default=False,
type=strtobool, metavar="BOOL",
help="Ignore scheduler and OS overhead and assume jobs can use every last byte "
"of memory and disk on a node when autoscaling.")
# Parameters to limit service jobs / detect service deadlocks
service_options = parser.add_argument_group(
title="Toil options for limiting the number of service jobs and detecting service deadlocks",
description="Allows the specification of the maximum number of service jobs in a cluster. By keeping "
"this limited we can avoid nodes occupied with services causing deadlocks."
)
service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int,
metavar="INT",
help=SUPPRESS if cwl else f"The maximum number of service jobs that can be run "
f"concurrently, excluding service jobs running on "
f"preemptible nodes. default=%(default)s")
service_options.add_argument("--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs",
default=SYS_MAX_SIZE,
type=int, metavar="INT",
help=SUPPRESS if cwl else "The maximum number of service jobs that can run "
"concurrently on preemptible nodes. default=%(default)s")
service_options.add_argument("--deadlockWait", dest="deadlockWait", default=60, type=int, metavar="INT",
help=SUPPRESS if cwl else f"Time, in seconds, to tolerate the workflow running only "
f"the same service jobs, with no jobs to use them, "
f"before declaring the workflow to be deadlocked and "
f"stopping. default=%(default)s")
service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=30, type=int,
metavar="INT",
help=SUPPRESS if cwl else "Time, in seconds, to wait between checks to see if the "
"workflow is stuck running only service jobs, with no jobs "
"to use them. Should be shorter than --deadlockWait. May "
"need to be increased if the batch system cannot enumerate "
"running jobs quickly enough, or if polling for running "
"jobs is placing an unacceptable load on a shared cluster."
f"default=%(default)s")
# Resource requirements
resource_options = parser.add_argument_group(
title="Toil options for cores/memory requirements.",
description="The options to specify default cores/memory requirements (if not specified by the jobs "
"themselves), and to limit the total amount of memory/cores requested from the batch system."
)
resource_help_msg = ('The {} amount of {} to request for a job. '
'Only applicable to jobs that do not specify an explicit value for this requirement. '
'{}. '
'Default is {}.')
cpu_note = 'Fractions of a core (for example 0.1) are supported on some batch systems [mesos, single_machine]'
disk_mem_note = 'Standard suffixes like K, Ki, M, Mi, G or Gi are supported'
accelerators_note = (
'Each accelerator specification can have a type (gpu [default], nvidia, amd, cuda, rocm, opencl, '
'or a specific model like nvidia-tesla-k80), and a count [default: 1]. If both a type and a count '
'are used, they must be separated by a colon. If multiple types of accelerators are '
'used, the specifications are separated by commas')
h2b = lambda x: human2bytes(str(x))
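# For example, h2b("2.0 Gi") is 2147483648 bytes (2 GiB), which is why the help
# strings below report bytes2human(2147483648) as the default.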
resource_options.add_argument('--defaultMemory', dest='defaultMemory', default="2.0 Gi", type=h2b,
action=make_open_interval_action(1),
help=resource_help_msg.format('default', 'memory', disk_mem_note,
bytes2human(2147483648)))
resource_options.add_argument('--defaultCores', dest='defaultCores', default=1, metavar='FLOAT', type=float,
action=make_open_interval_action(1.0),
help=resource_help_msg.format('default', 'cpu', cpu_note, str(1)))
resource_options.add_argument('--defaultDisk', dest='defaultDisk', default="2.0 Gi", metavar='INT', type=h2b,
action=make_open_interval_action(1),
help=resource_help_msg.format('default', 'disk', disk_mem_note,
bytes2human(2147483648)))
resource_options.add_argument('--defaultAccelerators', dest='defaultAccelerators', default=[],
metavar='ACCELERATOR[,ACCELERATOR...]', type=parse_accelerator_list, action="extend",
help=resource_help_msg.format('default', 'accelerators', accelerators_note, []))
resource_options.add_argument('--defaultPreemptible', '--defaultPreemptable', dest='defaultPreemptible',
metavar='BOOL',
type=strtobool, nargs='?', const=True, default=False,
help='Make all jobs able to run on preemptible (spot) nodes by default.')
resource_options.add_argument('--maxCores', dest='maxCores', default=SYS_MAX_SIZE, metavar='INT', type=int,
action=make_open_interval_action(1),
help=resource_help_msg.format('max', 'cpu', cpu_note, str(SYS_MAX_SIZE)))
resource_options.add_argument('--maxMemory', dest='maxMemory', default=SYS_MAX_SIZE, metavar='INT', type=h2b,
action=make_open_interval_action(1),
help=resource_help_msg.format('max', 'memory', disk_mem_note,
bytes2human(SYS_MAX_SIZE)))
resource_options.add_argument('--maxDisk', dest='maxDisk', default=SYS_MAX_SIZE, metavar='INT', type=h2b,
action=make_open_interval_action(1),
help=resource_help_msg.format('max', 'disk', disk_mem_note,
bytes2human(SYS_MAX_SIZE)))
# Retrying/rescuing jobs
job_options = parser.add_argument_group(
title="Toil options for rescuing/killing/restarting jobs.",
description="The options for jobs that either run too long/fail or get lost (some batch systems have issues!)."
)
job_options.add_argument("--retryCount", dest="retryCount", default=1, type=int,
action=make_open_interval_action(0), metavar="INT",
help=f"Number of times to retry a failing job before giving up and "
f"labeling job failed. default={1}")
job_options.add_argument("--enableUnlimitedPreemptibleRetries", "--enableUnlimitedPreemptableRetries",
dest="enableUnlimitedPreemptibleRetries",
type=strtobool, default=False, metavar="BOOL",
help="If set, preemptible failures (or any failure due to an instance getting "
"unexpectedly terminated) will not count towards job failures and --retryCount.")
job_options.add_argument("--doubleMem", dest="doubleMem", type=strtobool, default=False, metavar="BOOL",
help="If set, batch jobs which die to reaching memory limit on batch schedulers "
"will have their memory doubled and they will be retried. The remaining "
"retry count will be reduced by 1. Currently supported by LSF.")
job_options.add_argument("--maxJobDuration", dest="maxJobDuration", default=SYS_MAX_SIZE, type=int,
action=make_open_interval_action(1), metavar="INT",
help=f"Maximum runtime of a job (in seconds) before we kill it (this is a lower bound, "
f"and the actual time before killing the job may be longer). "
f"default=%(default)s")
job_options.add_argument("--rescueJobsFrequency", dest="rescueJobsFrequency", default=60, type=int,
action=make_open_interval_action(1), metavar="INT",
help=f"Period of time to wait (in seconds) between checking for missing/overlong jobs, "
f"that is jobs which get lost by the batch system. Expert parameter. "
f"default=%(default)s")
job_options.add_argument("--jobStoreTimeout", dest="job_store_timeout", default=30, type=float,
action=make_open_interval_action(0), metavar="FLOAT",
help=f"Maximum time (in seconds) to wait for a job's update to the job store "
f"before declaring it failed. default=%(default)s")
# Log management options
log_options = parser.add_argument_group(
title="Toil log management options.",
description="Options for how Toil should manage its logs."
)
log_options.add_argument("--maxLogFileSize", dest="maxLogFileSize", default=100 * 1024 * 1024, type=h2b,
action=make_open_interval_action(1),
help=f"The maximum size of a job log file to keep (in bytes), log files larger than "
f"this will be truncated to the last X bytes. Setting this option to zero will "
f"prevent any truncation. Setting this option to a negative value will truncate "
f"from the beginning. Default={bytes2human(100 * 1024 * 1024)}")
log_options.add_argument("--writeLogs", dest="writeLogs", nargs='?', action='store', default=None,
const=os.getcwd(), metavar="OPT_PATH",
help="Write worker logs received by the leader into their own files at the specified "
"path. Any non-empty standard output and error from failed batch system jobs will "
"also be written into files at this path. The current working directory will be "
"used if a path is not specified explicitly. Note: By default only the logs of "
"failed jobs are returned to leader. Set log level to 'debug' or enable "
"'--writeLogsFromAllJobs' to get logs back from successful jobs, and adjust "
"'maxLogFileSize' to control the truncation limit for worker logs.")
log_options.add_argument("--writeLogsGzip", dest="writeLogsGzip", nargs='?', action='store', default=None,
const=os.getcwd(), metavar="OPT_PATH",
help="Identical to --writeLogs except the logs files are gzipped on the leader.")
log_options.add_argument("--writeLogsFromAllJobs", dest="writeLogsFromAllJobs", type=strtobool,
default=False, metavar="BOOL",
help="Whether to write logs from all jobs (including the successful ones) without "
"necessarily setting the log level to 'debug'. Ensure that either --writeLogs "
"or --writeLogsGzip is set if enabling this option.")
log_options.add_argument("--writeMessages", dest="write_messages", default=None,
type=lambda x: None if x is None else os.path.abspath(x), metavar="PATH",
help="File to send messages from the leader's message bus to.")
log_options.add_argument("--realTimeLogging", dest="realTimeLogging", type=strtobool, default=False,
help="Enable real-time logging from workers to leader")
# Misc options
misc_options = parser.add_argument_group(
title="Toil miscellaneous options.",
description="Everything else."
)
misc_options.add_argument('--disableChaining', dest='disableChaining', type=strtobool, default=False,
metavar="BOOL",
help="Disables chaining of jobs (chaining uses one job's resource allocation "
"for its successor job if possible).")
misc_options.add_argument("--disableJobStoreChecksumVerification", dest="disableJobStoreChecksumVerification",
default=False, type=strtobool, metavar="BOOL",
help="Disables checksum verification for files transferred to/from the job store. "
"Checksum verification is a safety check to ensure the data is not corrupted "
"during transfer. Currently only supported for non-streaming AWS files.")
class SSEKeyAction(Action):
def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None:
if values is not None:
sse_key = values
if sse_key is None:
return
with open(sse_key) as f:
assert len(f.readline().rstrip()) == 32, 'SSE key appears to be invalid.'
setattr(namespace, self.dest, values)
misc_options.add_argument("--sseKey", dest="sseKey", default=None, action=SSEKeyAction, metavar="PATH",
help="Path to file containing 32 character key to be used for server-side encryption on "
"awsJobStore or googleJobStore. SSE will not be used if this flag is not passed.")
# yaml.safe_load is being deprecated; this is the suggested workaround
def yaml_safe_load(stream: Any) -> Any:
yaml = YAML(typ='safe', pure=True)
d = yaml.load(stream)
if isinstance(d, dict):
# this means the argument was a dictionary and is valid yaml (for configargparse)
return d
else:
# this means the argument is likely in its string format (for CLI)
return parse_set_env(parse_str_list(stream))
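# For example, a config-file value for --setEnv written as a YAML mapping, e.g.
# "{FOO: bar}", is parsed by yaml.load into a dict and returned as-is, while a
# plain CLI value such as "FOO=bar,BAZ" falls through to parse_set_env and
# becomes {'FOO': 'bar', 'BAZ': None}.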
class ExtendActionDict(Action):
"""
Argparse action class to implement the action="extend" functionality on dictionaries
"""
def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any = None) -> None:
items = getattr(namespace, self.dest, None)
assert items is not None # for mypy. This should never be None, esp. if called in setEnv
# note: this will overwrite existing entries
items.update(values)
misc_options.add_argument("--setEnv", '-e', metavar='NAME=VALUE or NAME', dest="environment",
default={}, type=yaml_safe_load, action=ExtendActionDict,
help="Set an environment variable early on in the worker. If VALUE is null, it will "
"be looked up in the current environment. Independently of this option, the worker "
"will try to emulate the leader's environment before running a job, except for "
"some variables known to vary across systems. Using this option, a variable can "
"be injected into the worker process itself before it is started.")
misc_options.add_argument("--servicePollingInterval", dest="servicePollingInterval", default=60.0, type=float,
action=make_open_interval_action(0.0), metavar="FLOAT",
help=f"Interval of time service jobs wait between polling for the existence of the "
f"keep-alive flag. Default: {60.0}")
misc_options.add_argument('--forceDockerAppliance', dest='forceDockerAppliance', type=strtobool, default=False,
metavar="BOOL",
help='Disables sanity checking the existence of the Docker image specified by '
'TOIL_APPLIANCE_SELF, which Toil uses to provision Mesos for autoscaling.')
misc_options.add_argument('--statusWait', dest='statusWait', type=int, default=3600, metavar="INT",
help="Seconds to wait between reports of running jobs.")
misc_options.add_argument('--disableProgress', dest='disableProgress', action="store_true", default=False,
help="Disables the progress bar shown when standard error is a terminal.")
# Debug options
debug_options = parser.add_argument_group(
title="Toil debug options.",
description="Debug options for finding problems or helping with testing."
)
debug_options.add_argument("--debugWorker", dest="debugWorker", default=False, action="store_true",
help="Experimental no forking mode for local debugging. Specifically, workers "
"are not forked and stderr/stdout are not redirected to the log.")
debug_options.add_argument("--disableWorkerOutputCapture", dest="disableWorkerOutputCapture", default=False,
action="store_true",
help="Let worker output go to worker's standard out/error instead of per-job logs.")
debug_options.add_argument("--badWorker", dest="badWorker", default=0.0, type=float,
action=make_closed_interval_action(0.0, 1.0), metavar="FLOAT",
help=f"For testing purposes randomly kill --badWorker proportion of jobs using "
f"SIGKILL. default={0.0}")
debug_options.add_argument("--badWorkerFailInterval", dest="badWorkerFailInterval", default=0.01, type=float,
action=make_open_interval_action(0.0), metavar="FLOAT", # might be cyclical?
help=f"When killing the job pick uniformly within the interval from 0.0 to "
f"--badWorkerFailInterval seconds after the worker starts. "
f"default={0.01}")
# All deprecated options:
# These are deprecated in favor of a simpler option
# ex: noLinkImports and linkImports can be simplified into a single link_imports argument
link_imports.add_argument("--noLinkImports", dest="linkImports", action="store_false",
help=SUPPRESS)
link_imports.add_argument("--linkImports", dest="linkImports", action="store_true",
help=SUPPRESS)
link_imports.set_defaults(linkImports=None)
move_exports.add_argument("--moveExports", dest="moveExports", action="store_true",
help=SUPPRESS)
move_exports.add_argument("--noMoveExports", dest="moveExports", action="store_false",
help=SUPPRESS)
move_exports.set_defaults(moveExports=None)
# dest is set to enableCaching to not conflict with the current --caching destination
caching.add_argument('--disableCaching', dest='enableCaching', action='store_false', help=SUPPRESS)
caching.set_defaults(enableCaching=None)