Source code for toil.batchSystems.lsf

# Copyright (C) 2013 by Thomas Keane (tk2@sanger.ac.uk)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import json
import logging
import math
import os
import re
import subprocess
from datetime import datetime
from random import randint
from typing import Dict, List, Optional, Tuple, Union

from dateutil.parser import parse
from dateutil.tz import tzlocal

from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE
from toil.batchSystems.abstractGridEngineBatchSystem import \
    AbstractGridEngineBatchSystem
from toil.batchSystems.lsfHelper import (check_lsf_json_output_supported,
                                         parse_mem_and_cmd_from_output,
                                         parse_memory,
                                         per_core_reservation)
from toil.lib.misc import call_command

logger = logging.getLogger(__name__)


class LSFBatchSystem(AbstractGridEngineBatchSystem):
    class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
        """LSF specific GridEngineThread methods."""

        def getRunningJobIDs(self):
            times = {}
            with self.runningJobsLock:
                currentjobs = {str(self.batchJobIDs[x][0]): x for x in self.runningJobs}
            if check_lsf_json_output_supported():
                stdout = call_command(["bjobs", "-json", "-o", "jobid stat start_time"])
                bjobs_records = self.parseBjobs(stdout)
                if bjobs_records:
                    for single_item in bjobs_records:
                        if single_item['STAT'] == 'RUN' and single_item['JOBID'] in currentjobs:
                            jobstart = parse(single_item['START_TIME'],
                                             default=datetime.now(tzlocal()))
                            times[currentjobs[single_item['JOBID']]] = \
                                datetime.now(tzlocal()) - jobstart
            else:
                times = self.fallbackRunningJobIDs(currentjobs)
            return times
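
        # Illustrative (not verbatim) shape of the `bjobs -json` output that
        # getRunningJobIDs() feeds through parseBjobs(); only records whose
        # STAT is RUN and whose JOBID appears in currentjobs contribute a
        # runtime:
        #
        #   {
        #     "COMMAND": "bjobs",
        #     "JOBS": 1,
        #     "RECORDS": [
        #       {"JOBID": "39605914", "STAT": "RUN", "START_TIME": "Oct  9 14:32"}
        #     ]
        #   }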

        def fallbackRunningJobIDs(self, currentjobs):
            times = {}
            stdout = call_command(["bjobs", "-o", "jobid stat start_time delimiter='|'"])
            for curline in stdout.split('\n'):
                items = curline.strip().split('|')
                if items[0] in currentjobs and items[1] == 'RUN':
                    jobstart = parse(items[2], default=datetime.now(tzlocal()))
                    times[currentjobs[items[0]]] = \
                        datetime.now(tzlocal()) - jobstart
            return times

        def killJob(self, jobID):
            call_command(['bkill', self.getBatchSystemID(jobID)])

        def prepareSubmission(self,
                              cpu: int,
                              memory: int,
                              jobID: int,
                              command: str,
                              jobName: str,
                              job_environment: Optional[Dict[str, str]] = None,
                              gpus: Optional[int] = None):
            # pass job_environment on to submitJob() alongside the bsub command line
            return (self.prepareBsub(cpu, memory, jobID) + [command],
                    job_environment)

        def submitJob(self, subLine):
            subLine, job_environment = subLine
            combinedEnv = self.boss.environment
            combinedEnv.update(os.environ)
            if job_environment:
                combinedEnv.update(job_environment)
            stdout = call_command(subLine, env=combinedEnv)
            # Example success: Job <39605914> is submitted to default queue <general>.
            # Example fail: Service class does not exist. Job not submitted.
            result_search = re.search('Job <(.*)> is submitted', stdout)
            if result_search:
                result = int(result_search.group(1))
                logger.debug(f"Got the job id: {result}")
            else:
                logger.error(f"Could not submit job\nReason: {stdout}")
                temp_id = randint(10000000, 99999999)
                # Flag this job to be handled by getJobExitCode
                result = f"NOT_SUBMITTED_{temp_id}"
            return result
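
        # Note: the NOT_SUBMITTED_<random> sentinel returned above is a string
        # rather than an LSF job id; getJobExitCode() and
        # coalesce_job_exit_codes() recognize it and report exit code 1, so a
        # failed submission surfaces as a failed job instead of hanging.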

        def coalesce_job_exit_codes(self, batch_job_id_list: list) -> list:
            status_dict = {}
            valid_batch_job_id_list = []
            status_response = []
            for single_lsf_id in batch_job_id_list:
                if "NOT_SUBMITTED" in single_lsf_id:
                    logger.error("bjobs detected job [%s] failed to submit",
                                 single_lsf_id)
                    status_dict[single_lsf_id] = 1
                    # this sentinel is not a real LSF id, so don't query bjobs for it
                    continue
                job = single_lsf_id
                if "." in single_lsf_id:
                    job = single_lsf_id.split(".", 1)[0]
                valid_batch_job_id_list.append(job)
            if valid_batch_job_id_list:
                args = ["bjobs", "-json", "-o",
                        "jobid user exit_code stat exit_reason pend_reason"] \
                    + valid_batch_job_id_list
                logger.debug("Getting coalesced job exit codes via bjobs")
                # stdout=PIPE is required here: without it, .stdout is None
                bjobs_records = self.parseBjobs(
                    subprocess.run(args,
                                   check=False,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   encoding="utf-8").stdout)
                if bjobs_records:
                    for single_record in bjobs_records:
                        if "JOBID" in single_record:
                            single_job_id = single_record["JOBID"]
                            status_dict[single_job_id] = self.parse_bjobs_record(
                                single_record, single_job_id)
            for single_lsf_id in batch_job_id_list:
                if "NOT_SUBMITTED" in single_lsf_id:
                    status_response.append(status_dict[single_lsf_id])
                else:
                    job = single_lsf_id
                    if "." in single_lsf_id:
                        job = single_lsf_id.split(".", 1)[0]
                    status_response.append(status_dict.get(job))
            return status_response
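
        # bjobs accepts several job ids in a single invocation, which is what
        # lets coalesce_job_exit_codes() answer a whole batch of status
        # queries with one subprocess call instead of one `bjobs` run per job.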

        def getJobExitCode(self, lsfJobID) -> Union[int, Tuple[int, Optional[BatchJobExitReason]], None]:
            # the task is set as part of the job ID if using getBatchSystemID()
            if "NOT_SUBMITTED" in lsfJobID:
                logger.error("bjobs detected job failed to submit")
                return 1

            job, task = (lsfJobID, None)
            if '.' in lsfJobID:
                job, task = lsfJobID.split('.', 1)
            self.parseMaxMem(job)
            # first try bjobs to find out job state
            if check_lsf_json_output_supported():
                args = ["bjobs", "-json", "-o",
                        "user exit_code stat exit_reason pend_reason", str(job)]
                logger.debug(f"Checking job exit code for job via bjobs: {job}")
                stdout = call_command(args)
                bjobs_records = self.parseBjobs(stdout)
                if bjobs_records:
                    process_output = bjobs_records[0]
                    return self.parse_bjobs_record(process_output, job)

            return self.fallbackGetJobExitCode(job)

        def parse_bjobs_record(self, bjobs_record: dict, job: int) -> Union[int, Tuple[int, Optional[BatchJobExitReason]], None]:
            """
            Helper function for getJobExitCode that parses one bjobs status record.
            """
            if "STAT" in bjobs_record:
                process_status = bjobs_record["STAT"]
                if process_status == "DONE":
                    logger.debug("bjobs detected job completed for job: %s", job)
                    return 0
                if process_status == "PEND":
                    pending_info = ""
                    if "PEND_REASON" in bjobs_record:
                        if bjobs_record["PEND_REASON"]:
                            pending_info = "\n" + bjobs_record["PEND_REASON"]
                    logger.debug("bjobs detected job pending with: %s\nfor job: %s",
                                 pending_info, job)
                    return None
                if process_status == "EXIT":
                    exit_code = 1
                    exit_reason = ""
                    if "EXIT_CODE" in bjobs_record:
                        exit_code_str = bjobs_record["EXIT_CODE"]
                        if exit_code_str:
                            exit_code = int(exit_code_str)
                    if "EXIT_REASON" in bjobs_record:
                        exit_reason = bjobs_record["EXIT_REASON"]
                    exit_info = ""
                    if exit_code:
                        exit_info = f"\nexit code: {exit_code}"
                    if exit_reason:
                        exit_info += f"\nexit reason: {exit_reason}"
                    logger.error("bjobs detected job failed with: %s\nfor job: %s",
                                 exit_info, job)
                    if "TERM_MEMLIMIT" in exit_reason:
                        return (exit_code if exit_code != 0 else EXIT_STATUS_UNAVAILABLE_VALUE,
                                BatchJobExitReason.MEMLIMIT)
                    return exit_code
                if process_status == "RUN":
                    logger.debug("bjobs detected job started but not completed for job: %s", job)
                    return None
                if process_status in {"PSUSP", "USUSP", "SSUSP"}:
                    logger.debug("bjobs detected job suspended for job: %s", job)
                    return None

            return self.getJobExitCodeBACCT(job)
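
        # Summary of the STAT -> return value mapping implemented above:
        #   DONE                -> 0 (success)
        #   PEND / RUN / *SUSP  -> None (job still in flight)
        #   EXIT                -> the exit code, or a (code, MEMLIMIT) tuple
        #                          when the exit reason mentions TERM_MEMLIMIT
        #   anything else       -> defer to bacct via getJobExitCodeBACCT()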

        def getJobExitCodeBACCT(self, job) -> Union[int, Tuple[int, Optional[BatchJobExitReason]], None]:
            # if the job was not found by bjobs, try bacct (slower than bjobs)
            logger.debug(f"bjobs failed to detect job - trying bacct: {job}")

            args = ["bacct", "-l", str(job)]
            stdout = call_command(args)
            process_output = stdout.split('\n')
            for line in process_output:
                if line.find("Completed <done>") > -1 or line.find("<DONE>") > -1:
                    logger.debug(f"Detected job completed for job: {job}")
                    return 0
                elif line.find("Completed <exit>") > -1 or line.find("<EXIT>") > -1:
                    logger.error(f"Detected job failed for job: {job}")
                    return 1
            logger.debug(f"Can't determine exit code for job or job still running: {job}")
            return None

        def fallbackGetJobExitCode(self, job) -> Union[int, Tuple[int, Optional[BatchJobExitReason]], None]:
            args = ["bjobs", "-l", str(job)]
            logger.debug(f"Checking job exit code for job via bjobs (fallback): {job}")
            stdout = call_command(args)
            # rejoin the continuation lines that bjobs -l wraps before scanning
            output = stdout.replace("\n ", "")
            process_output = output.split('\n')
            started = 0
            for line in process_output:
                if "Done successfully" in line or "Status <DONE>" in line:
                    logger.debug(f"bjobs detected job completed for job: {job}")
                    return 0
                elif "New job is waiting for scheduling" in line:
                    logger.debug(f"bjobs detected job pending scheduling for job: {job}")
                    return None
                elif "PENDING REASONS" in line or "Status <PEND>" in line:
                    logger.debug(f"bjobs detected job pending for job: {job}")
                    return None
                elif "Exited with exit code" in line:
                    exit_code = int(line[line.find("Exited with exit code ") + 22:].split('.')[0])
                    logger.error(f"bjobs detected job exit code {exit_code} for job {job}")
                    return exit_code
                elif "Completed <exit>" in line:
                    logger.error(f"bjobs detected job failed for job: {job}")
                    return 1
                elif line.find("Started on ") > -1 or "Status <RUN>" in line:
                    started = 1

            if started == 1:
                logger.debug(f"bjobs detected job started but not completed: {job}")
                return None

            return self.getJobExitCodeBACCT(job)
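
        # Hypothetical fragments of `bjobs -l` output that the fallback
        # scanner above keys on (real LSF output wraps and indents long lines,
        # which is why the continuation newlines are stripped first):
        #   Status <DONE> ... Done successfully.
        #   Status <PEND> ... PENDING REASONS:
        #   Status <EXIT> ... Exited with exit code 2.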
""" Implementation-specific helper methods """

        def prepareBsub(self, cpu: int, mem: int, jobID: int) -> List[str]:
            """
            Make a bsub command line to execute.

            params:
              cpu: number of cores needed
              mem: number of bytes of memory needed
              jobID: ID number of the job
            """
            bsubMem = []
            if mem:
                mem = float(mem)
                if per_core_reservation() and cpu:
                    mem = mem / math.ceil(cpu)
                mem = parse_memory(mem)
                bsubMem = ['-R', f'select[mem>{mem}] rusage[mem={mem}]', '-M', mem]
            bsubCpu = [] if cpu is None else ['-n', str(math.ceil(cpu))]
            bsubline = ["bsub", "-cwd", ".", "-J", f"toil_job_{jobID}"]
            bsubline.extend(bsubMem)
            bsubline.extend(bsubCpu)
            stdoutfile: str = self.boss.format_std_out_err_path(jobID, '%J', 'out')
            stderrfile: str = self.boss.format_std_out_err_path(jobID, '%J', 'err')
            bsubline.extend(['-o', stdoutfile, '-e', stderrfile])
            lsfArgs = os.getenv('TOIL_LSF_ARGS')
            if lsfArgs:
                bsubline.extend(lsfArgs.split())
            return bsubline
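
        # A sketch of the command line prepareBsub() can produce, assuming
        # parse_memory() renders the request as megabytes ('2048' here) and
        # using hypothetical paths and ids:
        #   bsub -cwd . -J toil_job_7 -R 'select[mem>2048] rusage[mem=2048]' \
        #        -M 2048 -n 1 -o <stdout path> -e <stderr path>
        # with anything in TOIL_LSF_ARGS appended verbatim.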

        def parseBjobs(self, bjobs_output_str):
            """
            Parse records from bjobs json type output.

            :param bjobs_output_str: stdout of bjobs json type output
            """
            bjobs_dict = None
            bjobs_records = None
            # Handle "Cannot connect to LSF. Please wait ..." type messages
            dict_start = bjobs_output_str.find('{')
            dict_end = bjobs_output_str.rfind('}')
            if dict_start != -1 and dict_end != -1:
                bjobs_output = bjobs_output_str[dict_start:(dict_end + 1)]
                try:
                    bjobs_dict = json.loads(bjobs_output)
                except json.decoder.JSONDecodeError:
                    logger.error(f"Could not parse bjobs output: {bjobs_output_str}")
                # guard against bjobs_dict still being None after a decode error
                if bjobs_dict and 'RECORDS' in bjobs_dict:
                    bjobs_records = bjobs_dict['RECORDS']
            if bjobs_records is None:
                logger.error(f"Could not find bjobs output json in: {bjobs_output_str}")
            return bjobs_records
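
        # Illustrative input that parseBjobs() is built to tolerate: transient
        # messages printed before the JSON body, e.g. (hypothetical):
        #   Cannot connect to LSF. Please wait ...
        #   {"COMMAND": "bjobs", "JOBS": 1, "RECORDS": [...]}
        # The find('{') / rfind('}') pair trims the noise so that json.loads()
        # only ever sees the dict.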

        def parseMaxMem(self, jobID):
            """
            Parse the maximum memory from job.

            :param jobID: ID number of the job
            """
            try:
                output = subprocess.check_output(["bjobs", "-l", str(jobID)], text=True)
                max_mem, command = parse_mem_and_cmd_from_output(output=output)
                if not max_mem:
                    logger.warning(f"[job ID {jobID}] Unable to Collect Maximum Memory Usage: {output}")
                    return
                if not command:
                    logger.warning(f"[job ID {jobID}] Cannot Parse Max Memory Due to Missing Command String: {output}")
                else:
                    logger.info(f"[job ID {jobID}, Command {command.group(1)}] Max Memory Used: {max_mem.group(1)}")
                return max_mem
            except subprocess.CalledProcessError as e:
                logger.warning(f"[job ID {jobID}] Unable to Collect Maximum Memory Usage: {e}")

    def getWaitDuration(self):
        """Give LSF a minute (in seconds) to catch its breath."""
        return 60