import textwrap
from queue import Queue
import pytest
import toil.batchSystems.slurm
from toil.batchSystems.abstractBatchSystem import BatchJobExitReason, EXIT_STATUS_UNAVAILABLE_VALUE
from toil.common import Config
from toil.lib.misc import CalledProcessErrorStderr
from toil.test import ToilTest
# TODO: Come up with a better way to mock the commands then monkey-patching the
# command-calling functions.
[docs]
def call_sacct(args, **_) -> str:
"""
The arguments passed to `call_command` when executing `sacct` are:
['sacct', '-n', '-j', '<comma-separated list of job-ids>', '--format',
'JobIDRaw,State,ExitCode', '-P', '-S', '1970-01-01']
The multi-line output is something like::
1234|COMPLETED|0:0
1234.batch|COMPLETED|0:0
1235|PENDING|0:0
1236|FAILED|0:2
1236.extern|COMPLETED|0:0
"""
# Fake output per fake job-id.
sacct_info = {
609663: "609663|FAILED|0:2\n609663.extern|COMPLETED|0:0\n",
754725: "754725|TIMEOUT|0:0\n754725.extern|COMPLETED|0:0\n754725.0|COMPLETED|0:0\n",
765096: "765096|FAILED|0:9\n765096.extern|COMPLETED|0:0\n765096.0|CANCELLED by 54386|0:9\n",
767925: "767925|FAILED|2:0\n767925.extern|COMPLETED|0:0\n767925.0|FAILED|2:0\n",
785023: "785023|FAILED|127:0\n785023.batch|FAILED|127:0\n785023.extern|COMPLETED|0:0\n",
789456: "789456|FAILED|1:0\n",
789724: "789724|RUNNING|0:0\n789724.batch|RUNNING|0:0\n789724.extern|RUNNING|0:0\n",
789868: "789868|PENDING|0:0\n",
789869: "789869|COMPLETED|0:0\n789869.batch|COMPLETED|0:0\n789869.extern|COMPLETED|0:0\n",
}
job_ids = [int(job_id) for job_id in args[3].split(',')]
stdout = ""
# Glue the fake outputs for the request job-ids together in a single string
for job_id in job_ids:
stdout += sacct_info.get(job_id, "")
return stdout
[docs]
def call_scontrol(args, **_) -> str:
"""
The arguments passed to `call_command` when executing `scontrol` are:
``['scontrol', 'show', 'job']`` or ``['scontrol', 'show', 'job', '<job-id>']``
"""
job_id = int(args[3]) if len(args) > 3 else None
# Fake output per fake job-id.
scontrol_info = {
787204: textwrap.dedent("""\
JobId=787204 JobName=toil_job_6_CWLJob
UserId=rapthor-mloose(54386) GroupId=rapthor-mloose(54038) MCS_label=N/A
Priority=11067 Nice=0 Account=rapthor QOS=normal
JobState=COMPLETED Reason=None Dependency=(null)
Requeue=0 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0
RunTime=00:00:05 TimeLimit=5-00:00:00 TimeMin=N/A
SubmitTime=2021-10-11T17:20:42 EligibleTime=2021-10-11T17:20:42
AccrueTime=2021-10-11T17:20:42
StartTime=2021-10-11T17:20:43 EndTime=2021-10-11T17:20:48 Deadline=N/A
SuspendTime=None SecsPreSuspend=0 LastSchedEval=2021-10-11T17:20:43
Partition=normal AllocNode:Sid=batch-01:1912150
ReqNodeList=(null) ExcNodeList=(null)
NodeList=wn-hb-01
BatchHost=wn-hb-01
NumNodes=1 NumCPUs=1 NumTasks=0 CPUs/Task=1 ReqB:S:C:T=0:0:*:*
TRES=cpu=1,mem=2G,node=1,billing=1
Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=*
MinCPUsNode=1 MinMemoryNode=2G MinTmpDiskNode=0
Features=(null) DelayBoot=00:00:00
OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null)
Command=(null)
WorkDir=/home/rapthor-mloose/code/toil/cwl-v1.2
StdErr=/home/rapthor-mloose/code/toil/cwl-v1.2/tmp/toil_19512746-a9f4-4b99-b9ff-48ca5c1b661c.6.787204.err.log
StdIn=/dev/null
StdOut=/home/rapthor-mloose/code/toil/cwl-v1.2/tmp/toil_19512746-a9f4-4b99-b9ff-48ca5c1b661c.6.787204.out.log
Power=
NtasksPerTRES:0
"""),
789724: textwrap.dedent("""\
JobId=789724 JobName=run_prefactor-cwltool.sh
UserId=rapthor-mloose(54386) GroupId=rapthor-mloose(54038) MCS_label=N/A
Priority=7905 Nice=0 Account=rapthor QOS=normal
JobState=RUNNING Reason=None Dependency=(null)
Requeue=0 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0
RunTime=17:22:59 TimeLimit=5-00:00:00 TimeMin=N/A
SubmitTime=2021-10-14T17:37:17 EligibleTime=2021-10-14T17:37:17
AccrueTime=2021-10-14T17:37:17
StartTime=2021-10-14T17:37:18 EndTime=2021-10-19T17:37:18 Deadline=N/A
SuspendTime=None SecsPreSuspend=0 LastSchedEval=2021-10-14T17:37:18
Partition=normal AllocNode:Sid=batch-01:2814774
ReqNodeList=(null) ExcNodeList=wn-ca-[01-02],wn-db-[01-06]
NodeList=wn-ha-01
BatchHost=wn-ha-01
NumNodes=1 NumCPUs=20 NumTasks=1 CPUs/Task=20 ReqB:S:C:T=0:0:*:*
TRES=cpu=20,mem=160000M,node=1,billing=20
Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=*
MinCPUsNode=20 MinMemoryCPU=8000M MinTmpDiskNode=0
Features=(null) DelayBoot=00:00:00
OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null)
Command=/project/rapthor/Software/prefactor/sbin/run_prefactor-cwltool.sh L721962 HBA_target
WorkDir=/project/rapthor/Share/prefactor/L721962
StdErr=/project/rapthor/Share/prefactor/L721962/slurm-789724.out
StdIn=/dev/null
StdOut=/project/rapthor/Share/prefactor/L721962/slurm-789724.out
Power=
NtasksPerTRES:0
"""),
789728: textwrap.dedent("""\
JobId=789728 JobName=sleep.sh
UserId=rapthor-mloose(54386) GroupId=rapthor-mloose(54038) MCS_label=N/A
Priority=8005 Nice=0 Account=rapthor QOS=normal
JobState=PENDING Reason=ReqNodeNotAvail,_UnavailableNodes:wn-db-05 Dependency=(null)
Requeue=0 Restarts=0 BatchFlag=1 Reboot=0 ExitCode=0:0
RunTime=00:00:00 TimeLimit=5-00:00:00 TimeMin=N/A
SubmitTime=2021-10-14T18:08:11 EligibleTime=2021-10-14T18:08:11
AccrueTime=2021-10-14T18:08:11
StartTime=Unknown EndTime=Unknown Deadline=N/A
SuspendTime=None SecsPreSuspend=0 LastSchedEval=2021-10-15T11:00:07
Partition=normal AllocNode:Sid=batch-01:2814774
ReqNodeList=wn-db-05 ExcNodeList=(null)
NodeList=(null)
NumNodes=1 NumCPUs=1 NumTasks=1 CPUs/Task=1 ReqB:S:C:T=0:0:*:*
TRES=cpu=1,mem=8000M,node=1,billing=1
Socks/Node=* NtasksPerN:B:S:C=0:0:*:* CoreSpec=*
MinCPUsNode=1 MinMemoryCPU=8000M MinTmpDiskNode=0
Features=(null) DelayBoot=00:00:00
OverSubscribe=OK Contiguous=0 Licenses=(null) Network=(null)
Command=/home/rapthor-mloose/tmp/sleep.sh
WorkDir=/home/rapthor-mloose/tmp
StdErr=/home/rapthor-mloose/tmp/slurm-789728.out
StdIn=/dev/null
StdOut=/home/rapthor-mloose/tmp/slurm-789728.out
Power=
NtasksPerTRES:0
"""),
}
if job_id is not None:
try:
stdout = scontrol_info[job_id]
except KeyError:
raise CalledProcessErrorStderr(1, "slurm_load_jobs error: Invalid job id specified")
else:
# Glue the fake outputs for the request job-ids together in a single string
stdout = ""
for value in scontrol_info.values():
stdout += value + '\n'
return stdout
[docs]
def call_sacct_raises(*_):
"""
Fake that the `sacct` command fails by raising a `CalledProcessErrorStderr`
"""
raise CalledProcessErrorStderr(1, "sacct: error: Problem talking to the database: "
"Connection timed out")
[docs]
class FakeBatchSystem:
"""
Class that implements a minimal Batch System, needed to create a Worker (see below).
"""
def __init__(self):
self.config = self.__fake_config()
[docs]
def getWaitDuration(self):
return 10;
def __fake_config(self):
"""
Returns a dummy config for the batch system tests. We need a workflowID to be set up
since we are running tests without setting up a jobstore. This is the class version
to be used when an instance is not available.
:rtype: toil.common.Config
"""
config = Config()
from uuid import uuid4
config.workflowID = str(uuid4())
config.cleanWorkDir = 'always'
return config
[docs]
class SlurmTest(ToilTest):
"""
Class for unit-testing SlurmBatchSystem
"""
[docs]
def setUp(self):
self.monkeypatch = pytest.MonkeyPatch()
self.worker = toil.batchSystems.slurm.SlurmBatchSystem.GridEngineThread(
newJobsQueue=Queue(),
updatedJobsQueue=Queue(),
killQueue=Queue(),
killedJobsQueue=Queue(),
boss=FakeBatchSystem())
####
#### tests for _getJobDetailsFromSacct()
####
[docs]
def test_getJobDetailsFromSacct_one_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
expected_result = {785023: ("FAILED", 127)}
result = self.worker._getJobDetailsFromSacct(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobDetailsFromSacct_one_not_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
expected_result = {1234: (None, None)}
result = self.worker._getJobDetailsFromSacct(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobDetailsFromSacct_many_all_exist(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
expected_result = {754725: ("TIMEOUT", 0), 789456: ("FAILED", 1), 789724: ("RUNNING", 0),
789868: ("PENDING", 0), 789869: ("COMPLETED", 0)}
result = self.worker._getJobDetailsFromSacct(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobDetailsFromSacct_many_some_exist(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
expected_result = {609663: ("FAILED", 130), 767925: ("FAILED", 2), 1234: (None, None),
1235: (None, None), 765096: ("FAILED", 137)}
result = self.worker._getJobDetailsFromSacct(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobDetailsFromSacct_many_none_exist(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
expected_result = {1234: (None, None), 1235: (None, None), 1236: (None, None)}
result = self.worker._getJobDetailsFromSacct(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
####
#### tests for _getJobDetailsFromScontrol()
####
[docs]
def test_getJobDetailsFromScontrol_one_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
expected_result = {789724: ("RUNNING", 0)}
result = self.worker._getJobDetailsFromScontrol(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobDetailsFromScontrol_one_not_exists(self):
"""
Asking for the job details of a single job that `scontrol` doesn't know about should
raise an exception.
"""
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
expected_result = {1234: (None, None)}
try:
_ = self.worker._getJobDetailsFromScontrol(list(expected_result))
except CalledProcessErrorStderr:
pass
else:
assert False, "Expected exception CalledProcessErrorStderr"
[docs]
def test_getJobDetailsFromScontrol_many_all_exist(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
expected_result = {787204: ("COMPLETED", 0), 789724: ("RUNNING", 0), 789728: ("PENDING", 0)}
result = self.worker._getJobDetailsFromScontrol(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobDetailsFromScontrol_many_some_exist(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
expected_result = {787204: ("COMPLETED", 0), 789724: ("RUNNING", 0), 1234: (None, None)}
result = self.worker._getJobDetailsFromScontrol(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobDetailsFromScontrol_many_none_exist(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
expected_result = {1234: (None, None), 1235: (None, None), 1236: (None, None)}
result = self.worker._getJobDetailsFromScontrol(list(expected_result))
assert result == expected_result, f"{result} != {expected_result}"
###
### tests for getJobExitCode
###
[docs]
def test_getJobExitCode_job_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
job_id = '785023' # FAILED
expected_result = (127, BatchJobExitReason.FAILED)
result = self.worker.getJobExitCode(job_id)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobExitCode_job_not_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
job_id = '1234' # Non-existent
expected_result = None
result = self.worker.getJobExitCode(job_id)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobExitCode_sacct_raises_job_exists(self):
"""
This test forces the use of `scontrol` to get job information, by letting `sacct`
raise an exception.
"""
self.monkeypatch.setattr(self.worker, "_getJobDetailsFromSacct", call_sacct_raises)
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
job_id = '787204' # COMPLETED
expected_result = (0, BatchJobExitReason.FINISHED)
result = self.worker.getJobExitCode(job_id)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_getJobExitCode_sacct_raises_job_not_exists(self):
"""
This test forces the use of `scontrol` to get job information, by letting `sacct`
raise an exception. Next, `scontrol` should also raise because it doesn't know the job.
"""
self.monkeypatch.setattr(self.worker, "_getJobDetailsFromSacct", call_sacct_raises)
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
job_id = '1234' # Non-existent
try:
_ = self.worker.getJobExitCode(job_id)
except CalledProcessErrorStderr:
pass
else:
assert False, "Exception CalledProcessErrorStderr not raised"
###
### Tests for coalesce_job_exit_codes
###
[docs]
def test_coalesce_job_exit_codes_one_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
job_ids = ['785023'] # FAILED
expected_result = [(127, BatchJobExitReason.FAILED)]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_coalesce_job_exit_codes_one_not_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
job_ids = ['1234'] # Non-existent
expected_result = [None]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_coalesce_job_exit_codes_many_all_exist(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
job_ids = ['754725', # TIMEOUT,
'789456', # FAILED,
'789724', # RUNNING,
'789868', # PENDING,
'789869'] # COMPLETED
# RUNNING and PENDING jobs should return None
expected_result = [
(EXIT_STATUS_UNAVAILABLE_VALUE, BatchJobExitReason.KILLED),
(1, BatchJobExitReason.FAILED),
None,
None,
(0, BatchJobExitReason.FINISHED)
]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_coalesce_job_exit_codes_some_exists(self):
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_sacct)
job_ids = ['609663', # FAILED (SIGINT)
'767925', # FAILED,
'789724', # RUNNING,
'999999', # Non-existent,
'789869'] # COMPLETED
# RUNNING job should return None
expected_result = [
(130, BatchJobExitReason.FAILED),
(2, BatchJobExitReason.FAILED),
None,
None,
(0, BatchJobExitReason.FINISHED)
]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_coalesce_job_exit_codes_sacct_raises_job_exists(self):
"""
This test forces the use of `scontrol` to get job information, by letting `sacct`
raise an exception.
"""
self.monkeypatch.setattr(self.worker, "_getJobDetailsFromSacct", call_sacct_raises)
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
job_ids = ['787204'] # COMPLETED
expected_result = [(0, BatchJobExitReason.FINISHED)]
result = self.worker.coalesce_job_exit_codes(job_ids)
assert result == expected_result, f"{result} != {expected_result}"
[docs]
def test_coalesce_job_exit_codes_sacct_raises_job_not_exists(self):
"""
This test forces the use of `scontrol` to get job information, by letting `sacct`
raise an exception. Next, `scontrol` should also raise because it doesn't know the job.
"""
self.monkeypatch.setattr(self.worker, "_getJobDetailsFromSacct", call_sacct_raises)
self.monkeypatch.setattr(toil.batchSystems.slurm, "call_command", call_scontrol)
job_ids = ['1234'] # Non-existent
try:
_ = self.worker.coalesce_job_exit_codes(job_ids)
except CalledProcessErrorStderr:
pass
else:
assert False, "Exception CalledProcessErrorStderr not raised"