# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import random
import time
import uuid
from argparse import Namespace
from collections import defaultdict
from queue import Empty, Queue
from threading import Event, Thread
from typing import Optional
from unittest.mock import MagicMock
from toil.batchSystems.abstractBatchSystem import (
AbstractBatchSystem,
AbstractScalableBatchSystem,
NodeInfo,
)
from toil.common import Config
from toil.job import JobDescription
from toil.lib.conversions import human2bytes as h2b
from toil.options.common import defaultTargetTime
from toil.provisioners.abstractProvisioner import AbstractProvisioner, Shape
from toil.provisioners.clusterScaler import (
BinPackedFit,
ClusterScaler,
NodeReservation,
ScalerThread,
)
from toil.provisioners.node import Node
from toil.test import ToilTest, slow
logger = logging.getLogger(__name__)
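# Node and job sizes are both expressed as Shape(wallTime, memory, cores, disk, preemptible).
# The constants below are simplified stand-ins for real EC2 instance types.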
# simplified c4.8xlarge (preemptible)
c4_8xlarge_preemptible = Shape(
wallTime=3600, memory=h2b("60G"), cores=36, disk=h2b("100G"), preemptible=True
)
# simplified c4.8xlarge (non-preemptible)
c4_8xlarge = Shape(
wallTime=3600, memory=h2b("60G"), cores=36, disk=h2b("100G"), preemptible=False
)
# simplified r3.8xlarge (non-preemptible)
r3_8xlarge = Shape(
wallTime=3600, memory=h2b("260G"), cores=32, disk=h2b("600G"), preemptible=False
)
# simplified r5.2xlarge (non-preemptible)
r5_2xlarge = Shape(
wallTime=3600, memory=h2b("64Gi"), cores=8, disk=h2b("50G"), preemptible=False
)
# simplified r5.4xlarge (non-preemptible)
r5_4xlarge = Shape(
wallTime=3600, memory=h2b("128Gi"), cores=16, disk=h2b("50G"), preemptible=False
)
# simplified t2.micro (non-preemptible)
t2_micro = Shape(
wallTime=3600, memory=h2b("1G"), cores=1, disk=h2b("8G"), preemptible=False
)
class BinPackingTest(ToilTest):
def setUp(self):
self.node_shapes_for_testing = [c4_8xlarge_preemptible, r3_8xlarge]
self.bpf = BinPackedFit(self.node_shapes_for_testing)
def testPackingOneShape(self):
"""Pack one shape and check that the resulting reservations look sane."""
self.bpf.nodeReservations[c4_8xlarge_preemptible] = [
NodeReservation(c4_8xlarge_preemptible)
]
self.bpf.addJobShape(
Shape(
wallTime=1000,
cores=2,
memory=h2b("1G"),
disk=h2b("2G"),
preemptible=True,
)
)
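# The job should land on the single reserved c4.8xlarge: its 2 cores, 1G memory, and
# 2G disk are subtracted for the 1000 s it runs (leaving 34 cores/59G/98G free), and the
# remaining 2600 s of the node's 3600 s reservation stays untouched.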
self.assertEqual(self.bpf.nodeReservations[r3_8xlarge], [])
self.assertEqual(
[x.shapes() for x in self.bpf.nodeReservations[c4_8xlarge_preemptible]],
[
[
Shape(
wallTime=1000,
memory=h2b("59G"),
cores=34,
disk=h2b("98G"),
preemptible=True,
),
Shape(
wallTime=2600,
memory=h2b("60G"),
cores=36,
disk=h2b("100G"),
preemptible=True,
),
]
],
)
def testSorting(self):
"""
Test that sorting is correct: preemptible, then memory, then cores, then disk,
then wallTime.
"""
shapeList = [
c4_8xlarge_preemptible,
r3_8xlarge,
c4_8xlarge,
c4_8xlarge,
t2_micro,
t2_micro,
c4_8xlarge,
r3_8xlarge,
r3_8xlarge,
t2_micro,
]
shapeList.sort()
assert shapeList == [
c4_8xlarge_preemptible,
t2_micro,
t2_micro,
t2_micro,
c4_8xlarge,
c4_8xlarge,
c4_8xlarge,
r3_8xlarge,
r3_8xlarge,
r3_8xlarge,
]
def testAddingInitialNode(self):
"""Pack one shape when no nodes are available and confirm that we fit one node properly."""
self.bpf.addJobShape(
Shape(
wallTime=1000,
cores=2,
memory=h2b("1G"),
disk=h2b("2G"),
preemptible=True,
)
)
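# With no pre-existing reservations, the packer should create one c4.8xlarge reservation
# and split it into the same two slices as in testPackingOneShape.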
self.assertEqual(
[x.shapes() for x in self.bpf.nodeReservations[c4_8xlarge_preemptible]],
[
[
Shape(
wallTime=1000,
memory=h2b("59G"),
cores=34,
disk=h2b("98G"),
preemptible=True,
),
Shape(
wallTime=2600,
memory=h2b("60G"),
cores=36,
disk=h2b("100G"),
preemptible=True,
),
]
],
)
def testLowTargetTime(self):
"""
Test that a low targetTime (0) parallelizes jobs aggressively (1000 queued jobs require
1000 nodes).
Ideally, low targetTime means: Start quickly and maximize parallelization after the
cpu/disk/mem have been packed.
Disk/cpu/mem packing is prioritized first, so we set job resource reqs so that each
t2.micro (1 cpu/8G disk/1G RAM) can only run one job at a time with its resources.
Each job is parametrized to take 300 seconds; nothing fits in a 0-second window, so the
packer still places the minimum of one job per node, and 1000 jobs require 1000 nodes.
"""
allocation = self.run1000JobsOnMicros(
jobCores=1,
jobMem=h2b("1G"),
jobDisk=h2b("1G"),
jobTime=300,
globalTargetTime=0,
)
self.assertEqual(allocation, {t2_micro: 1000})
def testHighTargetTime(self):
"""
Test that a high targetTime (3600 seconds) maximizes packing within the targetTime.
Ideally, high targetTime means: Maximize packing within the targetTime after the
cpu/disk/mem have been packed.
Disk/cpu/mem packing is prioritized first, so we set job resource reqs so that each
t2.micro (1 cpu/8G disk/1G RAM) can only run one job at a time with its resources.
Each job is parametrized to take 300 seconds, so 12 of them should fit into each node's
3600 second window. 1000/12 = 83.33, so we expect 84 nodes.
"""
allocation = self.run1000JobsOnMicros(
jobCores=1,
jobMem=h2b("1G"),
jobDisk=h2b("1G"),
jobTime=300,
globalTargetTime=3600,
)
self.assertEqual(allocation, {t2_micro: 84})
def testZeroResourceJobs(self):
"""
Test that jobs requiring zero cpu/disk/mem pack first, regardless of targetTime.
Disk/cpu/mem packing is prioritized first, so we set job resource reqs so that each
t2.micro (1 cpu/8G disk/1G RAM) can run a seemingly infinite number of jobs with its
resources.
Since all jobs should pack cpu/disk/mem-wise onto a single t2.micro, we expect only one
t2.micro to be provisioned. If the jobs instead had nonzero resource requirements, as in
testLowTargetTime, 1000 t2.micros would be launched.
"""
allocation = self.run1000JobsOnMicros(
jobCores=0, jobMem=0, jobDisk=0, jobTime=300, globalTargetTime=0
)
self.assertEqual(allocation, {t2_micro: 1})
def testLongRunningJobs(self):
"""
Test that jobs with long run times (especially service jobs) are aggressively parallelized.
This is important, because services are one case where the degree of parallelization
really, really matters. If you have multiple services, they may all need to be running
simultaneously before any real work can be done.
Despite setting globalTargetTime=3600, this should launch 1000 t2.micros because each job's
estimated runtime (30000 seconds) extends well beyond 3600 seconds.
"""
allocation = self.run1000JobsOnMicros(
jobCores=1,
jobMem=h2b("1G"),
jobDisk=h2b("1G"),
jobTime=30000,
globalTargetTime=3600,
)
self.assertEqual(allocation, {t2_micro: 1000})
def run1000JobsOnMicros(self, jobCores, jobMem, jobDisk, jobTime, globalTargetTime):
"""Test packing 1000 jobs on t2.micros. Depending on the targetTime and resources,
these should pack differently.
"""
node_shapes_for_testing = [t2_micro]
bpf = BinPackedFit(node_shapes_for_testing, targetTime=globalTargetTime)
for _ in range(1000):
bpf.addJobShape(
Shape(
wallTime=jobTime,
memory=jobMem,
cores=jobCores,
disk=jobDisk,
preemptible=False,
)
)
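# getRequiredNodes() maps each node shape to the number of nodes the packing needs,
# e.g. {t2_micro: 84}.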
return bpf.getRequiredNodes()
def testPathologicalCase(self):
"""Test a pathological case where a naive packer would request just one node to fit months' worth of jobs.
If the reservation is extended to fit a long job, and the
bin-packer naively searches through all the reservation slices
to find the first slice that fits, it will happily assign the
first slot that fits the job, even if that slot occurs days in
the future.
"""
# Add one job that partially fills an r3.8xlarge for 1000 hours
self.bpf.addJobShape(
Shape(
wallTime=3600000,
memory=h2b("10G"),
cores=0,
disk=h2b("10G"),
preemptible=False,
)
)
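# This job holds a reservation open for 1000 hours while using almost none of the node,
# creating many far-future slices that a naive first-fit search could wrongly fill.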
for _ in range(500):
# Add 500 node-hours' worth of jobs, each of which fills an r3.8xlarge
self.bpf.addJobShape(
Shape(
wallTime=3600,
memory=h2b("26G"),
cores=32,
disk=h2b("60G"),
preemptible=False,
)
)
# Hopefully we didn't assign just one node to cover all those jobs.
self.assertNotEqual(
self.bpf.getRequiredNodes(), {r3_8xlarge: 1, c4_8xlarge_preemptible: 0}
)
# If we got here we didn't crash.
class ClusterScalerTest(ToilTest):
def setUp(self):
super().setUp()
self.config = Config()
self.config.targetTime = 1800
self.config.nodeTypes = [r3_8xlarge, c4_8xlarge_preemptible]
# Set up the mock leader
self.leader = MockBatchSystemAndProvisioner(config=self.config, secondsPerJob=1)
# It is also a full mock provisioner, so configure it to be that as well
self.provisioner = self.leader
# Pretend that Shapes are actually strings we can use for instance type names.
self.provisioner.setAutoscaledNodeTypes(
[({t}, None) for t in self.config.nodeTypes]
)
def testRounding(self):
"""
Test to make sure the ClusterScaler's rounding rounds properly.
"""
# Get a ClusterScaler
self.config.targetTime = 1
self.config.betaInertia = 0.0
self.config.maxNodes = [2, 3]
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
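# _round is expected to round halves away from zero, unlike Python's built-in round(),
# which rounds halves to the nearest even number (round(2.5) == 2).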
# Exact integers round to themselves
self.assertEqual(scaler._round(0.0), 0)
self.assertEqual(scaler._round(1.0), 1)
self.assertEqual(scaler._round(-1.0), -1)
self.assertEqual(scaler._round(123456789101112.13), 123456789101112)
# Decimals other than X.5 round to the side they are closer to
self.assertEqual(scaler._round(1e-10), 0)
self.assertEqual(scaler._round(0.5 + 1e-15), 1)
self.assertEqual(scaler._round(-0.9), -1)
self.assertEqual(scaler._round(-0.4), 0)
# Decimals at exactly X.5 round away from 0
self.assertEqual(scaler._round(0.5), 1)
self.assertEqual(scaler._round(-0.5), -1)
self.assertEqual(scaler._round(2.5), 3)
self.assertEqual(scaler._round(-2.5), -3)
self.assertEqual(scaler._round(15.5), 16)
self.assertEqual(scaler._round(-15.5), -16)
self.assertEqual(scaler._round(123456789101112.5), 123456789101113)
def testMaxNodes(self):
"""
Set the scaler to be very aggressive, give it a ton of jobs, and
make sure it doesn't go over maxNodes.
"""
self.config.targetTime = 1
self.config.betaInertia = 0.0
self.config.maxNodes = [2, 3]
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
jobShapes = [
Shape(
wallTime=3600,
cores=2,
memory=h2b("1G"),
disk=h2b("2G"),
preemptible=True,
)
] * 1000
jobShapes.extend(
[
Shape(
wallTime=3600,
cores=2,
memory=h2b("1G"),
disk=h2b("2G"),
preemptible=False,
)
]
* 1000
)
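# 2000 hour-long jobs vastly exceed what 2 + 3 nodes can finish within targetTime=1,
# so the estimates should be clamped to maxNodes.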
estimatedNodeCounts, could_not_fit = scaler.getEstimatedNodeCounts(
jobShapes, defaultdict(int)
)
self.assertEqual(estimatedNodeCounts[r3_8xlarge], 2)
self.assertEqual(estimatedNodeCounts[c4_8xlarge_preemptible], 3)
self.assertEqual(len(could_not_fit), 0)
def testMinNodes(self):
"""
Without any jobs queued, the scaler should still estimate "minNodes" nodes.
"""
self.config.betaInertia = 0.0
self.config.minNodes = [2, 3]
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
jobShapes = []
estimatedNodeCounts, could_not_fit = scaler.getEstimatedNodeCounts(
jobShapes, defaultdict(int)
)
self.assertEqual(estimatedNodeCounts[r3_8xlarge], 2)
self.assertEqual(estimatedNodeCounts[c4_8xlarge_preemptible], 3)
self.assertEqual(len(could_not_fit), 0)
def testPreemptibleDeficitResponse(self):
"""
When a preemptible deficit was detected by a previous run of the
loop, the scaler should add non-preemptible nodes to
compensate in proportion to preemptibleCompensation.
"""
self.config.targetTime = 1
self.config.betaInertia = 0.0
self.config.maxNodes = [10, 10]
# This should mean that one non-preemptible node is launched
# for every two preemptible nodes "missing".
self.config.preemptibleCompensation = 0.5
# In this case, we want to explicitly set up the config so
# that we can have preemptible and non-preemptible nodes of
# the same type. That is the only situation where
# preemptibleCompensation applies.
self.config.nodeTypes = [c4_8xlarge_preemptible, c4_8xlarge]
self.provisioner.setAutoscaledNodeTypes(
[({t}, None) for t in self.config.nodeTypes]
)
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
# Simulate a situation where a previous run caused a
# "deficit" of 5 preemptible nodes (e.g. a spot bid was lost)
scaler.preemptibleNodeDeficit[c4_8xlarge] = 5
# Add a bunch of preemptible jobs (so the bin-packing
# estimate for the non-preemptible node should still be 0)
jobShapes = [
Shape(
wallTime=3600,
cores=2,
memory=h2b("1G"),
disk=h2b("2G"),
preemptible=True,
)
] * 1000
estimatedNodeCounts, could_not_fit = scaler.getEstimatedNodeCounts(
jobShapes, defaultdict(int)
)
# We don't care about the estimated count of preemptible
# nodes. All we want to know is whether we responded to the deficit
# properly: preemptibleCompensation * deficit = 0.5 * 5 = 2.5, which rounds up to 3.
self.assertEqual(
estimatedNodeCounts[self.provisioner.node_shapes_for_testing[1]], 3
)
self.assertEqual(len(could_not_fit), 0)
def testPreemptibleDeficitIsSet(self):
"""
Make sure that updateClusterSize sets the preemptible deficit if
it can't launch preemptible nodes properly. That way, the
deficit can be communicated to the next run of
getEstimatedNodeCounts.
"""
# Mock out addNodes. We want to pretend it had trouble
# launching all 5 nodes, and could only launch 3.
self.provisioner.addNodes = MagicMock(return_value=3)
# Pretend there are no nodes in the cluster right now
self.provisioner.getProvisionedWorkers = MagicMock(return_value=[])
# In this case, we want to explicitly set up the config so
# that we can have preemptible and non-preemptible nodes of
# the same type. That is the only situation where
# preemptibleCompensation applies.
self.config.nodeTypes = [c4_8xlarge_preemptible, c4_8xlarge]
self.provisioner.setAutoscaledNodeTypes(
[({t}, None) for t in self.config.nodeTypes]
)
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
estimatedNodeCounts = {c4_8xlarge_preemptible: 5, c4_8xlarge: 0}
scaler.updateClusterSize(estimatedNodeCounts)
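# 5 preemptible nodes were requested but addNodes only "launched" 3, so the recorded
# deficit should be 2.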
self.assertEqual(scaler.preemptibleNodeDeficit[c4_8xlarge_preemptible], 2)
self.provisioner.addNodes.assert_called_once()
# OK, now pretend this is a while later, and actually launched
# the nodes properly. The deficit should disappear
self.provisioner.addNodes = MagicMock(return_value=5)
scaler.updateClusterSize(estimatedNodeCounts)
self.assertEqual(scaler.preemptibleNodeDeficit[c4_8xlarge], 0)
def testNoLaunchingIfDeltaAlreadyMet(self):
"""
Check that the scaler doesn't call addNodes to launch "0" more instances when
the needed delta can be met by unignoring nodes.
"""
# We have only one node type for simplicity
self.provisioner.setAutoscaledNodeTypes([({c4_8xlarge}, None)])
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
# Pretend there is one ignored worker in the cluster
self.provisioner.getProvisionedWorkers = MagicMock(
return_value=[
Node(
"127.0.0.1",
"127.0.0.1",
"testNode",
datetime.datetime.now().isoformat(),
nodeType=c4_8xlarge,
preemptible=True,
)
]
)
scaler.ignoredNodes.add("127.0.0.1")
# Exercise the updateClusterSize logic
self.provisioner.addNodes = MagicMock()
scaler.updateClusterSize({c4_8xlarge: 1})
self.assertFalse(
self.provisioner.addNodes.called,
"addNodes was called when no new nodes were needed",
)
self.assertEqual(
len(scaler.ignoredNodes),
0,
"The scaler didn't unignore an ignored node when scaling up",
)
def testBetaInertia(self):
# This is really high, but makes things easy to calculate.
self.config.betaInertia = 0.5
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
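# smoothEstimate appears to blend the old estimate into the new one as
# old * betaInertia + estimate * (1 - betaInertia), starting from 0, so repeated
# estimates of 100 with betaInertia=0.5 should go 50, 75, 87.5, ... -> 100.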
# OK, smoothing things this much should get us 50% of the way to 100.
self.assertEqual(scaler.smoothEstimate(c4_8xlarge_preemptible, 100), 50)
# Now we should be at 75%.
self.assertEqual(scaler.smoothEstimate(c4_8xlarge_preemptible, 100), 75)
# We should eventually converge on our estimate as long as betaInertia is below 1.
for _ in range(1000):
scaler.smoothEstimate(c4_8xlarge_preemptible, 100)
self.assertEqual(scaler.smoothEstimate(c4_8xlarge_preemptible, 100), 100)
def test_overhead_accounting_large(self):
"""
If a node has a certain raw memory or disk capacity, that won't all be
available when it actually comes up; some disk and memory will be used
by the OS, and the backing scheduler (Mesos, Kubernetes, etc.).
Make sure this overhead is accounted for when packing jobs onto large nodes.
"""
# Use a small node (60G) and a big node (260G)
# If the job needs 100% of the memory of the instance type, it won't
# fit and will need a bigger node.
self._check_job_estimate([(c4_8xlarge, 0), (r3_8xlarge, 1)], memory=h2b("60G"))
# If the job needs 98% of the memory of the instance type, it won't
# fit and will need a bigger node.
self._check_job_estimate(
[(c4_8xlarge, 0), (r3_8xlarge, 1)], memory=int(h2b("60G") * 0.98)
)
# If the job needs 90% of the memory of the instance type, it will fit.
self._check_job_estimate(
[(c4_8xlarge, 1), (r3_8xlarge, 0)], memory=int(h2b("60G") * 0.90)
)
# If the job needs 100% of the disk of the instance type, it won't
# fit and will need a bigger node.
self._check_job_estimate([(c4_8xlarge, 0), (r3_8xlarge, 1)], disk=h2b("100G"))
# If the job needs all but 7G of the disk of the instance type, it won't
# fit and will need a bigger node.
self._check_job_estimate([(c4_8xlarge, 0), (r3_8xlarge, 1)], disk=h2b("93G"))
# If the job leaves 10% and 10G of the disk free, it fits
self._check_job_estimate([(c4_8xlarge, 1), (r3_8xlarge, 0)], disk=h2b("90G"))
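# Taken together, these checks imply that somewhere between 2% and 10% of memory and
# between 7G and 10G of disk are treated as overhead on nodes of this size.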
def test_overhead_accounting_small(self):
"""
If a node has a certain raw memory or disk capacity, that won't all be
available when it actually comes up; some disk and memory will be used
by the OS, and the backing scheduler (Mesos, Kubernetes, etc.).
Make sure this overhead is accounted for when packing jobs onto small nodes.
"""
# Use a small node (1G) and a big node (260G)
# If the job needs 100% of the memory of the instance type, it won't
# fit and will need a bigger node.
self._check_job_estimate([(t2_micro, 0), (r3_8xlarge, 1)], memory=h2b("1G"))
# If the job needs all but 100M of the memory of the instance type, it
# won't fit and will need a bigger node.
self._check_job_estimate(
[(t2_micro, 0), (r3_8xlarge, 1)], memory=h2b("1G") - h2b("100M")
)
# If the job needs no more than 90% of the memory on the node *and*
# leaves at least 384M free for overhead, we can rely on it fitting on a 1G
# memory node.
jobs = [
Shape(
wallTime=3600,
cores=1,
memory=h2b("1G") - h2b("384M"),
disk=h2b("2G"),
preemptible=True,
)
]
self._check_job_estimate(
[(t2_micro, 1), (r3_8xlarge, 0)], memory=h2b("1G") - h2b("384M")
)
def test_overhead_accounting_observed(self):
"""
If a node has a certain raw memory or disk capacity, that won't all be
available when it actually comes up; some disk and memory will be used
by the OS, and the backing scheduler (Mesos, Kubernetes, etc.).
Make sure this overhead is accounted for so that real-world observed
failures cannot happen again.
"""
# In
# https://github.com/DataBiosphere/toil/issues/4147#issuecomment-1179587561
# a user observed what seems to be a nominally 64 GiB node
# (r5.2xlarge?) with Mesos reporting "61.0 GB" available, and a
# nominally 32 GiB node with Mesos reporting "29.9 GB" available. It's
# not clear if Mesos is thinking in actual GB or GiB here.
# A 62.5Gi job is sent to the larger node
self._check_job_estimate(
[(r5_2xlarge, 0), (r5_4xlarge, 1)], memory=h2b("62.5 Gi")
)
def _check_job_estimate(
self, nodes: list[tuple[Shape, int]], cores=1, memory=1, disk=1
) -> None:
"""
Make sure that a job with the given requirements, when packed onto the given
node types, produces the given count of each.
"""
self.provisioner.setAutoscaledNodeTypes([({node}, None) for node, _ in nodes])
self.config.targetTime = 1
self.config.betaInertia = 0.0
# We only need up to one node
self.config.maxNodes = [1] * len(nodes)
scaler = ClusterScaler(self.provisioner, self.leader, self.config)
jobs = [
Shape(
wallTime=3600, cores=cores, memory=memory, disk=disk, preemptible=True
)
]
logger.debug("Try and fit jobs: %s", jobs)
counts, could_not_fit = scaler.getEstimatedNodeCounts(jobs, defaultdict(int))
for node, count in nodes:
seen_count = counts.get(node, 0)
if seen_count != count:
logger.error("Saw %s/%s instances of node %s", seen_count, count, node)
self.assertEqual(seen_count, count)
self.assertEqual(len(could_not_fit), 0)
class ScalerThreadTest(ToilTest):
def _testClusterScaling(self, config, numJobs, numPreemptibleJobs, jobShape):
"""
Test the ClusterScaler class with different patterns of job creation. Tests ascertain that
autoscaling occurs and that all the jobs are run.
"""
# First do a simple test of creating 100 preemptible and non-preemptible jobs and check that the
# jobs are completed okay, then print the amount of worker time expended and the total
# number of worker nodes used.
mock = MockBatchSystemAndProvisioner(config=config, secondsPerJob=2.0)
mock.setAutoscaledNodeTypes([({t}, None) for t in config.nodeTypes])
mock.start()
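# The mock stands in for the leader, the batch system, and the provisioner at once,
# so it is passed to ScalerThread in both roles.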
clusterScaler = ScalerThread(mock, mock, config, stop_on_exception=True)
clusterScaler.start()
try:
# Add 100 jobs to complete
list(map(lambda x: mock.addJob(jobShape=jobShape), list(range(numJobs))))
list(
map(
lambda x: mock.addJob(jobShape=jobShape, preemptible=True),
list(range(numPreemptibleJobs)),
)
)
# Add some completed jobs
for preemptible in (True, False):
if (
preemptible
and numPreemptibleJobs > 0
or not preemptible
and numJobs > 0
):
# Add 1000 random jobs
for _ in range(1000):
x = mock.getNodeShape(nodeType=jobShape)
iJ = JobDescription(
requirements=dict(
memory=random.randrange(1, x.memory),
cores=random.randrange(1, x.cores),
disk=random.randrange(1, x.disk),
preemptible=preemptible,
),
jobName="testClusterScaling",
unitName="",
)
clusterScaler.addCompletedJob(
iJ, random.choice(list(range(1, x.wallTime)))
)
startTime = time.time()
# Wait while the cluster processes the jobs
while (
mock.getNumberOfJobsIssued(preemptible=False) > 0
or mock.getNumberOfJobsIssued(preemptible=True) > 0
or mock.getNumberOfNodes() > 0
or mock.getNumberOfNodes(preemptible=True) > 0
):
logger.debug(
"Running, non-preemptible queue size: %s, non-preemptible workers: %s, "
"preemptible queue size: %s, preemptible workers: %s"
% (
mock.getNumberOfJobsIssued(preemptible=False),
mock.getNumberOfNodes(preemptible=False),
mock.getNumberOfJobsIssued(preemptible=True),
mock.getNumberOfNodes(preemptible=True),
)
)
clusterScaler.check()
time.sleep(0.5)
logger.debug(
"We waited %s for cluster to finish" % (time.time() - startTime)
)
finally:
clusterScaler.shutdown()
mock.shutDown()
# Print some info about the autoscaling
logger.debug(
"Total-jobs: %s: Max-workers: %s, "
"Total-worker-time: %s, Worker-time-per-job: %s"
% (
mock.totalJobs,
sum(mock.maxWorkers.values()),
mock.totalWorkerTime,
mock.totalWorkerTime // mock.totalJobs if mock.totalJobs > 0 else 0.0,
)
)
@slow
def testClusterScaling(self):
"""
Test scaling for a batch of non-preemptible jobs and no preemptible jobs (makes debugging
easier).
"""
config = Config()
# Make defaults dummy values
config.defaultMemory = h2b("1Gi")
config.defaultCores = 1
config.defaultDisk = h2b("1Gi")
# No preemptible nodes/jobs
config.maxPreemptibleNodes = [] # No preemptible nodes
# Non-preemptible parameters
config.nodeTypes = [Shape(20, h2b("10Gi"), 10, h2b("100Gi"), False)]
config.minNodes = [0]
config.maxNodes = [10]
# Algorithm parameters
config.targetTime = defaultTargetTime
config.betaInertia = 0.1
config.scaleInterval = 3
self._testClusterScaling(
config,
numJobs=100,
numPreemptibleJobs=0,
jobShape=Shape(20, h2b("7Gi"), 10, h2b("80Gi"), False),
)
@slow
def testClusterScalingMultipleNodeTypes(self):
small_node = Shape(20, h2b("5Gi"), 10, h2b("20Gi"), False)
small_job = Shape(20, h2b("3Gi"), 10, h2b("4Gi"), False)
medium_node = Shape(20, h2b("10Gi"), 10, h2b("20Gi"), False)
medium_job = Shape(20, h2b("7Gi"), 10, h2b("4Gi"), False)
large_node = Shape(20, h2b("20Gi"), 10, h2b("20Gi"), False)
large_job = Shape(20, h2b("16Gi"), 10, h2b("4Gi"), False)
numJobs = 100
config = Config()
# Make defaults dummy values
config.defaultMemory = h2b("1Gi")
config.defaultCores = 1
config.defaultDisk = h2b("1Gi")
# No preemptible nodes/jobs
config.preemptibleNodeTypes = []
config.minPreemptibleNodes = []
config.maxPreemptibleNodes = [] # No preemptible nodes
# Make sure the node types don't have to be ordered
config.nodeTypes = [large_node, small_node, medium_node]
config.minNodes = [0, 0, 0]
config.maxNodes = [10, 10] # test expansion of this list
# Algorithm parameters
config.targetTime = defaultTargetTime
config.betaInertia = 0.1
config.scaleInterval = 3
mock = MockBatchSystemAndProvisioner(config=config, secondsPerJob=2.0)
mock.setAutoscaledNodeTypes([({t}, None) for t in config.nodeTypes])
clusterScaler = ScalerThread(mock, mock, config, stop_on_exception=True)
clusterScaler.start()
mock.start()
try:
# Add small jobs
list(map(lambda x: mock.addJob(jobShape=small_job), list(range(numJobs))))
list(map(lambda x: mock.addJob(jobShape=medium_job), list(range(numJobs))))
# Add medium completed jobs
for i in range(1000):
iJ = JobDescription(
requirements=dict(
memory=random.choice(
range(small_job.memory, medium_job.memory)
),
cores=medium_job.cores,
disk=large_job.disk,
preemptible=False,
),
jobName="testClusterScaling",
unitName="",
)
clusterScaler.addCompletedJob(iJ, random.choice(range(1, 10)))
while mock.getNumberOfJobsIssued() > 0 or mock.getNumberOfNodes() > 0:
logger.debug("%i nodes currently provisioned" % mock.getNumberOfNodes())
# Make sure there are no large nodes
self.assertEqual(mock.getNumberOfNodes(nodeType=large_node), 0)
clusterScaler.check()
time.sleep(0.5)
finally:
clusterScaler.shutdown()
mock.shutDown()
# Make sure jobs ran on both the small and medium node types
self.assertTrue(mock.totalJobs > 0)
self.assertTrue(mock.maxWorkers[small_node] > 0)
self.assertTrue(mock.maxWorkers[medium_node] > 0)
self.assertEqual(mock.maxWorkers[large_node], 0)
@slow
def testClusterScalingWithPreemptibleJobs(self):
"""
Test scaling simultaneously for a batch of preemptible and non-preemptible jobs.
"""
config = Config()
node_shape = Shape(20, h2b("10Gi"), 10, h2b("20Gi"), False)
preemptible_node_shape = Shape(20, h2b("10Gi"), 10, h2b("20Gi"), True)
job_shape = Shape(20, h2b("7Gi"), 10, h2b("2Gi"), False)
preemptible_job_shape = Shape(20, h2b("7Gi"), 10, h2b("2Gi"), True)
# Make defaults dummy values
config.defaultMemory = h2b("1Gi")
config.defaultCores = 1
config.defaultDisk = h2b("1Gi")
# non-preemptible node parameters
config.nodeTypes = [node_shape, preemptible_node_shape]
config.minNodes = [0, 0]
config.maxNodes = [10, 10]
# Algorithm parameters
config.targetTime = defaultTargetTime
config.betaInertia = 0.9
config.scaleInterval = 3
self._testClusterScaling(
config, numJobs=100, numPreemptibleJobs=100, jobShape=job_shape
)
class MockBatchSystemAndProvisioner(AbstractScalableBatchSystem, AbstractProvisioner):
"""Mimics a leader, job batcher, provisioner and scalable batch system."""
def __init__(self, config, secondsPerJob):
super().__init__(clusterName="clusterName", clusterType="mesos")
# To mimic parallel preemptible and non-preemptible queues
# for jobs we create two parallel instances of the following class
self.config = config
self.secondsPerJob = secondsPerJob
self.provisioner = self
self.batchSystem = self
self.jobQueue = Queue()
self.updatedJobsQueue = Queue()
self.jobBatchSystemIDToIssuedJob = {}
self.totalJobs = 0 # Count of total jobs processed
self.totalWorkerTime = 0.0 # Total time spent in worker threads
self.toilMetrics = None
self.nodesToWorker = {} # Map from Node to instances of the Worker class
self.workers = defaultdict(list) # Instances of the Worker class, by node shape
self.maxWorkers = defaultdict(int) # Maximum number of workers, by node shape
self.running = False
self.leaderThread = Thread(target=self._leaderFn)
self.toilState = Namespace()
self.toilState.bus = Namespace()
self.toilState.bus.publish = MagicMock()
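# Minimal stand-in for the leader's ToilState: just a message bus whose publish()
# the scaler can call, mocked out here.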
def start(self):
self.running = True
self.leaderThread.start()
def shutDown(self):
self.running = False
self.leaderThread.join()
# Stub out all AbstractBatchSystem methods since they are never called
for name, value in AbstractBatchSystem.__dict__.items():
if getattr(value, "__isabstractmethod__", False):
exec("def %s(): pass" % name)
# Without this, the class would end up with .name and .value attributes
del name, value
# AbstractScalableBatchSystem methods
def nodeInUse(self, nodeIP):
return False
def ignoreNode(self, nodeAddress):
pass
def unignoreNode(self, nodeAddress):
pass
def supportedClusterTypes(self):
return {"mesos"}
def createClusterSettings(self):
pass
def readClusterSettings(self):
pass
# AbstractProvisioner methods
def setAutoscaledNodeTypes(
self, node_types: list[tuple[set[Shape], Optional[float]]]
):
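# Also keep a flat, sorted list of the shapes so tests can index into it
# (see testPreemptibleDeficitResponse).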
self.node_shapes_for_testing = sorted(it for t in node_types for it in t[0])
super().setAutoscaledNodeTypes(node_types)
def getProvisionedWorkers(self, instance_type=None, preemptible=None):
"""
Returns a list of Node objects, each representing a worker node in the cluster
:param preemptible: If True, return only preemptible nodes; if False, only non-preemptible nodes; if None, all nodes
:return: list of Node
"""
nodesToWorker = self.nodesToWorker
results = []
if instance_type:
results = [node for node in nodesToWorker if node.nodeType == instance_type]
else:
results = list(nodesToWorker.keys())
if preemptible is not None:
results = [node for node in results if node.preemptible == preemptible]
return results
def terminateNodes(self, nodes):
if nodes:
self._removeNodes(nodes)
def remainingBillingInterval(self, node):
pass
def addJob(self, jobShape, preemptible=False):
"""
Add a job to the job queue
"""
self.totalJobs += 1
jobID = uuid.uuid4()
self.jobBatchSystemIDToIssuedJob[jobID] = JobDescription(
requirements={
"memory": jobShape.memory,
"cores": jobShape.cores,
"disk": jobShape.disk,
"preemptible": preemptible,
},
jobName=f"job{self.totalJobs}",
)
self.jobQueue.put(jobID)
# JobBatcher functionality
def getNumberOfJobsIssued(self, preemptible=None):
if preemptible is not None:
jobList = [
job
for job in list(self.jobQueue.queue)
if self.jobBatchSystemIDToIssuedJob[job].preemptible == preemptible
]
return len(jobList)
else:
return self.jobQueue.qsize()
def getJobs(self):
# jobBatchSystemIDToIssuedJob may be modified while we are working.
# So copy it.
return dict(self.jobBatchSystemIDToIssuedJob).values()
# AbstractScalableBatchSystem functionality
def getNodes(self, preemptible: Optional[bool] = False, timeout: int = 600):
nodes = dict()
for node in self.nodesToWorker:
if node.preemptible == preemptible:
worker = self.nodesToWorker[node]
nodes[node.privateIP] = NodeInfo(
coresTotal=0,
coresUsed=0,
requestedCores=1,
memoryTotal=0,
memoryUsed=0,
requestedMemory=1,
workers=1 if worker.busyEvent.is_set() else 0,
)
return nodes
# AbstractProvisioner functionality
def addNodes(self, nodeTypes: set[str], numNodes, preemptible) -> int:
nodeType = next(iter(nodeTypes))
self._addNodes(numNodes=numNodes, nodeType=nodeType, preemptible=preemptible)
return self.getNumberOfNodes(nodeType=nodeType, preemptible=preemptible)
def getNodeShape(self, nodeType, preemptible=False):
# Assume node shapes and node types are the same thing for testing
# TODO: this isn't really allowed by the type requirements on AbstractProvisioner
return nodeType
def getWorkersInCluster(self, nodeShape):
return self.workers[nodeShape]
def launchCluster(
self,
leaderNodeType,
keyName,
userTags=None,
vpcSubnet=None,
leaderStorage=50,
nodeStorage=50,
botoPath=None,
**kwargs,
):
pass
def destroyCluster(self) -> None:
pass
def getLeader(self):
pass
def _leaderFn(self):
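# Mimic the leader's bookkeeping loop: drain completed job IDs posted by the workers
# and retire them from the issued-job table.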
while self.running:
updatedJobID = None
try:
updatedJobID = self.updatedJobsQueue.get(timeout=1.0)
except Empty:
continue
if updatedJobID:
del self.jobBatchSystemIDToIssuedJob[updatedJobID]
time.sleep(0.1)
def _addNodes(self, numNodes, nodeType: str, preemptible=False):
nodeShape = self.getNodeShape(nodeType=nodeType, preemptible=preemptible)
class Worker:
def __init__(self, jobQueue, updatedJobsQueue, secondsPerJob):
self.busyEvent = Event()
self.stopEvent = Event()
def workerFn():
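# Simulate a worker: take a jobID off the shared queue, report it on
# updatedJobsQueue, and stay "busy" for secondsPerJob before polling again,
# until stopEvent is set.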
while True:
if self.stopEvent.is_set():
return
try:
jobID = jobQueue.get(timeout=1.0)
except Empty:
continue
updatedJobsQueue.put(jobID)
self.busyEvent.set()
time.sleep(secondsPerJob)
self.busyEvent.clear()
self.startTime = time.time()
self.worker = Thread(target=workerFn)
self.worker.start()
def stop(self):
self.stopEvent.set()
self.worker.join()
return time.time() - self.startTime
for _ in range(numNodes):
node = Node(
"127.0.0.1",
uuid.uuid4(),
"testNode",
datetime.datetime.now().isoformat() + "Z",
nodeType=nodeType,
preemptible=preemptible,
)
self.nodesToWorker[node] = Worker(
self.jobQueue, self.updatedJobsQueue, self.secondsPerJob
)
self.workers[nodeShape].append(self.nodesToWorker[node])
self.maxWorkers[nodeShape] = max(
self.maxWorkers[nodeShape], len(self.workers[nodeShape])
)
def _removeNodes(self, nodes):
logger.debug(
"Removing nodes. %s workers and %s to terminate.",
len(self.nodesToWorker),
len(nodes),
)
for node in nodes:
try:
nodeShape = self.getNodeShape(node.nodeType, node.preemptible)
worker = self.nodesToWorker.pop(node)
self.workers[nodeShape].pop()
self.totalWorkerTime += worker.stop()
except KeyError:
# Node isn't our responsibility
pass
def getNumberOfNodes(self, nodeType=None, preemptible=None):
if nodeType:
nodeShape = self.getNodeShape(nodeType=nodeType, preemptible=preemptible)
return len(self.workers[nodeShape])
else:
return len(self.nodesToWorker)