Source code for toil.batchSystems.options

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

import logging
import os
import sys
from argparse import ArgumentParser, _ArgumentGroup
from typing import Any, Callable, List, Optional, TypeVar, Union, cast

if sys.version_info >= (3, 8):
    from typing import Protocol
else:
    from typing_extensions import Protocol

from toil.batchSystems.registry import (BATCH_SYSTEM_FACTORY_REGISTRY,
                                        BATCH_SYSTEMS,
                                        DEFAULT_BATCH_SYSTEM)
from toil.lib.threading import cpu_count

logger = logging.getLogger(__name__)

[docs]class OptionSetter(Protocol): """ Protocol for the setOption function we get to let us set up CLI options for each batch system. Actual functionality is defined in the Config class. """ OptionType = TypeVar('OptionType')
[docs] def __call__( self, option_name: str, parsing_function: Optional[Callable[[Any], OptionType]] = None, check_function: Optional[Callable[[OptionType], Union[None, bool]]] = None, default: Optional[OptionType] = None, env: Optional[List[str]] = None, old_names: Optional[List[str]] = None ) -> bool: ...
[docs]def set_batchsystem_options(batch_system: Optional[str], set_option: OptionSetter) -> None: """ Call set_option for all the options for the given named batch system, or all batch systems if no name is provided. """ if batch_system is not None: # Use this batch system batch_system_type = BATCH_SYSTEM_FACTORY_REGISTRY[batch_system]() batch_system_type.setOptions(set_option) else: for factory in BATCH_SYSTEM_FACTORY_REGISTRY.values(): # All the batch systems are responsible for setting their own options # with their setOptions() class methods. try: batch_system_type = factory() except ImportError: # Skip anything we can't import continue # Ask the batch system to tell us all its options. batch_system_type.setOptions(set_option) # Options shared between multiple batch systems set_option("disableAutoDeployment", bool, default=False) # Make limits maximum if set to 0 set_option("max_jobs", lambda x: int(x) or sys.maxsize) set_option("max_local_jobs", lambda x: int(x) or sys.maxsize) set_option("manualMemArgs") set_option("run_local_jobs_on_workers", bool, default=False) set_option("statePollingWait") set_option("batch_logs_dir", env=["TOIL_BATCH_LOGS_DIR"])
[docs]def add_all_batchsystem_options(parser: Union[ArgumentParser, _ArgumentGroup]) -> None: # Do the global cross-batch-system arguments parser.add_argument( "--batchSystem", dest="batchSystem", default=DEFAULT_BATCH_SYSTEM, choices=BATCH_SYSTEMS, help=f"The type of batch system to run the job(s) with, currently can be one " f"of {', '.join(BATCH_SYSTEMS)}. default={DEFAULT_BATCH_SYSTEM}", ) parser.add_argument( "--disableHotDeployment", dest="disableAutoDeployment", action="store_true", default=None, help="Hot-deployment was renamed to auto-deployment. Option now redirects to " "--disableAutoDeployment. Left in for backwards compatibility.", ) parser.add_argument( "--disableAutoDeployment", dest="disableAutoDeployment", action="store_true", default=None, help="Should auto-deployment of the user script be deactivated? If True, the user " "script/package should be present at the same location on all workers. Default = False.", ) parser.add_argument( "--maxJobs", dest="max_jobs", default=sys.maxsize, # This is *basically* unlimited and saves a lot of Optional[] help="Specifies the maximum number of jobs to submit to the " "backing scheduler at once. Not supported on Mesos or " "AWS Batch. Use 0 for unlimited. Defaults to unlimited.", ) parser.add_argument( "--maxLocalJobs", dest="max_local_jobs", default=cpu_count(), help=f"Specifies the maximum number of housekeeping jobs to " f"run sumultaneously on the local system. Use 0 for " f"unlimited. Defaults to the number of local cores ({cpu_count()}).", ) parser.add_argument( "--manualMemArgs", default=False, action="store_true", dest="manualMemArgs", help="Do not add the default arguments: 'hv=MEMORY' & 'h_vmem=MEMORY' to the qsub " "call, and instead rely on TOIL_GRIDGENGINE_ARGS to supply alternative arguments. " "Requires that TOIL_GRIDGENGINE_ARGS be set.", ) parser.add_argument( "--runLocalJobsOnWorkers" "--runCwlInternalJobsOnWorkers", dest="run_local_jobs_on_workers", action="store_true", default=None, help="Whether to run jobs marked as local (e.g. CWLScatter) on the worker nodes " "instead of the leader node. If false (default), then all such jobs are run on " "the leader node. Setting this to true can speed up CWL pipelines for very large " "workflows with many sub-workflows and/or scatters, provided that the worker " "pool is large enough.", ) parser.add_argument( "--coalesceStatusCalls", dest="coalesceStatusCalls", action="store_true", default=True, help=( "Ask for job statuses from the batch system in a batch. Deprecated; now always " "enabled where supported." ), ) parser.add_argument( "--statePollingWait", dest="statePollingWait", type=int, default=None, help="Time, in seconds, to wait before doing a scheduler query for job state. " "Return cached results if within the waiting period. Only works for grid " "engine batch systems such as gridengine, htcondor, torque, slurm, and lsf." ) parser.add_argument( "--batchLogsDir", dest="batch_logs_dir", default=None, help="Directory to tell the backing batch system to log into. Should be available " "on both the leader and the workers, if the backing batch system writes logs " "to the worker machines' filesystems, as many HPC schedulers do. If unset, " "the Toil work directory will be used. Only works for grid engine batch " "systems such as gridengine, htcondor, torque, slurm, and lsf." ) for factory in BATCH_SYSTEM_FACTORY_REGISTRY.values(): # All the batch systems are responsible for adding their own options # with the add_options class method. try: batch_system_type = factory() except ImportError: # Skip anything we can't import continue # Ask the batch system to create its options in the parser logger.debug('Add options for %s', batch_system_type) batch_system_type.add_options(parser)
[docs]def set_batchsystem_config_defaults(config) -> None: """ Set default and environment-based options for builtin batch systems. This is required if a Config object is not constructed from an Options object. """ # Do the global options across batch systems config.batchSystem = "single_machine" config.disableAutoDeployment = False config.max_jobs = sys.maxsize config.max_local_jobs = cpu_count() config.manualMemArgs = False config.statePollingWait: Optional[Union[float, int]] = None # Number of seconds to wait before querying job state OptionType = TypeVar('OptionType') def set_option(option_name: str, parsing_function: Optional[Callable[[Any], OptionType]] = None, check_function: Optional[Callable[[OptionType], None]] = None, default: Optional[OptionType] = None, env: Optional[List[str]] = None, old_names: Optional[List[str]] = None) -> None: """ Function to set a batch-system-defined option to its default value, or one from the environment. """ # TODO: deduplicate with Config option_value = default if env is not None: for env_var in env: # Try all the environment variables if option_value != default: break option_value = os.environ.get(env_var, default) if option_value is not None or not hasattr(config, option_name): if parsing_function is not None: # Parse whatever it is (string, argparse-made list, etc.) option_value = parsing_function(option_value) if check_function is not None: try: check_function(option_value) except AssertionError: raise RuntimeError(f"The {option_name} option has an invalid value: {option_value}") setattr(config, option_name, option_value) # Set up defaults from all the batch systems set_batchsystem_options(None, cast(OptionSetter, set_option))