Source code for toil.test.utils.toilKillTest

# Copyright (C) 2015-2022 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import shutil
import subprocess
import sys
import time
import unittest

from toil.common import Toil
from toil.jobStores.abstractJobStore import (NoSuchFileException,
                                             NoSuchJobStoreException)
from toil.jobStores.utils import generate_locator
from toil.test import ToilTest, needs_aws_s3, needs_cwl

logger = logging.getLogger(__name__)

pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))  # noqa
sys.path.insert(0, pkg_root)  # noqa


[docs] class ToilKillTest(ToilTest): """A set of test cases for "toil kill".""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.job_store = os.path.join(os.getcwd(), "testkill")
[docs] def setUp(self): """Shared test variables.""" self.cwl = os.path.abspath("src/toil/test/utils/ABCWorkflowDebug/sleep.cwl") self.yaml = os.path.abspath("src/toil/test/utils/ABCWorkflowDebug/sleep.yaml")
[docs] def tearDown(self): """Default tearDown for unittest.""" cmd = ["toil", "clean", self.job_store] subprocess.check_call(cmd) if os.path.exists("tmp"): shutil.rmtree("tmp") unittest.TestCase.tearDown(self)
[docs] @needs_cwl def test_cwl_toil_kill(self): """Test "toil kill" on a CWL workflow with a 100 second sleep.""" run_cmd = ["toil-cwl-runner", "--jobStore", self.job_store, self.cwl, self.yaml] kill_cmd = ["toil", "kill", self.job_store] # run the sleep workflow logger.info('Running workflow: %s', ' '.join(run_cmd)) cwl_process = subprocess.Popen(run_cmd) # wait until workflow starts running while True: assert cwl_process.poll() is None, "toil-cwl-runner finished too soon" try: job_store = Toil.resumeJobStore(self.job_store) job_store.read_leader_pid() # pid file exists, now wait for the kill flag to exist if not job_store.read_kill_flag(): # kill flag exists to be deleted to kill the leader break else: logger.info('Waiting for kill flag...') except (NoSuchJobStoreException, NoSuchFileException): logger.info('Waiting for job store to be openable...') time.sleep(2) # run toil kill subprocess.check_call(kill_cmd) # after toil kill succeeds, the workflow should've exited assert cwl_process.poll() is None
[docs] @needs_aws_s3 class ToilKillTestWithAWSJobStore(ToilKillTest): """A set of test cases for "toil kill" using the AWS job store.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.job_store = generate_locator("aws", decoration="testkill")
if __name__ == "__main__": unittest.main() # run all tests