# Source code for toil.options.common

import logging
import os
from argparse import Action, ArgumentParser, _AppendAction
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from configargparse import SUPPRESS
from ruamel.yaml import YAML

from toil.batchSystems.options import add_all_batchsystem_options
from toil.lib.conversions import bytes2human, human2bytes, opt_strtobool, strtobool
from toil.provisioners import parse_node_types
from toil.statsAndLogging import add_logging_options

if TYPE_CHECKING:
    from toil.job import AcceleratorRequirement

# Module-level logger for this options module.
logger = logging.getLogger(__name__)

# Default for --targetTime (seconds): aim to pack autoscaling jobs within a
# 30 minute block before provisioning a new node.
defaultTargetTime = 1800
# Hard-coded to the value of sys.maxsize on 64-bit systems
# (9223372036854775807 == 2**63 - 1) so that 32-bit systems use the same
# number. Used as the default for several "max" options (e.g. --maxCores,
# --maxServiceJobs, --maxJobDuration).
SYS_MAX_SIZE = 9223372036854775807


def parse_set_env(l: list[str]) -> dict[str, Optional[str]]:
    """
    Parse a list of strings of the form "NAME=VALUE" or just "NAME" into a dictionary.

    Strings of the latter form will result in dictionary entries whose value is None.
    Only the first "=" separates the name from the value, so values may themselves
    contain "=". When a name appears more than once, the last entry wins.

    :param l: strings of the form "NAME=VALUE" or "NAME"
    :return: mapping from each NAME to its VALUE, or to None when no "=" was given
    :raises ValueError: if any entry has an empty NAME

    >>> parse_set_env([])
    {}
    >>> parse_set_env(['a'])
    {'a': None}
    >>> parse_set_env(['a='])
    {'a': ''}
    >>> parse_set_env(['a=b'])
    {'a': 'b'}
    >>> parse_set_env(['a=a', 'a=b'])
    {'a': 'b'}
    >>> parse_set_env(['a=b', 'c=d'])
    {'a': 'b', 'c': 'd'}
    >>> parse_set_env(['a=b=c'])
    {'a': 'b=c'}
    >>> parse_set_env([''])
    Traceback (most recent call last):
    ...
    ValueError: Empty name
    >>> parse_set_env(['=1'])
    Traceback (most recent call last):
    ...
    ValueError: Empty name
    """
    d: dict[str, Optional[str]] = {}
    for item in l:
        # partition() splits on the first "=" only; ``sep`` is empty when there is none.
        name, sep, value = item.partition("=")
        if not name:
            raise ValueError("Empty name")
        # A bare "NAME" (no "=") maps to None; "NAME=" maps to the empty string.
        d[name] = value if sep else None
    return d
def parse_str_list(s: str) -> list[str]:
    """
    Split a comma-separated string into a list of strings.

    No whitespace is stripped and empty items are kept, so "a,,b" gives
    ["a", "", "b"] and "" gives [""].
    """
    pieces = s.split(",")
    return pieces
def parse_int_list(s: str) -> list[int]:
    """
    Split a comma-separated string and convert every item with ``int()``.

    Raises ValueError if any item is not a valid integer literal (including an
    empty item, e.g. from "" or "1,,2").
    """
    return list(map(int, s.split(",")))
def iC(min_value: int, max_value: Optional[int] = None) -> Callable[[int], bool]:
    """
    Build a predicate testing membership in the half-open interval
    [min_value, max_value).

    When ``max_value`` is None the interval has no upper bound.

    :param min_value: inclusive lower bound; asserted to be an int
    :param max_value: exclusive upper bound or None; asserted to be an int if given
    :return: function returning True when its argument lies in the interval
    """
    assert isinstance(min_value, int)
    if max_value is not None:
        assert isinstance(max_value, int)

        def in_bounded_range(x: int) -> bool:
            return min_value <= x < max_value

        return in_bounded_range

    def in_unbounded_range(x: int) -> bool:
        return min_value <= x

    return in_unbounded_range
def fC(minValue: float, maxValue: Optional[float] = None) -> Callable[[float], bool]:
    """
    Build a predicate testing membership in the half-open interval
    [minValue, maxValue).

    When ``maxValue`` is None the interval has no upper bound.

    :param minValue: inclusive lower bound; asserted to be a float
    :param maxValue: exclusive upper bound or None; asserted to be a float if given
    :return: function returning True when its argument lies in the interval
    """
    assert isinstance(minValue, float)
    if maxValue is not None:
        assert isinstance(maxValue, float)

        def in_bounded_range(x: float) -> bool:
            return minValue <= x < maxValue

        return in_bounded_range

    def in_unbounded_range(x: float) -> bool:
        return minValue <= x

    return in_unbounded_range
