# Source code for toil.test.batchSystems.test_gridengine

import textwrap
from queue import Queue

import pytest

import toil.batchSystems.gridengine
from toil.batchSystems.abstractGridEngineBatchSystem import ExceededRetryAttempts
from toil.common import Config
from toil.lib.misc import CalledProcessErrorStderr
from toil.test import ToilTest


class FakeBatchSystem:
    """
    Bare-bones stand-in for a batch system.

    It offers only what the tests in this module need in order to hand it to
    a GridEngineThread as its ``boss``.
    """

    def __init__(self):
        self.config = self.__fake_config()

    def getWaitDuration(self):
        """Return a fixed wait duration of 10."""
        return 10

    def __fake_config(self):
        """
        Build a throwaway config for the batch system tests.

        The tests run without setting up a jobstore, so a workflowID has to be
        generated here.

        :rtype: toil.common.Config
        """
        from uuid import uuid4

        cfg = Config()
        # A random ID stands in for the one a real workflow would have.
        cfg.workflowID = str(uuid4())
        cfg.cleanWorkDir = "always"
        return cfg

    def with_retries(self, operation, *args, **kwargs):
        """
        Call ``operation`` once with the given arguments and return its result.

        The grid engine batch system needs a with_retries function when
        running the GridEngineThread; this fake performs no retrying.
        """
        outcome = operation(*args, **kwargs)
        return outcome
def call_qstat_or_qacct(args, **_):
    """
    Fake ``call_command`` that imitates ``qstat`` and ``qacct`` for the tests.

    Five fake jobs are known: jobs 1-4 have completed (with different
    ``failed`` / ``exit_status`` combinations) and job 5 has not completed.

    :param args: Command line as a list, e.g. ``["qstat", "-j", "3"]`` or
        ``["qacct", "-j", "3"]``.
    :param _: Any keyword arguments are accepted and ignored.
    :return: For ``qacct -j <job id>``, the fake accounting record of a known
        job, or an empty string for an unknown job. For ``qacct`` without
        ``-j``, every record joined by newlines. ``None`` for any command
        other than ``qstat`` or ``qacct``.
    :raises CalledProcessErrorStderr: Always for ``qstat``; the stderr text
        tells whether the job is still known to the scheduler.
    """
    # Example outputs taken from
    # https://2021.help.altair.com/2021.1/AltairGridEngine/8.7.0/UsersGuideGE.pdf
    job_id_info = {
        1: {"failed": True, "exit_code": 0, "completed": True},
        2: {"failed": True, "exit_code": 2, "completed": True},
        3: {"failed": False, "exit_code": 0, "completed": True},
        4: {"failed": False, "exit_code": 10, "completed": True},
        5: {"failed": False, "exit_code": 0, "completed": False},
    }

    def qacct_record(status_info):
        """Render the fake qacct accounting record for one job."""
        failed = 1 if status_info["failed"] else 0
        exit_status = status_info["exit_code"]
        return textwrap.dedent(
            f"""\
            ==============================================================
            qname        all.q
            hostname     kailua
            group        users
            owner        jondoe
            project      NONE
            department   defaultdepartment
            jobname      Sleeper
            jobnumber    10
            taskid       undefined
            account      sge
            priority     0
            qsub_time    Thu Mar 10 19:58:35 2011
            start_time   Thu Mar 10 19:58:42 2011
            end_time     Thu Mar 10 19:59:43 2011
            granted_pe   NONE
            slots        1
            failed       {failed}
            exit_status  {exit_status}
            ru_wallclock 61
            ru_utime     0.070
            ru_stime     0.050
            ru_maxrss    1220
            ru_ixrss     0
            ru_ismrss    0
            ru_idrss     0
            """
        )

    command = args[0]

    if command == "qstat":
        # This is a guess for what qstat will return given a job. I'm unable
        # to find an example for qstat.
        # This also assumes the second argument args[1] is -j, as that is what
        # we try to use.
        job_id = int(args[2])
        job_info = job_id_info.get(job_id)
        if job_info is None or job_info["completed"]:
            stderr = f"Following jobs do not exist {job_id}"
        else:
            # This is not the output of qstat when the job is running, and is
            # just a guess. We test on the existence of the string
            # "Following jobs do not exist", so this should be okay for now.
            stderr = f"Job exists {job_id}"
        raise CalledProcessErrorStderr(2, args, stderr=stderr)

    if command == "qacct":
        # The length check keeps a bare ``qacct`` (no further arguments) from
        # raising IndexError; it is treated like any other call without -j.
        if len(args) < 2 or args[1] != "-j":
            # Documentation for qacct says if -j is not found then all jobs
            # are listed:
            # https://gridscheduler.sourceforge.net/htmlman/htmlman1/qacct.html
            # This is a guess for the output of qacct. We don't have a SGE
            # cluster and I can't find a bare qacct example output online.
            return "\n".join(qacct_record(info) for info in job_id_info.values())

        job_id = int(args[2])
        if job_id not in job_id_info:
            # This is a guess of the behavior when the job does not exist.
            # Since the behavior is unknown, this is not currently tested.
            return ""
        return qacct_record(job_id_info[job_id])

    # Only qstat and qacct are faked.
    return None
