Source code for toil.test.batchSystems.parasolTestSupport

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import signal
import subprocess
import tempfile
import threading
import time

from toil import physicalMemory
from toil.lib.objects import InnerClass
from toil.lib.threading import cpu_count

# Module-level logger, named after this module, used by the Parasol helper threads below.
log = logging.getLogger(__name__)


class ParasolTestSupport:
    """
    For test cases that need a running Parasol leader and worker on the local host.

    ``_startParasol`` launches a ``paraHub`` (leader) and a ``paraNode`` (worker)
    process, each supervised by its own thread; ``_stopParasol`` kills both
    processes and removes the ``para.results`` and ``parasol.jid`` files from the
    current working directory.
    """

    def _startParasol(self, numCores=None, memory=None):
        """
        Start the Parasol leader and worker and block until both processes exist.

        :param numCores: number of cores advertised to Parasol; defaults to
               ``cpu_count()``.
        :param memory: memory advertised to Parasol, in bytes (it is converted to
               megabytes when the machine list is written); defaults to
               ``physicalMemory()``.
        :raises RuntimeError: if either launcher thread terminates before it has
                spawned its process (e.g. the Parasol executable is missing).
        """
        if numCores is None:
            numCores = cpu_count()
        if memory is None:
            memory = physicalMemory()
        self.numCores = numCores
        self.memory = memory
        self.leader = self.ParasolLeaderThread()
        self.leader.start()
        self.worker = self.ParasolWorkerThread()
        self.worker.start()
        threads = (self.leader, self.worker)
        while any(thread.popen is None for thread in threads):
            for thread in threads:
                # Check liveness *before* popen: once the thread is dead its
                # popen attribute can no longer change, so a None here means the
                # process was never launched and waiting longer would hang forever.
                if not thread.is_alive() and thread.popen is None:
                    raise RuntimeError(
                        '%s exited before launching its Parasol process'
                        % thread.__class__.__name__)
            log.info('Waiting for leader and worker processes')
            time.sleep(.1)

    def _stopParasol(self):
        """
        Kill the worker and then the leader, wait for their threads to finish and
        delete the Parasol bookkeeping files left in the current directory.
        """
        self.worker.popen.kill()
        self.worker.join()
        self.leader.popen.kill()
        self.leader.join()
        for path in ('para.results', 'parasol.jid'):
            if os.path.exists(path):
                os.remove(path)

    class ParasolThread(threading.Thread):
        """
        Base class for a thread that runs one Parasol executable as a child
        process and waits for it to exit.
        """

        # Lock is used because subprocess is NOT thread safe: http://tinyurl.com/pkp5pgq
        lock = threading.Lock()

        def __init__(self):
            threading.Thread.__init__(self)
            # Set by run() once the child process has been spawned.
            self.popen = None

        def parasolCommand(self):
            """
            Return the command line (a list of strings) to execute.

            Subclasses must override this.
            """
            raise NotImplementedError

        def run(self):
            """
            Spawn the command, wait for it to exit and report abnormal exits.

            :raises subprocess.CalledProcessError: if the process exits with a
                    status other than 0 or ``-SIGKILL``.
            """
            command = self.parasolCommand()
            # Only the spawn is serialized; waiting happens outside the lock so
            # the other Parasol thread can start its own process meanwhile.
            with self.lock:
                self.popen = subprocess.Popen(command)
            status = self.popen.wait()
            # A status of -SIGKILL is what Popen reports after popen.kill(),
            # i.e. the normal shutdown path used by _stopParasol().
            if status != 0 and status != -signal.SIGKILL:
                log.error("Command '%s' failed with %i.", command, status)
                raise subprocess.CalledProcessError(status, command)
            log.info('Exiting %s', self.__class__.__name__)

    @InnerClass
    class ParasolLeaderThread(ParasolThread):
        """
        Runs ``paraHub`` against a temporary machine list describing localhost.

        Reads ``numCores`` and ``memory`` from ``self.outer`` (presumably the
        enclosing ParasolTestSupport instance, bound by ``InnerClass``).
        """

        def __init__(self):
            super().__init__()
            # Path of the temporary machine list file; set by run().
            self.machineList = None

        def run(self):
            """
            Write the machine list and run the hub while that file still exists.
            """
            with tempfile.NamedTemporaryFile(prefix='machineList.txt', mode='w') as f:
                self.machineList = f.name
                # name - Network name
                # cpus - Number of CPUs we can use
                # ramSize - Megabytes of memory
                # tempDir - Location of (local) temp dir
                # localDir - Location of local data dir
                # localSize - Megabytes of local disk
                # switchName - Name of switch this is on
                f.write('localhost {numCores} {ramSize} {tempDir} {tempDir} 1024 foo'.format(
                    numCores=self.outer.numCores,
                    tempDir=tempfile.gettempdir(),
                    # Floor division keeps ramSize a whole number of megabytes;
                    # true division would write a float such as "16384.0".
                    ramSize=int(self.outer.memory // (1024 * 1024))))
                f.flush()
                # Stay inside the with-block: the temporary file is deleted when
                # it is closed, and paraHub is given its path on the command line.
                super().run()

        def parasolCommand(self):
            """Return the ``paraHub`` command line using the machine list file."""
            return ['paraHub',
                    '-spokes=1',
                    '-debug',
                    self.machineList]

    @InnerClass
    class ParasolWorkerThread(ParasolThread):
        """
        Runs ``paraNode`` with the core count taken from ``self.outer.numCores``.
        """

        def parasolCommand(self):
            """Return the ``paraNode`` command line."""
            return ['paraNode',
                    '-cpu=%i' % self.outer.numCores,
                    '-randomDelay=0',
                    '-debug',
                    'start']