def parse_accelerator_list(specs: Optional[str]) -> list["AcceleratorRequirement"]:
    """
    Parse a comma-separated string of one or more accelerator requirements.

    A missing (None) or empty string means no accelerators are needed and
    yields an empty list. Otherwise each comma-separated item is handed to
    ``toil.job.parse_accelerator``.
    """
    if not specs:
        # Not specified: the default is to need no accelerators at all.
        return []

    # Imported lazily, as in the original.
    from toil.job import parse_accelerator

    requirements = specs.split(",")
    return [parse_accelerator(spec) for spec in requirements]
def parseBool(val: str) -> bool:
    """
    Interpret a case-insensitive word such as "yes"/"off"/"1" as a boolean.

    Accepted true spellings: true, t, yes, y, on, 1.
    Accepted false spellings: false, f, no, n, off, 0.

    :raises RuntimeError: if ``val`` is not one of the accepted spellings
    """
    lowered = val.lower()
    if lowered in ("true", "t", "yes", "y", "on", "1"):
        return True
    if lowered in ("false", "f", "no", "n", "off", "0"):
        return False
    raise RuntimeError('Could not interpret "%s" as a boolean value' % val)
# This is kept in the outer scope as multiple batchsystem files use this
def make_open_interval_action(
    min: Union[int, float], max: Optional[Union[int, float]] = None
) -> type[Action]:
    """
    Return an argparse action class that checks the input is within the
    half-open interval [min, max).

    Types of min and max must be the same (max may be None, meaning no upper
    bound). An int ``min`` selects the integer checker ``iC``; otherwise the
    float checker ``fC`` is used, which asserts that its bounds are floats.

    :param min: inclusive lower bound (int or float)
    :param max: optional exclusive upper bound of the same type as ``min``
    :return: argparse action class
    """

    class IntOrFloatOpenAction(Action):
        def __call__(
            self, parser: Any, namespace: Any, values: Any, option_string: Any = None
        ) -> None:
            if isinstance(min, int):
                if max is not None:  # for mypy
                    assert isinstance(max, int)
                func = iC(min, max)
            else:
                func = fC(min, max)
            try:
                if not func(values):
                    # ArgumentParser.error() prints usage and exits; it does not return.
                    if max is None:
                        parser.error(
                            f"{option_string} ({values}) must be at least {min}"
                        )
                    else:
                        parser.error(
                            f"{option_string} ({values}) must be at least {min} "
                            f"and strictly less than {max}"
                        )
            except AssertionError:
                raise RuntimeError(
                    f"The {option_string} option has an invalid value: {values}"
                )
            setattr(namespace, self.dest, values)

    return IntOrFloatOpenAction
def parse_jobstore(jobstore_uri: str) -> str:
    """
    Turn a job store string into its corresponding locator URI.

    A locator that ``Toil.parseLocator`` classifies as ``file`` is rebuilt with
    an absolute path, e.g. ``/path/to/jobstore`` -> ``file:/path/to/jobstore``.
    Any other locator (e.g. ``aws:/path/to/jobstore``) is returned unchanged.

    :param jobstore_uri: string naming the job store
    :return: URI of the job store
    """
    from toil.common import Toil

    scheme, remainder = Toil.parseLocator(jobstore_uri)
    if scheme != "file":
        return jobstore_uri
    # Resolve relative paths early, on the leader, because the worker process
    # may have a different working directory than the leader, e.g. under Mesos.
    absolute_path = os.path.abspath(remainder)
    return Toil.buildLocator(scheme, absolute_path)
