# Copyright (C) 2013 by Thomas Keane (tk2@sanger.ac.uk)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import json
import logging
import math
import os
import re
import subprocess
from datetime import datetime
from random import randint
from typing import Optional, Union
from dateutil.parser import parse
from dateutil.tz import tzlocal
from toil.batchSystems.abstractBatchSystem import (
EXIT_STATUS_UNAVAILABLE_VALUE,
BatchJobExitReason,
)
from toil.batchSystems.abstractGridEngineBatchSystem import (
AbstractGridEngineBatchSystem,
)
from toil.batchSystems.lsfHelper import (
check_lsf_json_output_supported,
parse_mem_and_cmd_from_output,
parse_memory,
per_core_reservation,
)
from toil.lib.misc import call_command
logger = logging.getLogger(__name__)
# [docs] — Sphinx source-link artifact from documentation extraction, not code
class LSFBatchSystem(AbstractGridEngineBatchSystem):
# [docs] — Sphinx source-link artifact from documentation extraction, not code
class GridEngineThread(AbstractGridEngineBatchSystem.GridEngineThread):
"""LSF specific GridEngineThread methods."""
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def getRunningJobIDs(self):
    """
    Return {Toil job ID: time since start} for every tracked job in RUN state.

    Uses ``bjobs -json`` when the installed LSF supports JSON output;
    otherwise falls back to delimiter-parsed plain-text ``bjobs`` output.
    """
    times = {}
    with self.runningJobsLock:
        currentjobs = {str(self.batchJobIDs[x][0]): x for x in self.runningJobs}
    # BUGFIX: check_lsf_json_output_supported is a helper function (its
    # sibling per_core_reservation() is called elsewhere in this class);
    # testing the bare function object is always truthy, which made the
    # plain-text fallback branch unreachable.
    if check_lsf_json_output_supported():
        stdout = call_command(["bjobs", "-json", "-o", "jobid stat start_time"])
        bjobs_records = self.parseBjobs(stdout)
        if bjobs_records:
            for single_item in bjobs_records:
                if (
                    single_item["STAT"] == "RUN"
                    and single_item["JOBID"] in currentjobs
                ):
                    # A missing/unparseable start time defaults to "now",
                    # i.e. an elapsed time of roughly zero.
                    jobstart = parse(
                        single_item["START_TIME"],
                        default=datetime.now(tzlocal()),
                    )
                    times[currentjobs[single_item["JOBID"]]] = (
                        datetime.now(tzlocal()) - jobstart
                    )
    else:
        times = self.fallbackRunningJobIDs(currentjobs)
    return times
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def fallbackRunningJobIDs(self, currentjobs):
    """
    Plain-text ``bjobs`` fallback: map running Toil jobs to elapsed run time.

    :param currentjobs: mapping of LSF job id (str) -> Toil job ID
    :return: dict of Toil job ID -> timedelta since the job started
    """
    elapsed = {}
    stdout = call_command(
        ["bjobs", "-o", "jobid stat start_time delimiter='|'"]
    )
    for raw_line in stdout.split("\n"):
        fields = raw_line.strip().split("|")
        # Header and unrelated rows fail the membership test and are skipped.
        if fields[0] in currentjobs and fields[1] == "RUN":
            started = parse(fields[2], default=datetime.now(tzlocal()))
            elapsed[currentjobs[fields[0]]] = datetime.now(tzlocal()) - started
    return elapsed
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def killJob(self, jobID):
    """Ask LSF (via ``bkill``) to terminate the job backing this Toil job ID."""
    batch_id = self.getBatchSystemID(jobID)
    call_command(["bkill", batch_id])
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def prepareSubmission(
    self,
    cpu: int,
    memory: int,
    jobID: int,
    command: str,
    jobName: str,
    job_environment: Optional[dict[str, str]] = None,
    gpus: Optional[int] = None,
):
    """
    Build the bsub command line for a job.

    Returns a (command-line, job_environment) pair; the environment is
    threaded through untouched so submitJob() can apply it.
    """
    bsub_line = self.prepareBsub(cpu, memory, jobID) + [command]
    return (bsub_line, job_environment)
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def submitJob(self, subLine):
    """
    Run the prepared bsub command line and return the LSF job id (int).

    On submission failure, returns a sentinel string
    ``NOT_SUBMITTED_<random>`` so getJobExitCode() can later report the
    job as failed instead of losing it.

    :param subLine: (command-line list, job_environment dict-or-None) pair
        as produced by prepareSubmission()
    """
    subLine, job_environment = subLine
    # BUGFIX: copy before updating. The original aliased
    # self.boss.environment, so every submission permanently polluted the
    # shared environment with os.environ and per-job variables.
    combinedEnv = dict(self.boss.environment)
    combinedEnv.update(os.environ)
    if job_environment:
        combinedEnv.update(job_environment)
    stdout = call_command(subLine, env=combinedEnv)
    # Example success: Job <39605914> is submitted to default queue <general>.
    # Example fail: Service class does not exist. Job not submitted.
    result_search = re.search("Job <(.*)> is submitted", stdout)
    if result_search:
        result = int(result_search.group(1))
        logger.debug(f"Got the job id: {result}")
    else:
        logger.error(f"Could not submit job\nReason: {stdout}")
        temp_id = randint(10000000, 99999999)
        # Flag this job to be handled by getJobExitCode
        result = f"NOT_SUBMITTED_{temp_id}"
    return result
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def coalesce_job_exit_codes(self, batch_job_id_list: list) -> list:
    """
    Query bjobs once for a batch of LSF job ids; return one status per id.

    Each returned entry is the parse_bjobs_record() result for that id
    (0 for success, a non-zero exit code, a (code, reason) tuple, or None
    when the job is still running or unknown). Ids carrying the
    NOT_SUBMITTED sentinel report exit status 1 directly.
    """
    status_dict = {}
    valid_batch_job_id_list = []
    status_response = []  # fixed misspelling "status_resonse"
    for single_lsf_id in batch_job_id_list:
        if "NOT_SUBMITTED" in single_lsf_id:
            logger.error(
                "bjobs detected job [%s] failed to submit", single_lsf_id
            )
            status_dict[single_lsf_id] = 1
            # BUGFIX: don't forward the sentinel to bjobs — it is not a
            # real job id; its status is resolved from status_dict below.
            continue
        job = single_lsf_id
        if "." in single_lsf_id:
            # Strip the task suffix appended by getBatchSystemID().
            job = single_lsf_id.split(".", 1)[0]
        valid_batch_job_id_list.append(job)
    if valid_batch_job_id_list:
        args = [
            "bjobs",
            "-json",
            "-o",
            "jobid user exit_code stat exit_reason pend_reason",
        ] + valid_batch_job_id_list
        logger.debug("Getting coalesced job exit codes via bjobs")
        # stderr is merged into stdout so LSF warnings aren't lost;
        # parseBjobs() skips any non-JSON prefix text.
        bjobs_records = self.parseBjobs(
            subprocess.run(
                args,
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                encoding="utf-8",
            ).stdout
        )
        if bjobs_records:
            for single_record in bjobs_records:
                if "JOBID" in single_record:
                    single_job_id = single_record["JOBID"]
                    status_dict[single_job_id] = self.parse_bjobs_record(
                        single_record, single_job_id
                    )
    for single_lsf_id in batch_job_id_list:
        if "NOT_SUBMITTED" in single_lsf_id:
            status_response.append(status_dict[single_lsf_id])
        else:
            job = single_lsf_id
            if "." in single_lsf_id:
                job = single_lsf_id.split(".", 1)[0]
            if job in status_dict:
                status_response.append(status_dict[job])
            else:
                # bjobs returned nothing for this id; report "unknown".
                status_response.append(None)
    return status_response
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def getJobExitCode(
    self, lsfJobID
) -> Union[int, tuple[int, Optional[BatchJobExitReason]], None]:
    """
    Return the exit status of one LSF job, or None while it is still
    pending/running.

    :param lsfJobID: LSF job id, possibly with a ".task" suffix from
        getBatchSystemID(); the NOT_SUBMITTED sentinel reports status 1.
    """
    # the task is set as part of the job ID if using getBatchSystemID()
    if "NOT_SUBMITTED" in lsfJobID:
        logger.error("bjobs detected job failed to submit")
        return 1
    job, task = (lsfJobID, None)
    if "." in lsfJobID:
        job, task = lsfJobID.split(".", 1)
    # Logs peak memory usage as a side effect; the value is not needed here.
    self.parseMaxMem(job)
    # first try bjobs to find out job state
    # BUGFIX: call the helper — the bare function object is always truthy,
    # so the plain-text fallback below could never be reached.
    if check_lsf_json_output_supported():
        args = [
            "bjobs",
            "-json",
            "-o",
            "user exit_code stat exit_reason pend_reason",
            str(job),
        ]
        logger.debug("Checking job exit code for job via bjobs: " "{}".format(job))
        stdout = call_command(args)
        bjobs_records = self.parseBjobs(stdout)
        if bjobs_records:
            process_output = bjobs_records[0]
            return self.parse_bjobs_record(process_output, job)
    return self.fallbackGetJobExitCode(job)
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def parse_bjobs_record(
    self, bjobs_record: dict, job: int
) -> Union[int, tuple[int, Optional[BatchJobExitReason]], None]:
    """
    Translate one bjobs JSON record into an exit status for *job*.

    Returns 0 on DONE, None while pending/running/suspended, the exit code
    (or a (code, MEMLIMIT) pair for memory kills) on EXIT, and defers to
    bacct when the record carries no recognizable state.
    """
    status = bjobs_record.get("STAT")
    if status == "DONE":
        logger.debug("bjobs detected job completed for job: %s", job)
        return 0
    if status == "PEND":
        pending_info = ""
        reason = bjobs_record.get("PEND_REASON")
        if reason:
            pending_info = "\n" + reason
        logger.debug(
            "bjobs detected job pending with: %s\nfor job: %s",
            pending_info,
            job,
        )
        return None
    if status == "EXIT":
        # Default to a generic failure code when LSF reports none.
        exit_code = 1
        exit_reason = ""
        if "EXIT_CODE" in bjobs_record:
            code_text = bjobs_record["EXIT_CODE"]
            if code_text:
                exit_code = int(code_text)
        if "EXIT_REASON" in bjobs_record:
            exit_reason = bjobs_record["EXIT_REASON"]
        detail = ""
        if exit_code:
            detail = f"\nexit code: {exit_code}"
        if exit_reason:
            detail += f"\nexit reason: {exit_reason}"
        logger.error(
            "bjobs detected job failed with: %s\nfor job: %s",
            detail,
            job,
        )
        if "TERM_MEMLIMIT" in exit_reason:
            # Memory kill: surface a (code, MEMLIMIT) pair; substitute the
            # "unavailable" sentinel when no real code was reported.
            code = exit_code if exit_code != 0 else EXIT_STATUS_UNAVAILABLE_VALUE
            return (code, BatchJobExitReason.MEMLIMIT)
        return exit_code
    if status == "RUN":
        logger.debug(
            "bjobs detected job started but not completed for job: %s", job
        )
        return None
    if status in {"PSUSP", "USUSP", "SSUSP"}:
        logger.debug("bjobs detected job suspended for job: %s", job)
        return None
    # Unknown or missing state: fall back to the slower bacct lookup.
    return self.getJobExitCodeBACCT(job)
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def getJobExitCodeBACCT(
    self, job
) -> Union[int, tuple[int, Optional[BatchJobExitReason]], None]:
    """
    Resolve a job's exit status from ``bacct -l`` (slower than bjobs).

    Returns 0 on a completed job, 1 on a failed job, None when the
    accounting output is inconclusive or the job is still running.
    """
    logger.debug(f"bjobs failed to detect job - trying bacct: {job}")
    stdout = call_command(["bacct", "-l", str(job)])
    for line in stdout.split("\n"):
        if "Completed <done>" in line or "<DONE>" in line:
            logger.debug(f"Detected job completed for job: {job}")
            return 0
        if "Completed <exit>" in line or "<EXIT>" in line:
            logger.error(f"Detected job failed for job: {job}")
            return 1
    logger.debug(f"Can't determine exit code for job or job still running: {job}")
    return None
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def fallbackGetJobExitCode(
    self, job
) -> Union[int, tuple[int, Optional[BatchJobExitReason]], None]:
    """
    Parse long-form ``bjobs -l`` text when JSON output is unavailable.

    Returns 0/exit-code/1 for finished jobs, None for pending or started
    jobs, and falls back to bacct when no state can be recognized.
    """
    logger.debug(f"Checking job exit code for job via bjobs (fallback): {job}")
    stdout = call_command(["bjobs", "-l", str(job)])
    # bjobs -l wraps long lines as newline + leading blank; unwrap them so
    # each logical record sits on one line.
    unwrapped = stdout.replace("\n ", "")
    started = False
    for line in unwrapped.split("\n"):
        if "Done successfully" in line or "Status <DONE>" in line:
            logger.debug(f"bjobs detected job completed for job: {job}")
            return 0
        if "New job is waiting for scheduling" in line:
            logger.debug(f"bjobs detected job pending scheduling for job: {job}")
            return None
        if "PENDING REASONS" in line or "Status <PEND>" in line:
            logger.debug(f"bjobs detected job pending for job: {job}")
            return None
        if "Exited with exit code" in line:
            marker = "Exited with exit code "
            code = int(line[line.find(marker) + len(marker):].split(".")[0])
            logger.error(f"bjobs detected job exit code {code} for job {job}")
            return code
        if "Completed <exit>" in line:
            logger.error(f"bjobs detected job failed for job: {job}")
            return 1
        if line.find("Started on ") > -1 or "Status <RUN>" in line:
            started = True
    if started:
        logger.debug(f"bjobs detected job started but not completed: {job}")
        return None
    return self.getJobExitCodeBACCT(job)
