Source code for toil.batchSystems.cleanup_support

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from types import TracebackType
from typing import Any, ContextManager, List, Optional, Type

from toil.batchSystems.abstractBatchSystem import (BatchSystemSupport,
                                                   WorkerCleanupInfo)
from toil.batchSystems.local_support import BatchSystemLocalSupport
from toil.common import Config, Toil
from toil.lib.threading import LastProcessStandingArena

logger = logging.getLogger(__name__)

[docs] class BatchSystemCleanupSupport(BatchSystemLocalSupport): """ Adds cleanup support when the last running job leaves a node, for batch systems that can't provide it using the backing scheduler. """
[docs] @classmethod def supportsWorkerCleanup(cls) -> bool: return True
[docs] def getWorkerContexts(self) -> List[ContextManager[Any]]: # Tell worker to register for and invoke cleanup # Create a context manager that has a copy of our cleanup info context = WorkerCleanupContext(self.workerCleanupInfo) # Send it along so the worker works inside of it contexts = super().getWorkerContexts() contexts.append(context) return contexts
def __init__(self, config: Config, maxCores: float, maxMemory: int, maxDisk: int) -> None: super().__init__(config, maxCores, maxMemory, maxDisk)
[docs] class WorkerCleanupContext: """ Context manager used by :class:`BatchSystemCleanupSupport` to implement cleanup on a node after the last worker is done working. Gets wrapped around the worker's work. """ def __init__(self, workerCleanupInfo: WorkerCleanupInfo) -> None: """ Wrap the given workerCleanupInfo in a context manager. :param workerCleanupInfo: Info to use to clean up the worker if we are the last to exit the context manager. """ self.workerCleanupInfo = workerCleanupInfo # Don't set self.arena or MyPy will be upset that sometimes it doesn't have the right type.
[docs] def __enter__(self) -> None: # Set up an arena so we know who is the last worker to leave self.arena = LastProcessStandingArena( Toil.get_toil_coordination_dir( self.workerCleanupInfo.work_dir, self.workerCleanupInfo.coordination_dir ), Toil.get_workflow_path_component(self.workerCleanupInfo.workflow_id) + "-cleanup" ) logger.debug('Entering cleanup arena') self.arena.enter() logger.debug('Cleanup arena entered')
# This is exactly the signature MyPy demands. # Also, it demands we not say we can return a bool if we return False # always, because it can be smarter about reachability if it knows what # context managers never eat exceptions. So it decides any context manager # that is always falsey but claims to return a bool is an error.
[docs] def __exit__(self, type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None: logger.debug('Leaving cleanup arena') for _ in self.arena.leave(): # We are the last concurrent worker to finish. # Do batch system cleanup. logger.debug('Cleaning up worker') BatchSystemSupport.workerCleanup(self.workerCleanupInfo) # Now the coordination_dir is allowed to no longer exist on the node. logger.debug('Cleanup arena left')