# Help text shared by the positional jobStore argument and the --jobStore flag.
# TODO: the google: scheme still lacks a description.
JOBSTORE_HELP = (
    "The location of the job store for the workflow. "
    "A job store holds persistent information about the jobs, stats, and files in a "
    "workflow. If the workflow is run with a distributed batch system, the job "
    "store must be accessible by all worker nodes. Depending on the desired "
    "job store implementation, the location should be formatted according to "
    "one of the following schemes:\n\n"
    "file:<path> where <path> points to a directory on the file system\n\n"
    "aws:<region>:<prefix> where <region> is the name of an AWS region like "
    "us-west-2 and <prefix> will be prepended to the names of any top-level "
    "AWS resources in use by the job store, e.g. S3 buckets.\n\n"
    "google:<project_id>:<prefix> TODO: explain\n\n"
    "For backwards compatibility, you may also specify ./foo (equivalent to "
    "file:./foo or just file:foo) or /bar (equivalent to file:/bar)."
)
[docs] def add_base_toil_options( parser: ArgumentParser, jobstore_as_flag: bool = False, cwl: bool = False ) -> None: """ Add base Toil command line options to the parser. :param parser: Argument parser to add options to :param jobstore_as_flag: make the job store option a --jobStore flag instead of a required jobStore positional argument. :param cwl: whether CWL should be included or not """ # This is necessary as the config file must have at least one valid key to parse properly and we want to use a # dummy key config = parser.add_argument_group() config.add_argument("--config_version", default=None, help=SUPPRESS) # If using argparse instead of configargparse, this should just not parse when calling parse_args() # default config value is set to none as defaults should already be populated at config init config.add_argument( "--config", dest="config", is_config_file_arg=True, default=None, metavar="PATH", help="Get options from a config file.", ) add_logging_options(parser) parser.register("type", "bool", parseBool) # Custom type for arg=True/False. # Core options core_options = parser.add_argument_group( title="Toil core options", description="Options to specify the location of the Toil workflow and " "turn on stats collation about the performance of jobs.", ) if jobstore_as_flag: core_options.add_argument( "--jobstore", "--jobStore", dest="jobStore", type=parse_jobstore, default=None, help=JOBSTORE_HELP, ) else: core_options.add_argument("jobStore", type=parse_jobstore, help=JOBSTORE_HELP) class WorkDirAction(Action): """ Argparse action class to check that the provided --workDir exists """ def __call__( self, parser: Any, namespace: Any, values: Any, option_string: Any = None ) -> None: workDir = values if workDir is not None: workDir = os.path.abspath(workDir) if not os.path.exists(workDir): raise RuntimeError( f"The path provided to --workDir ({workDir}) does not exist." 
) if len(workDir) > 80: logger.warning( f'Length of workDir path "{workDir}" is {len(workDir)} characters. ' f"Consider setting a shorter path with --workPath or setting TMPDIR to something " f'like "/tmp" to avoid overly long paths.' ) setattr(namespace, self.dest, workDir) class CoordinationDirAction(Action): """ Argparse action class to check that the provided --coordinationDir exists """ def __call__( self, parser: Any, namespace: Any, values: Any, option_string: Any = None ) -> None: coordination_dir = values if coordination_dir is not None: coordination_dir = os.path.abspath(coordination_dir) if not os.path.exists(coordination_dir): raise RuntimeError( f"The path provided to --coordinationDir ({coordination_dir}) does not exist." ) setattr(namespace, self.dest, coordination_dir) def make_closed_interval_action( min: Union[int, float], max: Optional[Union[int, float]] = None ) -> type[Action]: """ Returns an argparse action class to check if the input is within the given half-open interval. ex: Provided value to argparse must be within the interval [min, max] :param min: int/float :param max: optional int/float :return: argparse action """ class ClosedIntOrFloatAction(Action): def __call__( self, parser: Any, namespace: Any, values: Any, option_string: Any = None, ) -> None: def is_within(x: Union[int, float]) -> bool: if max is None: return min <= x else: return min <= x <= max try: if not is_within(values): raise parser.error( f"{option_string} ({values}) must be within the range: [{min}, {'infinity' if max is None else max}]" ) except AssertionError: raise RuntimeError( f"The {option_string} option has an invalid value: {values}" ) setattr(namespace, self.dest, values) return ClosedIntOrFloatAction core_options.add_argument( "--workDir", dest="workDir", default=None, env_var="TOIL_WORKDIR", action=WorkDirAction, metavar="PATH", help="Absolute path to directory where temporary files generated during the Toil " "run should be placed. 
Standard output and error from batch system jobs " "(unless --noStdOutErr is set) will be placed in this directory. A cache directory " "may be placed in this directory. Temp files and folders will be placed in a " "directory toil-<workflowID> within workDir. The workflowID is generated by " "Toil and will be reported in the workflow logs. Default is determined by the " "variables (TMPDIR, TEMP, TMP) via mkdtemp. This directory needs to exist on " "all machines running jobs; if capturing standard output and error from batch " "system jobs is desired, it will generally need to be on a shared file system. " "When sharing a cache between containers on a host, this directory must be " "shared between the containers.", ) core_options.add_argument( "--coordinationDir", dest="coordination_dir", default=None, env_var="TOIL_COORDINATION_DIR", action=CoordinationDirAction, metavar="PATH", help="Absolute path to directory where Toil will keep state and lock files. " "When sharing a cache between containers on a host, this directory must be " "shared between the containers.", ) core_options.add_argument( "--noStdOutErr", dest="noStdOutErr", default=False, action="store_true", help="Do not capture standard output and error from batch system jobs.", ) # TODO: Should this be deprecated since we always save stats now for history tracking? core_options.add_argument( "--stats", dest="stats", default=False, action="store_true", help="Keep statistics about the toil workflow to be used by 'toil stats'.", ) clean_choices = ["always", "onError", "never", "onSuccess"] core_options.add_argument( "--clean", dest="clean", choices=clean_choices, default="onSuccess", help=f"Determines the deletion of the jobStore upon completion of the program. " f"Choices: {clean_choices}. The --stats option requires information from the " f"jobStore upon completion so the jobStore will never be deleted with that flag. " f"If you wish to be able to restart the run, choose 'never' or 'onSuccess'. 
" f"Default is 'never' if stats is enabled, and 'onSuccess' otherwise.", ) core_options.add_argument( "--cleanWorkDir", dest="cleanWorkDir", choices=clean_choices, default="always", help=f"Determines deletion of temporary worker directory upon completion of a job. " f"Choices: {clean_choices}. Default = always. WARNING: This option should be " f"changed for debugging only. Running a full pipeline with this option could " f"fill your disk with excessive intermediate data.", ) core_options.add_argument( "--clusterStats", dest="clusterStats", nargs="?", action="store", default=None, metavar="OPT_PATH", const=os.getcwd(), help="If enabled, writes out JSON resource usage statistics to a file. " "The default location for this file is the current working directory, but an " "absolute path can also be passed to specify where this file should be written. " "This options only applies when using scalable batch systems.", ) # Restarting the workflow options restart_options = parser.add_argument_group( title="Toil options for restarting an existing workflow", description="Allows the restart of an existing workflow", ) restart_options.add_argument( "--restart", dest="restart", default=False, action="store_true", help="If --restart is specified then will attempt to restart existing workflow " "at the location pointed to by the --jobStore option. Will raise an exception " "if the workflow does not exist", ) # Batch system options batchsystem_options = parser.add_argument_group( title="Toil options for specifying the batch system", description="Allows the specification of the batch system.", ) add_all_batchsystem_options(batchsystem_options) # File store options file_store_options = parser.add_argument_group( title="Toil options for configuring storage", description="Allows configuring Toil's data storage.", ) link_imports = file_store_options.add_mutually_exclusive_group() link_imports_help = ( "When using a filesystem based job store, CWL input files are by default symlinked in. 
" "Setting this option to True instead copies the files into the job store, which may protect " "them from being modified externally. When set to False, as long as caching is enabled, " "Toil will protect the file automatically by changing the permissions to read-only. " "default=%(default)s" ) link_imports.add_argument( "--symlinkImports", dest="symlinkImports", type=strtobool, default=True, metavar="BOOL", help=link_imports_help, ) move_exports = file_store_options.add_mutually_exclusive_group() move_exports_help = ( "When using a filesystem based job store, output files are by default moved to the " "output directory, and a symlink to the moved exported file is created at the initial " "location. Setting this option to True instead copies the files into the output directory. " "Applies to filesystem-based job stores only. " "default=%(default)s" ) move_exports.add_argument( "--moveOutputs", dest="moveOutputs", type=strtobool, default=False, metavar="BOOL", help=move_exports_help, ) caching = file_store_options.add_mutually_exclusive_group() caching_help = ("Enable or disable worker level file caching for your workflow, specifying this overrides default from batch system. " "Does not affect CWL or WDL task caching.") caching.add_argument( "--caching", dest="caching", type=opt_strtobool, default=None, metavar="BOOL", help=caching_help, ) # default is None according to PR 4299, seems to be generated at runtime file_store_options.add_argument( "--symlinkJobStoreReads", dest="symlink_job_store_reads", type=strtobool, default=True, metavar="BOOL", help="Allow reads and container mounts from a JobStore's shared filesystem directly " "via symlink. 
default=%(default)s", ) # Auto scaling options autoscaling_options = parser.add_argument_group( title="Toil options for autoscaling the cluster of worker nodes", description="Allows the specification of the minimum and maximum number of nodes in an autoscaled cluster, " "as well as parameters to control the level of provisioning.", ) provisioner_choices = ["aws", "gce", None] # TODO: Better consolidate this provisioner arg and the one in provisioners/__init__.py? autoscaling_options.add_argument( "--provisioner", "-p", dest="provisioner", choices=provisioner_choices, default=None, help=f"The provisioner for cluster auto-scaling. This is the main Toil " f"'--provisioner' option, and defaults to None for running on single " f"machine and non-auto-scaling batch systems. The currently supported " f"choices are {provisioner_choices}. The default is %(default)s.", ) autoscaling_options.add_argument( "--nodeTypes", default=[], dest="nodeTypes", type=parse_node_types, action="extend", help="Specifies a list of comma-separated node types, each of which is " "composed of slash-separated instance types, and an optional spot " "bid set off by a colon, making the node type preemptible. 
Instance " "types may appear in multiple node types, and the same node type " "may appear as both preemptible and non-preemptible.\n" "Valid argument specifying two node types:\n" "\tc5.4xlarge/c5a.4xlarge:0.42,t2.large\n" "Node types:\n" "\tc5.4xlarge/c5a.4xlarge:0.42 and t2.large\n" "Instance types:\n" "\tc5.4xlarge, c5a.4xlarge, and t2.large\n" "Semantics:\n" "\tBid $0.42/hour for either c5.4xlarge or c5a.4xlarge instances,\n" "\ttreated interchangeably, while they are available at that price,\n" "\tand buy t2.large instances at full price.\n" "default=%(default)s", ) class NodeExtendAction(_AppendAction): """ argparse Action class to remove the default value on first call, and act as an extend action after """ # with action=append/extend, the argparse default is always prepended to the option # so make the CLI have priority by rewriting the option on the first run def __init__(self, option_strings: Any, dest: Any, **kwargs: Any): super().__init__(option_strings, dest, **kwargs) self.is_default = True def __call__( self, parser: Any, namespace: Any, values: Any, option_string: Any = None ) -> None: if self.is_default: setattr(namespace, self.dest, values) self.is_default = False else: super().__call__(parser, namespace, values, option_string) autoscaling_options.add_argument( "--maxNodes", default=[10], dest="maxNodes", type=parse_int_list, action=NodeExtendAction, metavar="INT[,INT...]", help=f"Maximum number of nodes of each type in the cluster, if using autoscaling, " f"provided as a comma-separated list. The first value is used as a default " f"if the list length is less than the number of nodeTypes. " f"default=%(default)s", ) autoscaling_options.add_argument( "--minNodes", default=[0], dest="minNodes", type=parse_int_list, action=NodeExtendAction, metavar="INT[,INT...]", help="Mininum number of nodes of each type in the cluster, if using " "auto-scaling. This should be provided as a comma-separated list of the " "same length as the list of node types. 
default=%(default)s", ) autoscaling_options.add_argument( "--targetTime", dest="targetTime", default=defaultTargetTime, type=int, action=make_closed_interval_action(0), metavar="INT", help=f"Sets how rapidly you aim to complete jobs in seconds. Shorter times mean " f"more aggressive parallelization. The autoscaler attempts to scale up/down " f"so that it expects all queued jobs will complete within targetTime " f"seconds. default=%(default)s", ) autoscaling_options.add_argument( "--betaInertia", dest="betaInertia", default=0.1, type=float, action=make_closed_interval_action(0.0, 0.9), metavar="FLOAT", help=f"A smoothing parameter to prevent unnecessary oscillations in the number " f"of provisioned nodes. This controls an exponentially weighted moving " f"average of the estimated number of nodes. A value of 0.0 disables any " f"smoothing, and a value of 0.9 will smooth so much that few changes will " f"ever be made. Must be between 0.0 and 0.9. default=%(default)s", ) autoscaling_options.add_argument( "--scaleInterval", dest="scaleInterval", default=60, type=int, metavar="INT", help=f"The interval (seconds) between assessing if the scale of " f"the cluster needs to change. default=%(default)s", ) autoscaling_options.add_argument( "--preemptibleCompensation", "--preemptableCompensation", dest="preemptibleCompensation", default=0.0, type=float, action=make_closed_interval_action(0.0, 1.0), metavar="FLOAT", help=f"The preference of the autoscaler to replace preemptible nodes with " f"non-preemptible nodes, when preemptible nodes cannot be started for some " f"reason. This value must be between 0.0 and 1.0, inclusive. " f"A value of 0.0 disables such " f"compensation, a value of 0.5 compensates two missing preemptible nodes " f"with a non-preemptible one. A value of 1.0 replaces every missing " f"pre-emptable node with a non-preemptible one. 
default=%(default)s", ) autoscaling_options.add_argument( "--nodeStorage", dest="nodeStorage", default=50, type=int, metavar="INT", help="Specify the size of the root volume of worker nodes when they are launched " "in gigabytes. You may want to set this if your jobs require a lot of disk " f"space. (default=%(default)s).", ) autoscaling_options.add_argument( "--nodeStorageOverrides", dest="nodeStorageOverrides", default=[], type=parse_str_list, action="extend", metavar="NODETYPE:NODESTORAGE[,NODETYPE:NODESTORAGE...]", help="Comma-separated list of nodeType:nodeStorage that are used to override " "the default value from --nodeStorage for the specified nodeType(s). " "This is useful for heterogeneous jobs where some tasks require much more " "disk than others.", ) autoscaling_options.add_argument( "--metrics", dest="metrics", default=False, type=strtobool, metavar="BOOL", help="Enable the prometheus/grafana dashboard for monitoring CPU/RAM usage, " "queue size, and issued jobs.", ) autoscaling_options.add_argument( "--assumeZeroOverhead", dest="assume_zero_overhead", default=False, type=strtobool, metavar="BOOL", help="Ignore scheduler and OS overhead and assume jobs can use every last byte " "of memory and disk on a node when autoscaling.", ) # Parameters to limit service jobs / detect service deadlocks service_options = parser.add_argument_group( title="Toil options for limiting the number of service jobs and detecting service deadlocks", description=( SUPPRESS if cwl else "Allows the specification of the maximum number of service jobs in a cluster. " "By keeping this limited we can avoid nodes occupied with services causing " "deadlocks." ), ) service_options.add_argument( "--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int, metavar="INT", help=( SUPPRESS if cwl else f"The maximum number of service jobs that can be run " f"concurrently, excluding service jobs running on " f"preemptible nodes. 
default=%(default)s" ), ) service_options.add_argument( "--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs", default=SYS_MAX_SIZE, type=int, metavar="INT", help=( SUPPRESS if cwl else "The maximum number of service jobs that can run " "concurrently on preemptible nodes. default=%(default)s" ), ) service_options.add_argument( "--deadlockWait", dest="deadlockWait", default=60, type=int, metavar="INT", help=( SUPPRESS if cwl else f"Time, in seconds, to tolerate the workflow running only " f"the same service jobs, with no jobs to use them, " f"before declaring the workflow to be deadlocked and " f"stopping. default=%(default)s" ), ) service_options.add_argument( "--deadlockCheckInterval", dest="deadlockCheckInterval", default=30, type=int, metavar="INT", help=( SUPPRESS if cwl else "Time, in seconds, to wait between checks to see if the " "workflow is stuck running only service jobs, with no jobs " "to use them. Should be shorter than --deadlockWait. May " "need to be increased if the batch system cannot enumerate " "running jobs quickly enough, or if polling for running " "jobs is placing an unacceptable load on a shared cluster." f"default=%(default)s" ), ) # Resource requirements resource_options = parser.add_argument_group( title="Toil options for cores/memory requirements", description="The options to specify default cores/memory requirements (if not specified by the jobs " "themselves), and to limit the total amount of memory/cores requested from the batch system.", ) resource_help_msg = ( "The {} amount of {} to request for a job. " "Only applicable to jobs that do not specify an explicit value for this requirement. " "{}. " "Default is {}." 
) cpu_note = "Fractions of a core (for example 0.1) are supported on some batch systems [mesos, single_machine]" disk_mem_note = "Standard suffixes like K, Ki, M, Mi, G or Gi are supported" accelerators_note = ( "Each accelerator specification can have a type (gpu [default], nvidia, amd, cuda, rocm, opencl, " "or a specific model like nvidia-tesla-k80), and a count [default: 1]. If both a type and a count " "are used, they must be separated by a colon. If multiple types of accelerators are " "used, the specifications are separated by commas" ) h2b = lambda x: human2bytes(str(x)) resource_options.add_argument( "--defaultMemory", dest="defaultMemory", default="2.0 Gi", type=h2b, action=make_open_interval_action(1), help=resource_help_msg.format( "default", "memory", disk_mem_note, bytes2human(2147483648) ), ) resource_options.add_argument( "--defaultCores", dest="defaultCores", default=1, metavar="FLOAT", type=float, action=make_open_interval_action(1.0), help=resource_help_msg.format("default", "cpu", cpu_note, str(1)), ) resource_options.add_argument( "--defaultDisk", dest="defaultDisk", default="2.0 Gi", metavar="INT", type=h2b, action=make_open_interval_action(1), help=resource_help_msg.format( "default", "disk", disk_mem_note, bytes2human(2147483648) ), ) resource_options.add_argument( "--defaultAccelerators", dest="defaultAccelerators", default=[], metavar="ACCELERATOR[,ACCELERATOR...]", type=parse_accelerator_list, action="extend", help=resource_help_msg.format("default", "accelerators", accelerators_note, []), ) resource_options.add_argument( "--defaultPreemptible", "--defaultPreemptable", dest="defaultPreemptible", metavar="BOOL", type=strtobool, nargs="?", const=True, default=False, help="Make all jobs able to run on preemptible (spot) nodes by default.", ) resource_options.add_argument( "--maxCores", dest="maxCores", default=SYS_MAX_SIZE, metavar="INT", type=int, action=make_open_interval_action(1), help=resource_help_msg.format("max", "cpu", cpu_note, 
str(SYS_MAX_SIZE)), ) resource_options.add_argument( "--maxMemory", dest="maxMemory", default=SYS_MAX_SIZE, metavar="INT", type=h2b, action=make_open_interval_action(1), help=resource_help_msg.format( "max", "memory", disk_mem_note, bytes2human(SYS_MAX_SIZE) ), ) resource_options.add_argument( "--maxDisk", dest="maxDisk", default=SYS_MAX_SIZE, metavar="INT", type=h2b, action=make_open_interval_action(1), help=resource_help_msg.format( "max", "disk", disk_mem_note, bytes2human(SYS_MAX_SIZE) ), ) # Retrying/rescuing jobs job_options = parser.add_argument_group( title="Toil options for rescuing/killing/restarting jobs", description="The options for jobs that either run too long/fail or get lost (some batch systems have issues!).", ) job_options.add_argument( "--retryCount", dest="retryCount", default=1, type=int, action=make_open_interval_action(0), metavar="INT", help=f"Number of times to retry a failing job before giving up and " f"labeling job failed. default={1}", ) job_options.add_argument( "--stopOnFirstFailure", dest="stop_on_first_failure", type=strtobool, default=False, metavar="BOOL", help="Stop the workflow at the first complete job failure.", ) job_options.add_argument( "--enableUnlimitedPreemptibleRetries", "--enableUnlimitedPreemptableRetries", dest="enableUnlimitedPreemptibleRetries", type=strtobool, default=False, metavar="BOOL", help="If set, preemptible failures (or any failure due to an instance getting " "unexpectedly terminated) will not count towards job failures and --retryCount.", ) job_options.add_argument( "--doubleMem", dest="doubleMem", type=strtobool, default=False, metavar="BOOL", help="If set, batch jobs which die to reaching memory limit on batch schedulers " "will have their memory doubled and they will be retried. The remaining " "retry count will be reduced by 1. 
Currently supported by LSF.", ) job_options.add_argument( "--maxJobDuration", dest="maxJobDuration", default=SYS_MAX_SIZE, type=int, action=make_open_interval_action(1), metavar="INT", help=f"Maximum runtime of a job (in seconds) before we kill it (this is a lower bound, " f"and the actual time before killing the job may be longer). " f"default=%(default)s", ) job_options.add_argument( "--rescueJobsFrequency", dest="rescueJobsFrequency", default=60, type=int, action=make_open_interval_action(1), metavar="INT", help=f"Period of time to wait (in seconds) between checking for missing/overlong jobs, " f"that is jobs which get lost by the batch system. Expert parameter. " f"default=%(default)s", ) job_options.add_argument( "--jobStoreTimeout", dest="job_store_timeout", default=30, type=float, action=make_open_interval_action(0), metavar="FLOAT", help=f"Maximum time (in seconds) to wait for a job's update to the job store " f"before declaring it failed. default=%(default)s", ) # Log management options log_options = parser.add_argument_group( title="Toil log management options", description="Options for how Toil should manage its logs.", ) log_options.add_argument( "--maxLogFileSize", dest="maxLogFileSize", default=100 * 1024 * 1024, type=h2b, action=make_open_interval_action(1), help=f"The maximum size of a job log file to keep (in bytes), log files larger than " f"this will be truncated to the last X bytes. Setting this option to zero will " f"prevent any truncation. Setting this option to a negative value will truncate " f"from the beginning. Default={bytes2human(100 * 1024 * 1024)}", ) log_options.add_argument( "--writeLogs", dest="writeLogs", nargs="?", action="store", default=None, const=os.getcwd(), metavar="OPT_PATH", help="Write worker logs received by the leader into their own files at the specified " "path. Any non-empty standard output and error from failed batch system jobs will " "also be written into files at this path. 
The current working directory will be " "used if a path is not specified explicitly. Note: By default only the logs of " "failed jobs are returned to leader. Set log level to 'debug' or enable " "'--writeLogsFromAllJobs' to get logs back from successful jobs, and adjust " "'maxLogFileSize' to control the truncation limit for worker logs.", ) log_options.add_argument( "--writeLogsGzip", dest="writeLogsGzip", nargs="?", action="store", default=None, const=os.getcwd(), metavar="OPT_PATH", help="Identical to --writeLogs except the logs files are gzipped on the leader.", ) log_options.add_argument( "--writeLogsFromAllJobs", dest="writeLogsFromAllJobs", type=strtobool, default=False, metavar="BOOL", help="Whether to write logs from all jobs (including the successful ones) without " "necessarily setting the log level to 'debug'. Ensure that either --writeLogs " "or --writeLogsGzip is set if enabling this option.", ) log_options.add_argument( "--writeMessages", dest="write_messages", default=None, type=lambda x: None if x is None else os.path.abspath(x), metavar="PATH", help="File to send messages from the leader's message bus to.", ) log_options.add_argument( "--realTimeLogging", dest="realTimeLogging", type=strtobool, default=False, metavar="BOOL", help="Enable real-time logging from workers to leader", ) # Misc options misc_options = parser.add_argument_group( title="Toil miscellaneous options", description="Everything else." ) misc_options.add_argument( "--disableChaining", dest="disableChaining", type=strtobool, default=False, metavar="BOOL", help="Disables chaining of jobs (chaining uses one job's resource allocation " "for its successor job if possible).", ) misc_options.add_argument( "--disableJobStoreChecksumVerification", dest="disableJobStoreChecksumVerification", default=False, type=strtobool, metavar="BOOL", help="Disables checksum verification for files transferred to/from the job store. 
" "Checksum verification is a safety check to ensure the data is not corrupted " "during transfer. Currently only supported for non-streaming AWS files.", ) class SSEKeyAction(Action): def __call__( self, parser: Any, namespace: Any, values: Any, option_string: Any = None ) -> None: if values is not None: sse_key = values if sse_key is None: return with open(sse_key) as f: assert ( len(f.readline().rstrip()) == 32 ), "SSE key appears to be invalid." setattr(namespace, self.dest, values) misc_options.add_argument( "--sseKey", dest="sseKey", default=None, action=SSEKeyAction, metavar="PATH", help="Path to file containing 32 character key to be used for server-side encryption on " "awsJobStore or googleJobStore. SSE will not be used if this flag is not passed.", ) # yaml.safe_load is being deprecated, this is the suggested workaround def yaml_safe_load(stream: Any) -> Any: yaml = YAML(typ="safe", pure=True) d = yaml.load(stream) if isinstance(d, dict): # this means the argument was a dictionary and is valid yaml (for configargparse) return d else: # this means the argument is likely in it's string format (for CLI) return parse_set_env(parse_str_list(stream)) class ExtendActionDict(Action): """ Argparse action class to implement the action="extend" functionality on dictionaries """ def __call__( self, parser: Any, namespace: Any, values: Any, option_string: Any = None ) -> None: items = getattr(namespace, self.dest, None) assert ( items is not None ) # for mypy. This should never be None, esp. if called in setEnv # note: this will overwrite existing entries items.update(values) misc_options.add_argument( "--setEnv", "-e", metavar="NAME=VALUE or NAME", dest="environment", default={}, type=yaml_safe_load, action=ExtendActionDict, help="Set an environment variable early on in the worker. If VALUE is null, it will " "be looked up in the current environment. 
Independently of this option, the worker " "will try to emulate the leader's environment before running a job, except for " "some variables known to vary across systems. Using this option, a variable can " "be injected into the worker process itself before it is started.", ) misc_options.add_argument( "--servicePollingInterval", dest="servicePollingInterval", default=60.0, type=float, action=make_open_interval_action(0.0), metavar="FLOAT", help=f"Interval of time service jobs wait between polling for the existence of the " f"keep-alive flag. Default: {60.0}", ) misc_options.add_argument( "--forceDockerAppliance", dest="forceDockerAppliance", type=strtobool, default=False, metavar="BOOL", help="Disables sanity checking the existence of the docker image specified by " "TOIL_APPLIANCE_SELF, which Toil uses to provision mesos for autoscaling.", ) misc_options.add_argument( "--statusWait", dest="statusWait", type=int, default=3600, metavar="INT", help="Seconds to wait between reports of running jobs.", ) misc_options.add_argument( "--disableProgress", dest="disableProgress", action="store_true", default=False, help="Disables the progress bar shown when standard error is a terminal.", ) misc_options.add_argument( "--publishWorkflowMetrics", dest="publish_workflow_metrics", choices=["all", "current", "no"], default=None, help="Whether to publish workflow metrics reports (including unique workflow " "and task run IDs, job names, and version and Toil feature use information) to " "Dockstore when a workflow completes. Selecting \"current\" will publish metrics " "for the current workflow. Selecting \"all\" will also publish prior workflow " "runs from the Toil history database, even if they themselves were run with \"no\". " "Note that once published, workflow metrics CANNOT be deleted or un-published; they " "will stay published forever!" 
) # Debug options debug_options = parser.add_argument_group( title="Toil debug options", description="Debug options for finding problems or helping with testing.", ) debug_options.add_argument( "--debugWorker", dest="debugWorker", default=False, action="store_true", help="Experimental no forking mode for local debugging. Specifically, workers " "are not forked and stderr/stdout are not redirected to the log.", ) debug_options.add_argument( "--disableWorkerOutputCapture", dest="disableWorkerOutputCapture", default=False, action="store_true", help="Let worker output go to worker's standard out/error instead of per-job logs.", ) debug_options.add_argument( "--badWorker", dest="badWorker", default=0.0, type=float, action=make_closed_interval_action(0.0, 1.0), metavar="FLOAT", help=f"For testing purposes randomly kill --badWorker proportion of jobs using " f"SIGKILL. default={0.0}", ) debug_options.add_argument( "--badWorkerFailInterval", dest="badWorkerFailInterval", default=0.01, type=float, action=make_open_interval_action(0.0), metavar="FLOAT", # might be cyclical? help=f"When killing the job pick uniformly within the interval from 0.0 to " f"--badWorkerFailInterval seconds after the worker starts. 
" f"default={0.01}", ) # All deprecated options: # These are deprecated in favor of a simpler option # ex: noLinkImports and linkImports can be simplified into a single link_imports argument link_imports.add_argument( "--noLinkImports", dest="linkImports", action="store_false", help=SUPPRESS ) link_imports.add_argument( "--linkImports", dest="linkImports", action="store_true", help=SUPPRESS ) link_imports.set_defaults(linkImports=None) move_exports.add_argument( "--moveExports", dest="moveExports", action="store_true", help=SUPPRESS ) move_exports.add_argument( "--noMoveExports", dest="moveExports", action="store_false", help=SUPPRESS ) link_imports.set_defaults(moveExports=None) # dest is set to enableCaching to not conflict with the current --caching destination caching.add_argument( "--disableCaching", dest="enableCaching", action="store_false", help=SUPPRESS ) caching.set_defaults(enableCaching=None)