Source code for toil.batchSystems.torque

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
import os
import shlex
import tempfile
from queue import Empty
from shlex import quote
from typing import Optional

from toil.batchSystems.abstractGridEngineBatchSystem import (
    AbstractGridEngineBatchSystem,
    UpdatedBatchJobInfo,
)
from toil.lib.conversions import hms_duration_to_seconds
from toil.lib.misc import CalledProcessErrorStderr, call_command

logger = logging.getLogger(__name__)


[docs] class TorqueBatchSystem(AbstractGridEngineBatchSystem): # class-specific Worker
[docs] class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread): def __init__( self, newJobsQueue, updatedJobsQueue, killQueue, killedJobsQueue, boss ): super().__init__( newJobsQueue, updatedJobsQueue, killQueue, killedJobsQueue, boss ) self._version = self._pbsVersion() def _pbsVersion(self): """Determines PBS/Torque version via pbsnodes""" try: out = call_command(["pbsnodes", "--version"]) if "PBSPro" in out: logger.debug("PBS Pro proprietary Torque version detected") self._version = "pro" else: logger.debug("Torque OSS version detected") self._version = "oss" except CalledProcessErrorStderr as e: if e.returncode != 0: logger.error("Could not determine PBS/Torque version") return self._version """ Torque-specific AbstractGridEngineWorker methods """
[docs] def getRunningJobIDs(self): times = {} with self.runningJobsLock: currentjobs = { str(self.batchJobIDs[x][0].strip()).split(".")[0]: x for x in self.runningJobs } logger.debug("getRunningJobIDs current jobs are: %s", currentjobs) # Skip running qstat if we don't have any current jobs if not currentjobs: return times # Only query for job IDs to avoid clogging the batch system on heavily loaded clusters # PBS plain qstat will return every running job on the system. jobids = sorted(list(currentjobs.keys())) if self._version == "pro": stdout = call_command(["qstat", "-x"] + jobids) elif self._version == "oss": stdout = call_command(["qstat"] + jobids) # qstat supports XML output which is more comprehensive, but PBSPro does not support it # so instead we stick with plain commandline qstat tabular outputs for currline in stdout.split("\n"): items = currline.strip().split() if items: jobid = items[0].strip().split(".")[0] if jobid in currentjobs: logger.debug("getRunningJobIDs job status for is: %s", items[4]) if jobid in currentjobs and items[4] == "R": walltime = items[3].strip() logger.debug( "getRunningJobIDs qstat reported walltime is: %s", walltime ) # normal qstat has a quirk with job time where it reports '0' # when initially running; this catches this case if walltime == "0": walltime = 0.0 elif not walltime: # Sometimes we don't get any data here. # See https://github.com/DataBiosphere/toil/issues/3715 logger.warning( "Assuming 0 walltime due to missing field in qstat line: %s", currline, ) walltime = 0.0 else: walltime = hms_duration_to_seconds(walltime) times[currentjobs[jobid]] = walltime logger.debug("Job times from qstat are: %s", times) return times
[docs] def getUpdatedBatchJob(self, maxWait): try: logger.debug("getUpdatedBatchJob: Job updates") item = self.updatedJobsQueue.get(timeout=maxWait) self.updatedJobsQueue.task_done() jobID, retcode = (self.jobIDs[item.jobID], item.exitStatus) self.currentjobs -= {self.jobIDs[item.jobID]} except Empty: logger.debug("getUpdatedBatchJob: Job queue is empty") else: return UpdatedBatchJobInfo( jobID=jobID, exitStatus=retcode, wallTime=None, exitReason=None )
[docs] def killJob(self, jobID): call_command(["qdel", self.getBatchSystemID(jobID)])
[docs] def prepareSubmission( self, cpu: int, memory: int, jobID: int, command: str, jobName: str, job_environment: Optional[dict[str, str]] = None, gpus: Optional[int] = None, ) -> list[str]: return self.prepareQsub(cpu, memory, jobID, job_environment) + [ self.generateTorqueWrapper(command, jobID) ]
[docs] def submitJob(self, subLine): return call_command(subLine)
[docs] def getJobExitCode(self, torqueJobID): if self._version == "pro": args = ["qstat", "-x", "-f", str(torqueJobID).split(".")[0]] elif self._version == "oss": args = ["qstat", "-f", str(torqueJobID).split(".")[0]] stdout = call_command(args) for line in stdout.split("\n"): line = line.strip() # Case differences due to PBSPro vs OSS Torque qstat outputs if ( line.startswith("failed") or line.startswith("FAILED") and int(line.split()[1]) == 1 ): return 1 if line.startswith("exit_status") or line.startswith("Exit_status"): status = line.split(" = ")[1] logger.debug("Exit Status: %s", status) return int(status) if "unknown job id" in line.lower(): # some clusters configure Torque to forget everything about just # finished jobs instantly, apparently for performance reasons logger.debug( "Batch system no longer remembers about job %s", torqueJobID ) # return assumed success; status files should reveal failure return 0 return None
""" Implementation-specific helper methods """
[docs] def prepareQsub( self, cpu: int, mem: int, jobID: int, job_environment: Optional[dict[str, str]], ) -> list[str]: # TODO: passing $PWD on command line not working for -d, resorting to # $PBS_O_WORKDIR but maybe should fix this here instead of in script? qsubline = ["qsub", "-S", "/bin/sh", "-V", "-N", f"toil_job_{jobID}"] environment = self.boss.environment.copy() if job_environment: environment.update(job_environment) if environment: qsubline.append("-v") qsubline.append( ",".join( k + "=" + quote(os.environ[k] if v is None else v) for k, v in self.boss.environment.items() ) ) reqline = list() if self._version == "pro": request = "select=1" if mem is not None: request += f":mem={mem//1024}K" if cpu is not None and math.ceil(cpu) > 1: request += ":ncpus=" + str(int(math.ceil(cpu))) reqline.append(request) else: if mem is not None: reqline.append(f"mem={mem//1024}K") if cpu is not None and math.ceil(cpu) > 1: reqline.append("nodes=1:ppn=" + str(int(math.ceil(cpu)))) # Other resource requirements can be passed through the environment (see man qsub) reqlineEnv = os.getenv("TOIL_TORQUE_REQS") if reqlineEnv is not None: logger.debug( "Additional Torque resource requirements appended to qsub from " "TOIL_TORQUE_REQS env. variable: %s", reqlineEnv, ) if ( ("mem=" in reqlineEnv) or ("nodes=" in reqlineEnv) or ("ppn=" in reqlineEnv) ): raise ValueError( f"Incompatible resource arguments ('mem=', 'nodes=', 'ppn='): {reqlineEnv}" ) reqline.append(reqlineEnv) if reqline: qsubline += ["-l", ",".join(reqline)] # All other qsub parameters can be passed through the environment (see man qsub). # No attempt is made to parse them out here and check that they do not conflict # with those that we already constructed above arglineEnv = os.getenv("TOIL_TORQUE_ARGS") if arglineEnv is not None: logger.debug( "Native Torque options appended to qsub from TOIL_TORQUE_ARGS env. variable: %s", arglineEnv, ) if ( ("mem=" in arglineEnv) or ("nodes=" in arglineEnv) or ("ppn=" in arglineEnv) ): raise ValueError( f"Incompatible resource arguments ('mem=', 'nodes=', 'ppn='): {arglineEnv}" ) qsubline += shlex.split(arglineEnv) return qsubline
[docs] def generateTorqueWrapper(self, command, jobID): """ A very simple script generator that just wraps the command given; for now this goes to default tempdir """ stdoutfile: str = self.boss.format_std_out_err_path( jobID, r"${PBS_JOBID}", "out" ) stderrfile: str = self.boss.format_std_out_err_path( jobID, r"${PBS_JOBID}", "err" ) fd, tmp_file = tempfile.mkstemp(suffix=".sh", prefix="torque_wrapper") with open(tmp_file, "w") as f: f.write("#!/bin/sh\n") f.write(f"#PBS -o {stdoutfile}\n") f.write(f"#PBS -e {stderrfile}\n") f.write("cd $PBS_O_WORKDIR\n\n") f.write(command + "\n") os.close(fd) return tmp_file