"""
Implementation-specific helper methods
"""
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def prepareBsub(self, cpu: int, mem: int, jobID: int) -> list[str]:
    """
    Make a bsub commandline to execute.

    params:
    cpu: number of cores needed
    mem: number of bytes of memory needed
    jobID: ID number of the job
    """
    mem_args = []
    if mem:
        mem = float(mem)
        if per_core_reservation() and cpu:
            # LSF reserves memory per slot here, so split the total
            # requirement across the requested cores.
            mem = mem / math.ceil(cpu)
        mem = parse_memory(mem)
        mem_args = ["-R", f"select[mem>{mem}] rusage[mem={mem}]", "-M", mem]
    cpu_args = [] if cpu is None else ["-n", str(math.ceil(cpu))]
    bsubline = ["bsub", "-cwd", ".", "-J", f"toil_job_{jobID}"]
    bsubline += mem_args
    bsubline += cpu_args
    stdoutfile: str = self.boss.format_std_out_err_path(jobID, "%J", "out")
    stderrfile: str = self.boss.format_std_out_err_path(jobID, "%J", "err")
    bsubline += ["-o", stdoutfile, "-e", stderrfile]
    # Operator-supplied extra bsub flags, e.g. queue selection.
    extra_args = os.getenv("TOIL_LSF_ARGS")
    if extra_args:
        bsubline += extra_args.split()
    return bsubline
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def parseBjobs(self, bjobs_output_str):
    """
    Parse records from bjobs json type output.

    :param bjobs_output_str: stdout of bjobs json type output
    :return: the list under the JSON "RECORDS" key, or None when no valid
        JSON payload was found
    """
    bjobs_dict = None
    bjobs_records = None
    # Handle Cannot connect to LSF. Please wait ... type messages:
    # only the text between the first "{" and the last "}" is parsed.
    dict_start = bjobs_output_str.find("{")
    dict_end = bjobs_output_str.rfind("}")
    if dict_start != -1 and dict_end != -1:
        bjobs_output = bjobs_output_str[dict_start : (dict_end + 1)]
        try:
            bjobs_dict = json.loads(bjobs_output)
        except json.decoder.JSONDecodeError:
            logger.error(f"Could not parse bjobs output: {bjobs_output_str}")
        # BUGFIX: guard against bjobs_dict staying None after a decode
        # error — the unguarded membership test raised TypeError.
        if bjobs_dict and "RECORDS" in bjobs_dict:
            bjobs_records = bjobs_dict["RECORDS"]
    if bjobs_records is None:
        logger.error(f"Could not find bjobs output json in: {bjobs_output_str}")
    return bjobs_records
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def parseMaxMem(self, jobID):
    """
    Parse the maximum memory from job.

    :param jobID: ID number of the job
    :return: regex match carrying the max-memory figure, or None when it
        could not be collected
    """
    try:
        # text=True decodes bjobs output to str for the regex helpers.
        output = subprocess.check_output(["bjobs", "-l", str(jobID)], text=True)
        max_mem, command = parse_mem_and_cmd_from_output(output=output)
        if not max_mem:
            logger.warning(
                f"[job ID {jobID}] Unable to Collect Maximum Memory Usage: {output}"
            )
            return
        if command:
            logger.info(
                f"[job ID {jobID}, Command {command.group(1)}] Max Memory Used: {max_mem.group(1)}"
            )
        else:
            logger.warning(
                f"[job ID {jobID}] Cannot Parse Max Memory Due to Missing Command String: {output}"
            )
        return max_mem
    except subprocess.CalledProcessError as err:
        # bjobs itself failed (e.g. unknown job id); best-effort only.
        logger.warning(
            f"[job ID {jobID}] Unable to Collect Maximum Memory Usage: {err}"
        )
# [docs] — Sphinx source-link artifact from documentation extraction, not code
def getWaitDuration(self):
    """Polling interval in seconds: give LSF a minute to catch its breath."""
    return 60