Source code for toil.test.provisioners.gceProvisionerTest

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import subprocess
from abc import abstractmethod
from uuid import uuid4

import pytest

from toil.test import (ToilTest,
                       integrative,
                       needs_fetchable_appliance,
                       needs_google_project,
                       needs_google_storage,
                       slow,
                       timeLimit)
from toil.version import exactPython

log = logging.getLogger(__name__)


[docs] @needs_google_project @needs_google_storage @integrative @needs_fetchable_appliance @slow class AbstractGCEAutoscaleTest(ToilTest): projectID = os.getenv('TOIL_GOOGLE_PROJECTID')
[docs] def sshUtil(self, command): baseCommand = ['toil', 'ssh-cluster', '--insecure', '-p=gce', self.clusterName] callCommand = baseCommand + command subprocess.check_call(callCommand)
[docs] def rsyncUtil(self, src, dest): baseCommand = ['toil', 'rsync-cluster', '--insecure', '-p=gce', self.clusterName] callCommand = baseCommand + [src, dest] subprocess.check_call(callCommand)
[docs] def destroyClusterUtil(self): callCommand = ['toil', 'destroy-cluster', '-p=gce', self.clusterName] subprocess.check_call(callCommand)
[docs] def createClusterUtil(self, args=None): if args is None: args = [] callCommand = ['toil', 'launch-cluster', self.clusterName, '-p=gce', '--keyPairName=%s' % self.keyName, '--leaderNodeType=%s' % self.leaderInstanceType, '--zone=%s' % self.googleZone] if self.botoDir is not None: callCommand += ['--boto=%s' % self.botoDir] callCommand = callCommand + args if args else callCommand log.info("createClusterUtil: %s" % ''.join(callCommand)) subprocess.check_call(callCommand)
[docs] def cleanJobStoreUtil(self): callCommand = ['toil', 'clean', self.jobStore] subprocess.check_call(callCommand)
def __init__(self, methodName): super().__init__(methodName=methodName) # TODO: add TOIL_GOOGLE_KEYNAME to needs_google_project or ssh with SA account self.keyName = os.getenv('TOIL_GOOGLE_KEYNAME') # TODO: remove this when switching to google jobstore self.botoDir = os.getenv('TOIL_BOTO_DIR') # TODO: get this from SA account or add an environment variable self.googleZone = 'us-west1-a' self.leaderInstanceType = 'n1-standard-1' self.instanceTypes = ["n1-standard-2"] self.numWorkers = ['2'] self.numSamples = 2 self.spotBid = 0.15
[docs] def setUp(self): super().setUp()
[docs] def tearDown(self): super().tearDown() self.destroyClusterUtil() self.cleanJobStoreUtil()
#def getMatchingRoles(self, clusterName): # ctx = AWSProvisioner._buildContext(clusterName) # roles = list(ctx.local_roles()) # return roles
[docs] def launchCluster(self): self.createClusterUtil()
@abstractmethod def _getScript(self): """ Download the test script needed by the inheriting unit test class. """ raise NotImplementedError() @abstractmethod def _runScript(self, toilOptions): """ Modify the provided Toil options to suit the test Toil script, then run the script with those arguments. :param toilOptions: List of Toil command line arguments. This list may need to be modified to suit the test script's requirements. """ raise NotImplementedError() def _test(self, preemptibleJobs=False): """ Does the work of the testing. Many features' test are thrown in here is no particular order """ self.launchCluster() # TODO: What is the point of this test? #assert len(self.getMatchingRoles(self.clusterName)) == 1 # TODO: Add a check of leader and node storage size if set. # --never-download prevents silent upgrades to pip, wheel and setuptools venv_command = ['virtualenv', '--system-site-packages', '--never-download', '--python', exactPython, '/home/venv'] self.sshUtil(venv_command) self._getScript() toilOptions = [self.jobStore, '--batchSystem=mesos', '--workDir=/var/lib/toil', '--clean=always', '--retryCount=2', '--clusterStats=/home/', '--logDebug', '--logFile=/home/sort.log', '--provisioner=gce'] toilOptions.extend(['--nodeTypes=' + ",".join(self.instanceTypes), '--maxNodes=%s' % ",".join(self.numWorkers)]) if preemptibleJobs: toilOptions.extend(['--defaultPreemptible']) self._runScript(toilOptions) #TODO: Does this just check if it is still running? #assert len(self.getMatchingRoles(self.clusterName)) == 1 checkStatsCommand = ['/home/venv/bin/python', '-c', 'import json; import os; ' 'json.load(open("/home/" + [f for f in os.listdir("/home/") ' 'if f.endswith(".json")].pop()))' ] self.sshUtil(checkStatsCommand)
# TODO: Add a check to make sure everything is cleaned up.
[docs] @pytest.mark.timeout(1600) class GCEAutoscaleTest(AbstractGCEAutoscaleTest): def __init__(self, name): super().__init__(name) self.clusterName = 'provisioner-test-' + str(uuid4()) self.requestedLeaderStorage = 80
[docs] def setUp(self): super().setUp() self.jobStore = f'google:{self.projectID}:autoscale-{uuid4()}'
def _getScript(self): # TODO: Isn't this the key file? fileToSort = os.path.join(os.getcwd(), str(uuid4())) with open(fileToSort, 'w') as f: # Fixme: making this file larger causes the test to hang f.write('01234567890123456789012345678901') self.rsyncUtil(os.path.join(self._projectRootPath(), 'src/toil/test/sort/sort.py'), ':/home/sort.py') self.rsyncUtil(fileToSort, ':/home/sortFile') os.unlink(fileToSort) def _runScript(self, toilOptions): runCommand = ['/home/venv/bin/python', '/home/sort.py', '--fileToSort=/home/sortFile'] #'--sseKey=/home/sortFile'] runCommand.extend(toilOptions) log.info("_runScript: %s" % ''.join(runCommand)) self.sshUtil(runCommand)
[docs] def launchCluster(self): # add arguments to test that we can specify leader storage self.createClusterUtil(args=['--leaderStorage', str(self.requestedLeaderStorage)])
# TODO: aren't these checks inherited?
[docs] @integrative @needs_google_project @needs_google_storage def testAutoScale(self): self.instanceTypes = ["n1-standard-2"] self.numWorkers = ['2'] self._test()
[docs] @integrative @needs_google_project @needs_google_storage def testSpotAutoScale(self): self.instanceTypes = ["n1-standard-2:%f" % self.spotBid] # Some spot workers have a stopped state after being started, strangely. # This could be the natural preemption process, but it seems too rapid. self.numWorkers = ['3'] # Try 3 to account for a stopped node. self._test(preemptibleJobs=True)
[docs] @pytest.mark.timeout(1600) class GCEStaticAutoscaleTest(GCEAutoscaleTest): """ Runs the tests on a statically provisioned cluster with autoscaling enabled. """ def __init__(self, name): super().__init__(name) self.requestedNodeStorage = 20
[docs] def launchCluster(self): self.createClusterUtil(args=['--leaderStorage', str(self.requestedLeaderStorage), '--nodeTypes', ",".join(self.instanceTypes), '-w', ",".join(self.numWorkers), '--nodeStorage', str(self.requestedLeaderStorage)])
# TODO: check the number of workers and their storage #nodes = AWSProvisioner._getNodesInCluster(ctx, self.clusterName, both=True) #nodes.sort(key=lambda x: x.launch_time) # assuming that leader is first #workers = nodes[1:] # test that two worker nodes were created #self.assertEqual(2, len(workers)) # test that workers have expected storage size # just use the first worker #worker = workers[0] #worker = next(wait_instances_running(ctx.ec2, [worker])) #rootBlockDevice = worker.block_device_mapping["/dev/xvda"] #self.assertTrue(isinstance(rootBlockDevice, BlockDeviceType)) #rootVolume = ctx.ec2.get_all_volumes(volume_ids=[rootBlockDevice.volume_id])[0] #self.assertGreaterEqual(rootVolume.size, self.requestedNodeStorage) def _runScript(self, toilOptions): runCommand = ['/home/venv/bin/python', '/home/sort.py', '--fileToSort=/home/sortFile'] runCommand.extend(toilOptions) log.info("_runScript: %s" % ''.join(runCommand)) self.sshUtil(runCommand)
[docs] @pytest.mark.timeout(1800) class GCEAutoscaleTestMultipleNodeTypes(AbstractGCEAutoscaleTest): def __init__(self, name): super().__init__(name) self.clusterName = 'provisioner-test-' + str(uuid4())
[docs] def setUp(self): super().setUp() self.jobStore = f'google:{self.projectID}:multinode-{uuid4()}'
def _getScript(self): sseKeyFile = os.path.join(os.getcwd(), 'keyFile') with open(sseKeyFile, 'w') as f: f.write('01234567890123456789012345678901') self.rsyncUtil(os.path.join(self._projectRootPath(), 'src/toil/test/sort/sort.py'), ':/home/sort.py') self.rsyncUtil(sseKeyFile, ':/home/keyFile') os.unlink(sseKeyFile) def _runScript(self, toilOptions): #Set memory requirements so that sort jobs can be run # on small instances, but merge jobs must be run on large # instances runCommand = ['/home/venv/bin/python', '/home/sort.py', '--fileToSort=/home/s3am/bin/asadmin', '--sortMemory=0.6G', '--mergeMemory=3.0G'] runCommand.extend(toilOptions) #runCommand.append('--sseKey=/home/keyFile') log.info("_runScript: %s" % ''.join(runCommand)) self.sshUtil(runCommand)
[docs] @integrative @needs_google_project @needs_google_storage def testAutoScale(self): self.instanceTypes = ["n1-standard-2", "n1-standard-4"] self.numWorkers = ['2','1'] self._test()
[docs] @pytest.mark.timeout(1800) class GCERestartTest(AbstractGCEAutoscaleTest): """ This test insures autoscaling works on a restarted Toil run """ def __init__(self, name): super().__init__(name) self.clusterName = 'restart-test-' + str(uuid4())
[docs] def setUp(self): super().setUp() self.instanceTypes = ['n1-standard-1'] self.numWorkers = ['1'] self.scriptName = "/home/restartScript.py" # TODO: replace this with a google job store zone = 'us-west-2' self.jobStore = f'google:{self.projectID}:restart-{uuid4()}'
def _getScript(self): self.rsyncUtil(os.path.join(self._projectRootPath(), 'src/toil/test/provisioners/restartScript.py'), ':'+self.scriptName) def _runScript(self, toilOptions): # clean = onSuccess disallowedOptions = ['--clean=always', '--retryCount=2'] newOptions = [option for option in toilOptions if option not in disallowedOptions] try: # include a default memory - on restart the minimum memory requirement is the default, usually 2 GB command = ['/home/venv/bin/python', self.scriptName, '-e', 'FAIL=true', '--defaultMemory=50000000'] command.extend(newOptions) self.sshUtil(command) except subprocess.CalledProcessError: pass else: self.fail('Command succeeded when we expected failure') with timeLimit(1200): command = ['/home/venv/bin/python', self.scriptName, '--restart', '--defaultMemory=50000000'] command.extend(toilOptions) self.sshUtil(command)
[docs] @integrative def testAutoScaledCluster(self): self._test()