# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from abc import ABCMeta, abstractmethod
from datetime import datetime
from queue import Empty, Queue
from threading import Lock, Thread
from typing import Dict, List, Optional, Tuple, Union

from toil.batchSystems.abstractBatchSystem import (BatchJobExitReason,
                                                   UpdatedBatchJobInfo)
from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport
from toil.bus import ExternalBatchIdMessage
from toil.job import AcceleratorRequirement
from toil.lib.misc import CalledProcessErrorStderr

logger = logging.getLogger(__name__)


# Internally we throw around these flat tuples of random important things about a job.
# Assigned ID
# Required cores
# Required memory
# Command to run
# Unit name of the job
# Environment dict for the job
# Accelerator requirements for the job
JobTuple = Tuple[int, float, int, str, str, Dict[str, str], List[AcceleratorRequirement]]
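# For example (illustrative values only), a job wanting one core and 2 GiB of
# memory might travel through the queues as:
#   (7, 1.0, 2147483648, '_toil_worker ...', 'job7', {'FOO': 'bar'}, [])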

class AbstractGridEngineBatchSystem(BatchSystemCleanupSupport):
    """
    A partial implementation of BatchSystemSupport for batch systems run on a
    standard HPC cluster. By default auto-deployment is not implemented.
    """
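
    # The batch system itself runs in the leader process; all communication
    # with the scheduler happens on a single background Worker thread (below),
    # fed through the four Queues created in __init__. Concrete schedulers
    # (in Toil, e.g. the Slurm and Grid Engine support) subclass both this
    # class and its Worker.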

    class Worker(Thread, metaclass=ABCMeta):
        def __init__(self, newJobsQueue: Queue, updatedJobsQueue: Queue,
                     killQueue: Queue, killedJobsQueue: Queue,
                     boss: 'AbstractGridEngineBatchSystem') -> None:
            """
            Abstract worker interface class. All instances are created with
            five initial arguments (below). Note the Queue instances passed
            are empty.

            :param newJobsQueue: a Queue of new (unsubmitted) jobs
            :param updatedJobsQueue: a Queue of jobs that have been updated
            :param killQueue: a Queue of active jobs that need to be killed
            :param killedJobsQueue: Queue of killed jobs for this worker
            :param boss: the AbstractGridEngineBatchSystem instance that
                         controls this AbstractGridEngineWorker
            """
            Thread.__init__(self)
            self.boss = boss
            self.boss.config.statePollingWait = \
                self.boss.config.statePollingWait or self.boss.getWaitDuration()
            self.newJobsQueue = newJobsQueue
            self.updatedJobsQueue = updatedJobsQueue
            self.killQueue = killQueue
            self.killedJobsQueue = killedJobsQueue
            self.waitingJobs: List[JobTuple] = list()
            self.runningJobs = set()
            self.runningJobsLock = Lock()
            # Maps a Toil job ID to a (batch system job ID, task) tuple; see
            # getBatchSystemID() and createJobs().
            self.batchJobIDs: Dict[int, Tuple[str, Optional[str]]] = dict()
            self._checkOnJobsCache = None
            self._checkOnJobsTimestamp = None

        def getBatchSystemID(self, jobID: int) -> str:
            """
            Get the batch system-specific job ID.

            Note: for the moment this is the only consistent way to cleanly
            get the batch system job ID.

            :param jobID: Toil BatchSystem numerical job ID
            """
            if jobID not in self.batchJobIDs:
                raise RuntimeError("Unknown jobID, could not be converted")

            (job, task) = self.batchJobIDs[jobID]
            if task is None:
                return str(job)
            else:
                return str(job) + "." + str(task)

        def forgetJob(self, jobID: int) -> None:
            """
            Remove the given job from the set of running jobs and from the
            Toil-to-batch-system ID mapping.

            :param jobID: Toil job ID
            """
            with self.runningJobsLock:
                self.runningJobs.remove(jobID)
            del self.batchJobIDs[jobID]

        def createJobs(self, newJob: JobTuple) -> bool:
            """
            Create a new job with the given attributes.

            Called by AbstractGridEngineWorker.run().
            """
            activity = False
            # Load new job id if present:
            if newJob is not None:
                self.waitingJobs.append(newJob)
            # Launch jobs as necessary:
            while len(self.waitingJobs) > 0 and \
                    len(self.runningJobs) < int(self.boss.config.max_jobs):
                activity = True
                jobID, cpu, memory, command, jobName, environment, gpus = self.waitingJobs.pop(0)

                # Prepare the implementation-specific job submission command
                subLine = self.prepareSubmission(cpu, memory, jobID, command,
                                                 jobName, environment, gpus)
                logger.debug("Running %r", subLine)
                batchJobID = self.boss.with_retries(self.submitJob, subLine)

                if self.boss._outbox is not None:
                    # jobID is the Toil batch system's numeric ID, which is
                    # distinct from the job store's idea of the ID; batchJobID
                    # is the ID assigned by the scheduler (e.g. Slurm).
                    self.boss._outbox.publish(ExternalBatchIdMessage(
                        jobID, batchJobID, self.boss.__class__.__name__))

                logger.debug("Submitted job %s", str(batchJobID))

                # Store dict for mapping Toil job ID to batch job ID
                # TODO: Note that this currently stores a tuple of (batch system
                # ID, Task), but the second value is None by default and doesn't
                # seem to be used
                self.batchJobIDs[jobID] = (batchJobID, None)

                # Add to queue of running jobs
                with self.runningJobsLock:
                    self.runningJobs.add(jobID)

            return activity
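
        # The life cycle of a submitted job, as driven by this worker:
        # prepareSubmission() builds the scheduler command line, submitJob()
        # runs it and returns the scheduler's job ID, and checkOnJobs() then
        # polls getJobExitCode() with that ID until the job finishes.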

        def killJobs(self):
            """
            Kill any running jobs within the worker.
            """
            killList = list()
            while True:
                try:
                    jobId = self.killQueue.get(block=False)
                except Empty:
                    break
                else:
                    killList.append(jobId)

            if not killList:
                return False

            # Do the actual killing
            for jobID in list(killList):
                if jobID in self.runningJobs:
                    logger.debug('Killing job: %s', jobID)
                    # This call should be implementation-specific; all other
                    # code is redundant w/ other implementations
                    self.killJob(jobID)
                else:
                    if jobID in self.waitingJobs:
                        self.waitingJobs.remove(jobID)
                    self.killedJobsQueue.put(jobID)
                    killList.remove(jobID)

            # Wait to confirm the kill
            while killList:
                for jobID in list(killList):
                    batchJobID = self.getBatchSystemID(jobID)
                    if self.boss.with_retries(self.getJobExitCode, batchJobID) is not None:
                        logger.debug('Adding jobID %s to killedJobsQueue', jobID)
                        self.killedJobsQueue.put(jobID)
                        killList.remove(jobID)
                        self.forgetJob(jobID)
                if len(killList) > 0:
                    logger.warning("Some jobs weren't killed, trying again in %is.",
                                   self.boss.sleepSeconds())

            return True

        def checkOnJobs(self):
            """
            Check and update the status of all running jobs.

            Respects statePollingWait and will return cached results if not
            within the time period to talk with the scheduler.
            """
            if self._checkOnJobsTimestamp:
                time_since_last_check = (datetime.now() - self._checkOnJobsTimestamp).total_seconds()
                if time_since_last_check < self.boss.config.statePollingWait:
                    return self._checkOnJobsCache

            activity = False
            running_job_list = list(self.runningJobs)
            batch_job_id_list = [self.getBatchSystemID(j) for j in running_job_list]
            if batch_job_id_list:
                try:
                    # Get the statuses as a batch
                    statuses = self.boss.with_retries(
                        self.coalesce_job_exit_codes, batch_job_id_list
                    )
                except NotImplementedError:
                    # We have to get the statuses individually
                    for running_job_id, batch_job_id in zip(running_job_list, batch_job_id_list):
                        status = self.boss.with_retries(self.getJobExitCode, batch_job_id)
                        activity = self._handle_job_status(
                            running_job_id, status, activity
                        )
                else:
                    # We got the statuses as a batch
                    for running_job_id, status in zip(running_job_list, statuses):
                        activity = self._handle_job_status(
                            running_job_id, status, activity
                        )

            self._checkOnJobsCache = activity
            self._checkOnJobsTimestamp = datetime.now()
            return activity
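
        # Example of the polling throttle: with statePollingWait = 60, a call
        # at t = 0 queries the scheduler and caches the result, and any call
        # before t = 60 returns the cached value instead of re-querying.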

        def _handle_job_status(
            self, job_id: int, status: Union[int, BatchJobExitReason, None],
            activity: bool
        ) -> bool:
            """
            Helper method for checkOnJobs to handle job statuses.
            """
            if status is not None:
                if isinstance(status, BatchJobExitReason):
                    # The scheduler reported a reason rather than a numeric
                    # exit code; report a generic failing exit status of 1.
                    self.updatedJobsQueue.put(
                        UpdatedBatchJobInfo(
                            jobID=job_id, exitStatus=1, exitReason=status, wallTime=None
                        )
                    )
                else:
                    self.updatedJobsQueue.put(
                        UpdatedBatchJobInfo(
                            jobID=job_id, exitStatus=status, exitReason=None, wallTime=None
                        )
                    )
                self.forgetJob(job_id)
                return True
            return activity

        def _runStep(self):
            """Return True if there are more jobs, False if all are done."""
            activity = False
            newJob = None
            if not self.newJobsQueue.empty():
                activity = True
                newJob = self.newJobsQueue.get()
                if newJob is None:
                    logger.debug('Received queue sentinel.')
                    return False
            if self.killJobs():
                activity = True
            if self.createJobs(newJob):
                activity = True
            if self.checkOnJobs():
                activity = True
            if not activity:
                logger.debug('No activity, sleeping for %is', self.boss.sleepSeconds())
            return True
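
        # A None on newJobsQueue is the shutdown sentinel: it is put there by
        # AbstractGridEngineBatchSystem.shutdown() and causes _runStep() to
        # return False, which ends run().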

        def run(self):
            """
            Run any new jobs.
            """
            try:
                while self._runStep():
                    pass
            except Exception as ex:
                logger.error("GridEngine like batch system failure", exc_info=ex)
                raise

        def coalesce_job_exit_codes(self, batch_job_id_list: list) -> list:
            """
            Returns exit codes for a list of jobs.

            Called by AbstractGridEngineWorker.checkOnJobs().

            This is an optional part of the interface. It should raise
            NotImplementedError if not actually implemented for a particular
            scheduler.

            :param batch_job_id_list: list of batch system job IDs
            """
            raise NotImplementedError()
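
        # A scheduler that can report many jobs in one query might implement
        # this roughly as (hypothetical helper names):
        #   def coalesce_job_exit_codes(self, batch_job_id_list):
        #       states = self._query_scheduler_once(batch_job_id_list)
        #       return [self._state_to_exit_code(states[j])
        #               for j in batch_job_id_list]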

        @abstractmethod
        def prepareSubmission(self,
                              cpu: int,
                              memory: int,
                              jobID: int,
                              command: str,
                              jobName: str,
                              job_environment: Optional[Dict[str, str]] = None,
                              gpus: Optional[int] = None) -> List[str]:
            """
            Prepare the command-line string for submitting to the batch
            system (via submitJob()).

            :param cpu: number of cores needed
            :param memory: amount of memory needed, in bytes
            :param jobID: Toil job ID
            :param command: the command line string to be called
            :param jobName: the name of the Toil job, to provide metadata to
                            batch systems if desired
            :param job_environment: the environment variables to be set on the
                                    worker
            :param gpus: number of GPUs needed

            :rtype: List[str]
            """
            raise NotImplementedError()
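
        # As an illustration only (not Toil's actual Slurm implementation),
        # a Slurm-like scheduler might return something like:
        #   ['sbatch', '--job-name=' + jobName, '--cpus-per-task=1',
        #    '--mem=2048', '--wrap', command]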

        @abstractmethod
        def submitJob(self, subLine):
            """
            Wrapper routine for submitting the actual command-line call, then
            processing the output to get the batch system job ID.

            :param string subLine: the literal command line string to be called
            :rtype: string: batch system job ID, which will be stored internally
            """
            raise NotImplementedError()

        @abstractmethod
        def getRunningJobIDs(self):
            """
            Get a list of running job IDs. Implementation-specific; called by
            the boss AbstractGridEngineBatchSystem implementation via
            AbstractGridEngineBatchSystem.getRunningBatchJobIDs().

            :rtype: list
            """
            raise NotImplementedError()

        @abstractmethod
        def killJob(self, jobID):
            """
            Kill a specific job by its Toil job ID. Implementation-specific;
            called by AbstractGridEngineWorker.killJobs().

            :param string jobID: Toil job ID
            """
            raise NotImplementedError()

        @abstractmethod
        def getJobExitCode(self, batchJobID):
            """
            Return the job exit code, or an instance of
            abstractBatchSystem.BatchJobExitReason if something other than
            the job exiting happened. May return None while the job is still
            running. Implementation-specific; called by
            AbstractGridEngineWorker.checkOnJobs().

            :param string batchJobID: batch system job ID
            :rtype: int|toil.batchSystems.abstractBatchSystem.BatchJobExitReason:
                exit code int, or a BatchJobExitReason if something other than
                the job exiting happened
            """
            raise NotImplementedError()

    def __init__(self, config, maxCores, maxMemory, maxDisk):
        super().__init__(config, maxCores, maxMemory, maxDisk)
        self.config = config

        self.currentJobs = set()

        self.newJobsQueue = Queue()
        self.updatedJobsQueue = Queue()
        self.killQueue = Queue()
        self.killedJobsQueue = Queue()

        # Get the associated worker class here
        self.worker = self.Worker(self.newJobsQueue, self.updatedJobsQueue,
                                  self.killQueue, self.killedJobsQueue, self)
        self.worker.start()
        self._getRunningBatchJobIDsTimestamp = None
        self._getRunningBatchJobIDsCache = {}

    @classmethod
    def supportsWorkerCleanup(cls):
        return False

    @classmethod
    def supportsAutoDeployment(cls):
        return False

    def issueBatchJob(self, jobDesc, job_environment: Optional[Dict[str, str]] = None):
        # Avoid submitting internal jobs to the batch queue, handle locally
        localID = self.handleLocalJob(jobDesc)
        if localID is not None:
            return localID
        else:
            self.check_resource_request(jobDesc)
            jobID = self.getNextJobID()
            self.currentJobs.add(jobID)
            gpus = 0
            if isinstance(jobDesc.accelerators, list):
                for accelerator in jobDesc.accelerators:
                    if accelerator['kind'] == 'gpu':
                        gpus = accelerator['count']
            else:
                gpus = jobDesc.accelerators

            self.newJobsQueue.put((jobID, jobDesc.cores, jobDesc.memory,
                                   jobDesc.command, jobDesc.get_job_kind(),
                                   job_environment, gpus))
            logger.debug("Issued the job command: %s with job id: %s and job name %s",
                         jobDesc.command, str(jobID), jobDesc.get_job_kind())
        return jobID
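
    # For example, a job description whose accelerators are
    # [{'kind': 'gpu', 'count': 2}] yields gpus = 2 in the tuple put on
    # newJobsQueue.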

    def killBatchJobs(self, jobIDs):
        """
        Kill the given jobs, represented as job IDs, then check that they are
        dead by checking that they are no longer in the list of issued jobs.
        """
        self.killLocalJobs(jobIDs)
        jobIDs = set(jobIDs)
        logger.debug('Jobs to be killed: %r', jobIDs)
        for jobID in jobIDs:
            self.killQueue.put(jobID)
        while jobIDs:
            killedJobId = self.killedJobsQueue.get()
            if killedJobId is None:
                break
            jobIDs.remove(killedJobId)
            if killedJobId in self._getRunningBatchJobIDsCache:
                # The running batch ID cache can sometimes contain a job we
                # have killed, so delete it here to keep the cache consistent.
                del self._getRunningBatchJobIDsCache[killedJobId]
            if killedJobId in self.currentJobs:
                self.currentJobs.remove(killedJobId)
            if jobIDs:
                logger.debug('Some kills (%s) still pending, sleeping %is',
                             len(jobIDs), self.sleepSeconds())

    def getIssuedBatchJobIDs(self):
        """
        Get the list of issued jobs.
        """
        return list(self.getIssuedLocalJobIDs()) + list(self.currentJobs)

    def getRunningBatchJobIDs(self):
        """
        Retrieve running job IDs from the local and batch schedulers.

        Respects statePollingWait and will return cached results if not
        within the time period to talk with the scheduler.
        """
        if (self._getRunningBatchJobIDsTimestamp and (
                datetime.now() -
                self._getRunningBatchJobIDsTimestamp).total_seconds() <
                self.config.statePollingWait):
            batchIds = self._getRunningBatchJobIDsCache
        else:
            batchIds = self.with_retries(self.worker.getRunningJobIDs)
            self._getRunningBatchJobIDsCache = batchIds
            self._getRunningBatchJobIDsTimestamp = datetime.now()
        batchIds.update(self.getRunningLocalJobIDs())
        return batchIds

    def getUpdatedBatchJob(self, maxWait):
        local_tuple = self.getUpdatedLocalJob(0)
        if local_tuple:
            return local_tuple
        else:
            try:
                item = self.updatedJobsQueue.get(timeout=maxWait)
            except Empty:
                return None
            logger.debug('UpdatedJobsQueue Item: %s', item)
            self.currentJobs.remove(item.jobID)
            return item

    def shutdown(self) -> None:
        """
        Signal the worker to shut down (via sentinel), then cleanly join the
        thread.
        """
        self.shutdownLocal()
        newJobsQueue = self.newJobsQueue
        self.newJobsQueue = None

        newJobsQueue.put(None)
        self.worker.join()

    def setEnv(self, name, value=None):
        if value and ',' in value:
            raise ValueError(type(self).__name__ +
                             " does not support commas in environment variable values")
        return super().setEnv(name, value)
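
    # The comma restriction likely exists because some grid engines pass the
    # job environment to the scheduler as a comma-separated NAME=VALUE list
    # (e.g. qsub -v), where an embedded comma would split a value.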

    @classmethod
    def getWaitDuration(cls):
        return 1

    def sleepSeconds(self, sleeptime=1):
        """
        Helper function to drop on all state-querying functions to avoid
        over-querying.
        """
        time.sleep(sleeptime)
        return sleeptime

    def with_retries(self, operation, *args, **kwargs):
        """
        Call operation with args and kwargs. If one of the calls to a
        scheduler command fails, sleep and try again for a set number of
        times.
        """
        maxTries = 3
        tries = 0
        while True:
            tries += 1
            try:
                return operation(*args, **kwargs)
            except CalledProcessErrorStderr as err:
                if tries < maxTries:
                    logger.error("Will retry errored operation %s, code %d: %s",
                                 operation.__name__, err.returncode, err.stderr)
                    time.sleep(self.config.statePollingWait)
                else:
                    logger.error("Failed operation %s, code %d: %s",
                                 operation.__name__, err.returncode, err.stderr)
                    raise err
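
    # Typical usage within this module, retrying a scheduler call up to three
    # times and sleeping statePollingWait seconds between attempts:
    #   batchJobID = self.boss.with_retries(self.submitJob, subLine)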