Source code for toil.batchSystems.registry

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import importlib
import logging
import pkgutil
import warnings
from typing import TYPE_CHECKING, Callable, Dict, List, Sequence, Tuple, Type

from toil.lib.compatibility import deprecated
from toil.lib.memoize import memoize

if TYPE_CHECKING:
    from toil.batchSystems.abstractBatchSystem import AbstractBatchSystem

logger = logging.getLogger(__name__)

#####
# Plugin system/API
#####

[docs] def add_batch_system_factory(key: str, class_factory: Callable[[], Type['AbstractBatchSystem']]): """ Adds a batch system to the registry for workflow or plugin-supplied batch systems. :param class_factory: A function that returns a batch system class (NOT an instance), which implements :class:`toil.batchSystems.abstractBatchSystem.AbstractBatchSystem`. """ _registry_keys.append(key) _registry[key] = class_factory
[docs] def get_batch_systems() -> Sequence[str]: """ Get the names of all the availsble batch systems. """ _load_all_plugins() return _registry_keys
[docs] def get_batch_system(key: str) -> Type['AbstractBatchSystem']: """ Get a batch system class by name. :raises: KeyError if the key is not the name of a batch system, and ImportError if the batch system's class cannot be loaded. """ return _registry[key]()
DEFAULT_BATCH_SYSTEM = 'single_machine' ##### # Built-in batch systems #####
[docs] def aws_batch_batch_system_factory(): from toil.batchSystems.awsBatch import AWSBatchBatchSystem return AWSBatchBatchSystem
[docs] def gridengine_batch_system_factory(): from toil.batchSystems.gridengine import GridEngineBatchSystem return GridEngineBatchSystem
[docs] def lsf_batch_system_factory(): from toil.batchSystems.lsf import LSFBatchSystem return LSFBatchSystem
[docs] def single_machine_batch_system_factory(): from toil.batchSystems.singleMachine import SingleMachineBatchSystem return SingleMachineBatchSystem
[docs] def mesos_batch_system_factory(): from toil.batchSystems.mesos.batchSystem import MesosBatchSystem return MesosBatchSystem
[docs] def slurm_batch_system_factory(): from toil.batchSystems.slurm import SlurmBatchSystem return SlurmBatchSystem
[docs] def torque_batch_system_factory(): from toil.batchSystems.torque import TorqueBatchSystem return TorqueBatchSystem
[docs] def htcondor_batch_system_factory(): from toil.batchSystems.htcondor import HTCondorBatchSystem return HTCondorBatchSystem
[docs] def kubernetes_batch_system_factory(): from toil.batchSystems.kubernetes import KubernetesBatchSystem return KubernetesBatchSystem
##### # Registry implementation ##### _registry: Dict[str, Callable[[], Type["AbstractBatchSystem"]]] = { 'aws_batch' : aws_batch_batch_system_factory, 'single_machine' : single_machine_batch_system_factory, 'grid_engine' : gridengine_batch_system_factory, 'lsf' : lsf_batch_system_factory, 'mesos' : mesos_batch_system_factory, 'slurm' : slurm_batch_system_factory, 'torque' : torque_batch_system_factory, 'htcondor' : htcondor_batch_system_factory, 'kubernetes' : kubernetes_batch_system_factory } _registry_keys = list(_registry.keys()) # We will load any packages starting with this prefix and let them call # add_batch_system_factory() _PLUGIN_NAME_PREFIX = "toil_batch_system_" @memoize def _load_all_plugins() -> None: """ Load all the batch system plugins that are installed. """ for finder, name, is_pkg in pkgutil.iter_modules(): # For all installed packages if name.startswith(_PLUGIN_NAME_PREFIX): # If it is a Toil batch system plugin, import it importlib.import_module(name) ##### # Deprecated API ##### # We used to directly access these constants, but now the Right Way to use this # module is add_batch_system_factory() to register and get_batch_systems() to # get the list/get_batch_system() to get a class by name.
[docs] def __getattr__(name): """ Implement a fallback attribute getter to handle deprecated constants. See <https://stackoverflow.com/a/48242860>. """ if name == "BATCH_SYSTEM_FACTORY_REGISTRY": warnings.warn("BATCH_SYSTEM_FACTORY_REGISTRY is deprecated; use get_batch_system() or add_batch_system_factory()", DeprecationWarning) return _registry elif name == "BATCH_SYSTEMS": warnings.warn("BATCH_SYSTEMS is deprecated; use get_batch_systems()", DeprecationWarning) return _registry_keys else: raise AttributeError(f"Module {__name__} ahs no attribute {name}")
[docs] @deprecated(new_function_name="add_batch_system_factory") def addBatchSystemFactory(key: str, batchSystemFactory: Callable[[], Type['AbstractBatchSystem']]): """ Deprecated method to add a batch system. """ return add_batch_system_factory(key, batchSystemFactory)
##### # Testing utilities ##### # We need a snapshot save/restore system for testing. We can't just tamper with # the globals because module-level globals are their own references, so we # can't touch this module's global name bindings from a client module.
[docs] def save_batch_system_plugin_state() -> Tuple[List[str], Dict[str, Callable[[], Type['AbstractBatchSystem']]]]: """ Return a snapshot of the plugin registry that can be restored to remove added plugins. Useful for testing the plugin system in-process with other tests. """ snapshot = (list(_registry_keys), dict(_registry)) return snapshot
[docs] def restore_batch_system_plugin_state(snapshot: Tuple[List[str], Dict[str, Callable[[], Type['AbstractBatchSystem']]]]): """ Restore the batch system registry state to a snapshot from save_batch_system_plugin_state(). """ # We need to apply the snapshot without rebinding the names, because that # won't affect modules that imported the names. wanted_batch_systems, wanted_registry = snapshot _registry_keys.clear() _registry_keys.extend(wanted_batch_systems) _registry.clear() _registry.update(wanted_registry)