Source code for toil.batchSystems.local_support
# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from toil.batchSystems.abstractBatchSystem import (
BatchSystemSupport,
UpdatedBatchJobInfo,
)
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.common import Config
from toil.job import JobDescription
from toil.lib.threading import cpu_count
logger = logging.getLogger(__name__)
[docs]
class BatchSystemLocalSupport(BatchSystemSupport):
"""Adds a local queue for helper jobs, useful for CWL & others."""
def __init__(
self, config: Config, maxCores: float, maxMemory: float, maxDisk: int
) -> None:
super().__init__(config, maxCores, maxMemory, maxDisk)
max_local_jobs = (
config.max_local_jobs if config.max_local_jobs is not None else cpu_count()
)
self.localBatch: SingleMachineBatchSystem = SingleMachineBatchSystem(
config, maxCores, maxMemory, maxDisk, max_jobs=max_local_jobs
)
[docs]
def handleLocalJob(self, command: str, jobDesc: JobDescription) -> int | None:
"""
To be called by issueBatchJob.
Returns the jobID if the jobDesc has been submitted to the local queue,
otherwise returns None
"""
if not self.config.run_local_jobs_on_workers and jobDesc.local:
# Since singleMachine.py doesn't typecheck yet and MyPy is ignoring
# it, it will raise errors here unless we add type annotations to
# everything we get back from it. The easiest way to do that seems
# to be to put it in a variable with a type annotation on it. That
# somehow doesn't error whereas just returning the value complains
# we're returning an Any. TODO: When singleMachine.py typechecks,
# remove all these extra variables.
local_id: int = self.localBatch.issueBatchJob(command, jobDesc)
return local_id
else:
return None
[docs]
def killLocalJobs(self, jobIDs: list[int]) -> None:
"""
Will kill all local jobs that match the provided jobIDs.
To be called by killBatchJobs.
"""
self.localBatch.killBatchJobs(jobIDs)
[docs]
def getIssuedLocalJobIDs(self) -> list[int]:
"""To be called by getIssuedBatchJobIDs."""
local_ids: list[int] = self.localBatch.getIssuedBatchJobIDs()
return local_ids
[docs]
def getRunningLocalJobIDs(self) -> dict[int, float]:
"""To be called by getRunningBatchJobIDs()."""
local_running: dict[int, float] = self.localBatch.getRunningBatchJobIDs()
return local_running
[docs]
def getUpdatedLocalJob(self, maxWait: int) -> UpdatedBatchJobInfo | None:
"""To be called by getUpdatedBatchJob()."""
return self.localBatch.getUpdatedBatchJob(maxWait)
[docs]
def getNextJobID(self) -> int:
"""Must be used to get job IDs so that the local and batch jobs do not conflict."""
# TODO: This reaches deep into SingleMachineBatchSystem when it probably shouldn't
with self.localBatch.jobIndexLock:
jobID: int = self.localBatch.jobIndex
self.localBatch.jobIndex += 1
return jobID
[docs]
def shutdownLocal(self) -> None:
"""To be called from shutdown()."""
self.localBatch.shutdown()