class GridEngineTest(ToilTest):
    """
    Class for unit-testing GridEngineBatchSystem
    """

    def setUp(self):
        """
        Create a monkeypatcher and a GridEngineThread wired to empty queues
        and a FakeBatchSystem boss.
        """
        self.monkeypatch = pytest.MonkeyPatch()
        # Registered right away so that every patch is reverted after the
        # test, even when the rest of setUp or the test itself fails.
        # Otherwise the fake call_command would leak into later tests.
        self.addCleanup(self.monkeypatch.undo)
        self.worker = (
            toil.batchSystems.gridengine.GridEngineBatchSystem.GridEngineThread(
                newJobsQueue=Queue(),
                updatedJobsQueue=Queue(),
                killQueue=Queue(),
                killedJobsQueue=Queue(),
                boss=FakeBatchSystem(),
            )
        )

    def _use_fake_grid_engine_commands(self):
        """Replace the gridengine module's call_command with the qstat/qacct fake."""
        self.monkeypatch.setattr(
            toil.batchSystems.gridengine, "call_command", call_qstat_or_qacct
        )

    ###
    ### Tests for coalesce_job_exit_codes for gridengine.
    ###

    def test_coalesce_job_exit_codes_one_exists(self):
        """A single failed job is reported with exit code 1."""
        self._use_fake_grid_engine_commands()
        job_ids = ["1"]  # FAILED
        expected_result = [1]
        result = self.worker.coalesce_job_exit_codes(job_ids)
        self.assertEqual(result, expected_result, f"{result} != {expected_result}")

    def test_coalesce_job_exit_codes_one_still_running(self):
        """A job that has not completed makes the lookup raise ExceededRetryAttempts."""
        self._use_fake_grid_engine_commands()
        job_ids = ["5"]  # Still running. We currently raise an exception when this happens
        with self.assertRaises(ExceededRetryAttempts):
            self.worker.coalesce_job_exit_codes(job_ids)

    def test_coalesce_job_exit_codes_many_all_exist(self):
        """Several completed jobs are reported in the order they were asked for."""
        self._use_fake_grid_engine_commands()
        job_ids = [
            "1",  # FAILED,
            "2",  # FAILED (with exit code that we ignore),
            "3",  # SUCCEEDED,
            "4",  # EXIT CODE 10
        ]
        # RUNNING and PENDING jobs should return None
        expected_result = [1, 1, 0, 10]
        result = self.worker.coalesce_job_exit_codes(job_ids)
        self.assertEqual(result, expected_result, f"{result} != {expected_result}")