#!/usr/bin/env python
# Adapted from https://github.com/roryk/ipython-cluster-helper
# ipython-cluster-helper is licensed under the MIT license
#
# Copyright 2013-2017 "Rory Kirchne" <rory.kirchner@gmail.com> and contributors
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import fnmatch
import logging
import os
import re
import subprocess
from packaging import version
from toil.lib.conversions import convert_units
LSB_PARAMS_FILENAME = "lsb.params"
LSF_CONF_FILENAME = "lsf.conf"
LSF_CONF_ENV = ["LSF_CONFDIR", "LSF_ENVDIR"]
DEFAULT_LSF_UNITS = "KB"
DEFAULT_RESOURCE_UNITS = "MB"
LSF_JSON_OUTPUT_MIN_VERSION = "10.1.0.2"
logger = logging.getLogger(__name__)
[docs]
def find(basedir, string):
"""
walk basedir and return all files matching string
"""
matches = []
for root, dirnames, filenames in os.walk(basedir):
for filename in fnmatch.filter(filenames, string):
matches.append(os.path.join(root, filename))
return matches
[docs]
def find_first_match(basedir, string):
"""
return the first file that matches string starting from basedir
"""
matches = find(basedir, string)
return matches[0] if matches else matches
[docs]
def get_conf_file(filename, env):
conf_path = os.environ.get(env)
if not conf_path:
return None
conf_file = find_first_match(conf_path, filename)
return conf_file
[docs]
def apply_conf_file(fn, conf_filename):
for env in LSF_CONF_ENV:
conf_file = get_conf_file(conf_filename, env)
if conf_file:
with open(conf_file, encoding="utf-8") as conf_handle:
value = fn(conf_handle)
if value:
return value
return None
[docs]
def per_core_reserve_from_stream(stream):
for k, v in tokenize_conf_stream(stream):
if k in {"RESOURCE_RESERVE_PER_SLOT", "RESOURCE_RESERVE_PER_TASK"}:
return v.upper()
return None
[docs]
def get_lsf_units_from_stream(stream):
for k, v in tokenize_conf_stream(stream):
if k == "LSF_UNIT_FOR_LIMITS":
return v
return None
[docs]
def tokenize_conf_stream(conf_handle):
"""
convert the key=val pairs in a LSF config stream to tuples of tokens
"""
for line in conf_handle:
if line.startswith("#"):
continue
tokens = line.split("=")
if len(tokens) != 2:
continue
yield (tokens[0].strip(), tokens[1].strip())
[docs]
def apply_bparams(fn):
"""
apply fn to each line of bparams, returning the result
"""
cmd = ["bparams", "-a"]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8")
except subprocess.CalledProcessError as exc:
logger.debug(exc.output.decode("utf-8"))
return None
return fn(output.split("\n"))
[docs]
def apply_lsadmin(fn):
"""
apply fn to each line of lsadmin, returning the result
"""
cmd = ["lsadmin", "showconf", "lim"]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode("utf-8")
except subprocess.CalledProcessError as exc:
logger.debug(exc.output.decode("utf-8"))
return None
return fn(output.split("\n"))
[docs]
def get_lsf_units(resource: bool = False) -> str:
"""
check if we can find LSF_UNITS_FOR_LIMITS in lsadmin and lsf.conf
files, preferring the value in bparams, then lsadmin, then the lsf.conf file
"""
lsf_units = apply_bparams(get_lsf_units_from_stream)
if lsf_units:
return lsf_units
lsf_units = apply_lsadmin(get_lsf_units_from_stream)
if lsf_units:
return lsf_units
lsf_units = apply_conf_file(get_lsf_units_from_stream, LSF_CONF_FILENAME)
if lsf_units:
return lsf_units
# -R usage units are in MB, not KB by default
if resource:
return DEFAULT_RESOURCE_UNITS
else:
return DEFAULT_LSF_UNITS
[docs]
def parse_mem_and_cmd_from_output(output: str):
"""Use regex to find "MAX MEM" and "Command" inside of an output."""
# Handle hard wrapping in the middle of words and arbitrary
# indents. May drop spaces at the starts of lines that aren't
# meant to be part of the indent.
cleaned_up_output = " ".join(re.sub(r"\n\s*", "", output).split(","))
max_mem = re.search(r"MAX ?MEM: ?(.*?);", cleaned_up_output)
command = re.search(r"Command ?<(.*?)>", cleaned_up_output)
return max_mem, command
[docs]
def get_lsf_version():
"""
Get current LSF version
"""
cmd = ["lsid"]
try:
output = subprocess.check_output(cmd).decode("utf-8")
except:
return None
bjobs_search = re.search("IBM Spectrum LSF Standard (.*),", output)
if bjobs_search:
lsf_version = bjobs_search.group(1)
return lsf_version
else:
return None
[docs]
def check_lsf_json_output_supported():
"""Check if the current LSF system supports bjobs json output."""
try:
lsf_version = get_lsf_version()
if lsf_version and (
version.parse(lsf_version) >= version.parse(LSF_JSON_OUTPUT_MIN_VERSION)
):
return True
except:
return False
return False
[docs]
def parse_memory(mem: float) -> str:
"""Parse memory parameter."""
megabytes_of_mem = convert_units(float(mem), src_unit="B", dst_unit="MB")
if megabytes_of_mem < 1:
megabytes_of_mem = 1.0
# round as a string here to avoid returning something like 1.231e+12
return f"{megabytes_of_mem:.0f}MB"
[docs]
def per_core_reservation():
"""
returns True if the cluster is configured for reservations to be per core,
False if it is per job
"""
per_core = apply_bparams(per_core_reserve_from_stream)
if per_core:
if per_core.upper() == "Y":
return True
else:
return False
per_core = apply_lsadmin(per_core_reserve_from_stream)
if per_core:
if per_core.upper() == "Y":
return True
else:
return False
per_core = apply_conf_file(per_core_reserve_from_stream, LSB_PARAMS_FILENAME)
if per_core and per_core.upper() == "Y":
return True
return False
if __name__ == "__main__":
print(get_lsf_units())
print(per_core_reservation())