toil.provisioners.clusterScaler

Attributes

logger

EVICTION_THRESHOLD

RESERVE_SMALL_LIMIT

RESERVE_SMALL_AMOUNT

RESERVE_BREAKPOINTS

RESERVE_FRACTIONS

OS_SIZE

FailedConstraint

Exceptions

JobTooBigError

Raised in the scaler thread when a job cannot fit in any available node type and is likely to lock up the workflow.

Classes

BinPackedFit

If jobShapes is a set of tasks with run requirements (mem/disk/cpu), and nodeShapes is a sorted list of available computers to run these jobs on, attempts to find the minimum set of nodes needed to run the tasks.

NodeReservation

The amount of resources that we expect to be available on a given node at each point in time.

ClusterScaler

ScalerThread

A thread that automatically scales the number of either preemptible or non-preemptible worker nodes according to the resource requirements of the queued jobs.

ClusterStats

Functions

adjustEndingReservationForJob(reservation, jobShape, ...)

Add a job to an ending reservation that ends at wallTime.

split(nodeShape, jobShape, wallTime)

Partition a node allocation into two to fit the job.

binPacking(nodeShapes, jobShapes, goalTime)

Using the given node shape bins, pack the given job shapes into nodes to get them done in the given amount of time.

Module Contents

toil.provisioners.clusterScaler.logger
toil.provisioners.clusterScaler.EVICTION_THRESHOLD
toil.provisioners.clusterScaler.RESERVE_SMALL_LIMIT
toil.provisioners.clusterScaler.RESERVE_SMALL_AMOUNT
toil.provisioners.clusterScaler.RESERVE_BREAKPOINTS: List[int | float]
toil.provisioners.clusterScaler.RESERVE_FRACTIONS = [0.25, 0.2, 0.1, 0.06, 0.02]
toil.provisioners.clusterScaler.OS_SIZE
toil.provisioners.clusterScaler.FailedConstraint
class toil.provisioners.clusterScaler.BinPackedFit(nodeShapes, targetTime=defaultTargetTime)[source]

If jobShapes is a set of tasks with run requirements (mem/disk/cpu), and nodeShapes is a sorted list of available computers to run these jobs on, this class attempts to return a dictionary representing the minimum set of nodes needed to run the tasks in jobShapes.

Uses a first-fit-decreasing (FFD) bin-packing-like algorithm to calculate an approximate minimum number of nodes that will fit the given list of jobs. BinPackedFit assumes the list nodeShapes has already been ordered by “node preference” outside of BinPackedFit, so when virtually “creating” nodes, the first node shape in nodeShapes that fits the job is the one added.

Parameters:
  • nodeShapes (list) – The available node shapes, each describing an atomic node allocation in terms of wall-time, memory, cores, disk, and whether it is preemptible or not.

  • targetTime (float) – The time before which all jobs should at least be started.

Returns:

The minimum number of minimal node allocations estimated to be required to run all the jobs in jobShapes.
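For orientation, here is a minimal sketch of driving this class directly. It assumes Shape is constructed as Shape(wallTime, memory, cores, disk, preemptible) with memory and disk in bytes; the values are made up and the snippet is illustrative, not a quote of Toil's own usage.

from toil.provisioners.abstractProvisioner import Shape
from toil.provisioners.clusterScaler import BinPackedFit

# One node type: 1 h of wall time, 32 GiB memory, 8 cores, 100 GiB disk,
# non-preemptible. (Constructor order is an assumption; see above.)
node = Shape(3600, 32 * 2**30, 8, 100 * 2**30, False)

# Three identical jobs, each needing 30 min, 8 GiB memory, 2 cores, 10 GiB disk.
jobs = [Shape(1800, 8 * 2**30, 2, 10 * 2**30, False)] * 3

packer = BinPackedFit([node], targetTime=3600)
could_not_fit = packer.binPack(jobs)   # empty dict when every job fits
print(packer.getRequiredNodes())       # e.g. {<node Shape>: 1}

Since 3 × 2 cores and 3 × 8 GiB of memory fit within a single node shape, the expected answer is one node.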

nodeReservations: Dict[toil.provisioners.abstractProvisioner.Shape, List[NodeReservation]]
nodeShapes
targetTime
binPack(jobShapes)[source]

Pack a list of jobShapes into the fewest nodes reasonable.

Can be run multiple times.

Returns a dict mapping any distinct Shapes that did not fit to the reasons they did not fit.

Parameters:

jobShapes (List[toil.provisioners.abstractProvisioner.Shape])

Return type:

Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]

addJobShape(jobShape)[source]

Add the job to the first node reservation in which it will fit. (This is the bin-packing aspect).

If the job did not fit, returns the job shape again together with a list of the constraints that failed; otherwise returns nothing.

Parameters:

jobShape (toil.provisioners.abstractProvisioner.Shape)

Return type:

Optional[Tuple[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]

getRequiredNodes()[source]

Return a dict from node shape to number of nodes required to run the packed jobs.

Return type:

Dict[toil.provisioners.abstractProvisioner.Shape, int]

class toil.provisioners.clusterScaler.NodeReservation(shape)[source]

The amount of resources that we expect to be available on a given node at each point in time.

To represent the resources available in a reservation, we represent a reservation as a linked list of NodeReservations, each giving the resources free within a single timeslice.

Parameters:

shape (toil.provisioners.abstractProvisioner.Shape)

shape
nReservation: NodeReservation | None = None
__str__()[source]
Return type:

str

get_failed_constraints(job_shape)[source]

Check if a job shape’s resource requirements will fit within this allocation.

If the job does not fit, returns the failing constraints: the resources that can't be accommodated, and the limits that were hit.

If the job does fit, returns an empty list.

Must always agree with fits()! This codepath is slower and used for diagnosis.

Parameters:

job_shape (toil.provisioners.abstractProvisioner.Shape)

Return type:

List[FailedConstraint]

fits(jobShape)[source]

Check if a job shape’s resource requirements will fit within this allocation.

Parameters:

jobShape (toil.provisioners.abstractProvisioner.Shape)

Return type:

bool

shapes()[source]

Get all time-slice shapes, in order, from this reservation on.

Return type:

List[toil.provisioners.abstractProvisioner.Shape]

subtract(jobShape)[source]

Subtract the resources necessary to run a jobShape from the reservation.

Parameters:

jobShape (toil.provisioners.abstractProvisioner.Shape)

Return type:

None

attemptToAddJob(jobShape, nodeShape, targetTime)[source]

Attempt to pack a job into this reservation timeslice and/or the reservations after it.

jobShape is the Shape of the job requirements, nodeShape is the Shape of the node this is a reservation for, and targetTime is the maximum time to wait before starting this job.

Parameters:
  • jobShape (toil.provisioners.abstractProvisioner.Shape)

  • nodeShape (toil.provisioners.abstractProvisioner.Shape)

  • targetTime (float)

Return type:

bool
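
To make the linked-list structure concrete, here is a small sketch, again assuming Shape(wallTime, memory, cores, disk, preemptible): after a half-node-length job is added, the reservation splits into timeslices, which shapes() then reports in order.

from toil.provisioners.abstractProvisioner import Shape
from toil.provisioners.clusterScaler import NodeReservation

node = Shape(3600, 32 * 2**30, 8, 100 * 2**30, False)
job = Shape(1800, 8 * 2**30, 2, 10 * 2**30, False)

reservation = NodeReservation(node)
if reservation.fits(job):
    reservation.attemptToAddJob(job, node, 3600)
else:
    print(reservation.get_failed_constraints(job))  # diagnose the refusal

# Each entry is the free Shape within one timeslice; expect a reduced
# first 1800 s slice followed by a fully free 1800 s slice.
for free in reservation.shapes():
    print(free)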

toil.provisioners.clusterScaler.adjustEndingReservationForJob(reservation, jobShape, wallTime)[source]

Add a job to an ending reservation that ends at wallTime, splitting the reservation if the job doesn't fill the entire timeslice.

Parameters:
  • reservation (NodeReservation)

  • jobShape (toil.provisioners.abstractProvisioner.Shape)

  • wallTime (float)

Return type:

None

toil.provisioners.clusterScaler.split(nodeShape, jobShape, wallTime)[source]

Partition a node allocation into two to fit the job.

Returns the modified shape of the node and a new node reservation for the extra time that the job didn't fill.

Parameters:
  • nodeShape (toil.provisioners.abstractProvisioner.Shape)

  • jobShape (toil.provisioners.abstractProvisioner.Shape)

  • wallTime (float)

Return type:

Tuple[toil.provisioners.abstractProvisioner.Shape, NodeReservation]
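
A sketch of the partition, under the same assumed Shape(wallTime, memory, cores, disk, preemptible) constructor: splitting a one-hour node around a 30-minute job yields the occupied slice as a Shape and the remainder as a fresh NodeReservation.

from toil.provisioners.abstractProvisioner import Shape
from toil.provisioners.clusterScaler import split

node = Shape(3600, 32 * 2**30, 8, 100 * 2**30, False)
job = Shape(1800, 8 * 2**30, 2, 10 * 2**30, False)

# during_job: the node's shape for the job's 1800 s, with the job fitted in;
# remainder: a NodeReservation for the trailing 1800 s the job didn't fill.
during_job, remainder = split(node, job, 1800)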

toil.provisioners.clusterScaler.binPacking(nodeShapes, jobShapes, goalTime)[source]

Using the given node shape bins, pack the given job shapes into nodes to get them done in the given amount of time.

Returns a dict saying how many of each node shape will be needed, and a dict mapping job shapes that could not fit to the reasons why.

Parameters:
  • nodeShapes (List[toil.provisioners.abstractProvisioner.Shape])

  • jobShapes (List[toil.provisioners.abstractProvisioner.Shape])

  • goalTime (float)

Return type:

Tuple[Dict[toil.provisioners.abstractProvisioner.Shape, int], Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]
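
For example, a minimal hypothetical call, with the node shapes listed in preference order as BinPackedFit expects and the Shape constructor assumed as above:

from toil.provisioners.abstractProvisioner import Shape
from toil.provisioners.clusterScaler import binPacking

small = Shape(3600, 8 * 2**30, 2, 50 * 2**30, False)
large = Shape(3600, 64 * 2**30, 16, 500 * 2**30, False)
jobs = [Shape(600, 4 * 2**30, 1, 10 * 2**30, False)] * 20

# counts: how many of each node shape to provision;
# too_big: job shapes that fit no node, with the constraints they violated.
counts, too_big = binPacking([small, large], jobs, goalTime=3600)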

class toil.provisioners.clusterScaler.ClusterScaler(provisioner, leader, config)[source]
Parameters:
provisioner
leader
config
static: Dict[bool, Dict[str, toil.provisioners.node.Node]]
on_too_big: List[Callable[[toil.provisioners.abstractProvisioner.Shape, List[toil.provisioners.abstractProvisioner.Shape]], Any]] = []
jobNameToAvgRuntime: Dict[str, float]
jobNameToNumCompleted: Dict[str, int]
totalAvgRuntime = 0.0
totalJobsCompleted = 0
targetTime: float
betaInertia
nodeShapeToType
instance_types
nodeShapes
ignoredNodes: Set[str]
preemptibleNodeDeficit
previousWeightedEstimate
minNodes
maxNodes
node_shapes_after_overhead
without_overhead
totalNodes: Dict[toil.provisioners.abstractProvisioner.Shape, int]
getAverageRuntime(jobName, service=False)[source]
Parameters:
  • jobName (str)

  • service (bool)

Return type:

float

addCompletedJob(job, wallTime)[source]

Adds the shape of a completed job to the queue, allowing the scaler to use the last N completed jobs in factoring how many nodes are required in the cluster.

Parameters:
  • job (toil.job.JobDescription) – The description of the completed job.

  • wallTime (int) – The wall-time taken to complete the job in seconds.
Return type:

None

setStaticNodes(nodes, preemptible)[source]

Used to track statically provisioned nodes. This method must be called before any auto-scaled nodes are provisioned.

These nodes are treated differently than auto-scaled nodes in that they should not be automatically terminated.

Parameters:
  • nodes

  • preemptible (bool)

Return type:

None

getStaticNodes(preemptible)[source]

Returns nodes set in setStaticNodes().

Parameters:

preemptible (bool)

Returns:

Statically provisioned nodes.

Return type:

Dict[str, toil.provisioners.node.Node]

smoothEstimate(nodeShape, estimatedNodeCount)[source]

Smooth out fluctuations in the estimate for this node compared to previous runs.

Returns an integer.

Parameters:
  • nodeShape (toil.provisioners.abstractProvisioner.Shape)

  • estimatedNodeCount (int)

Return type:

int
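
The exact weighting is internal, but the intent is an inertia-damped blend of the previous weighted estimate and the new one. A hypothetical illustration follows; the real method keeps per-shape state in previousWeightedEstimate, and the formula below is an assumption, not the implementation.

# Hypothetical smoothing rule for illustration only.
def smooth(previous: float, new_estimate: int, beta_inertia: float) -> int:
    # Blend the old weighted estimate with the new raw estimate.
    return round(beta_inertia * previous + (1 - beta_inertia) * new_estimate)

print(smooth(previous=4.0, new_estimate=10, beta_inertia=0.5))  # 7: the jump from 4 to 10 is damped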

getEstimatedNodeCounts(queuedJobShapes, currentNodeCounts)[source]

Given the resource requirements of queued jobs and the current size of the cluster, estimate how many nodes of each shape are needed.

Returns a dict mapping from nodeShape to the number of nodes we want in the cluster right now, and a dict from job shapes that are too big to run on any node to reasons why.

Parameters:
  • queuedJobShapes (List[toil.provisioners.abstractProvisioner.Shape])

  • currentNodeCounts (Dict[toil.provisioners.abstractProvisioner.Shape, int])

Return type:

Tuple[Dict[toil.provisioners.abstractProvisioner.Shape, int], Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]

updateClusterSize(estimatedNodeCounts)[source]

Given the desired and current size of the cluster, attempts to launch/remove instances to get to the desired size.

Also attempts to remove ignored nodes that were marked for graceful removal.

Returns the new size of the cluster.

Parameters:

estimatedNodeCounts (Dict[toil.provisioners.abstractProvisioner.Shape, int])

Return type:

Dict[toil.provisioners.abstractProvisioner.Shape, int]

setNodeCount(instance_type, numNodes, preemptible=False, force=False)[source]

Attempt to grow or shrink the number of preemptible or non-preemptible worker nodes in the cluster to the given value, or as close a value as possible. After performing the necessary additions or removals of worker nodes, return the resulting number of preemptible or non-preemptible nodes currently in the cluster.

Parameters:
  • instance_type (str) – The instance type to add or remove.

  • numNodes (int) – Desired size of the cluster

  • preemptible (bool) – whether the added nodes will be preemptible, i.e. whether they may be removed spontaneously by the underlying platform at any time.

  • force (bool) – If False, the provisioner is allowed to deviate from the given number of nodes. For example, when downsizing a cluster, a provisioner might leave nodes running if they have active jobs running on them.

Returns:

the number of worker nodes in the cluster after making the necessary adjustments. This value should be, but is not guaranteed to be, close or equal to the numNodes argument. It represents the closest possible approximation of the actual cluster size at the time this method returns.

Return type:

int

filter_out_static_nodes(nodes, preemptible=False)[source]
Parameters:
  • nodes

  • preemptible (bool)

Return type:

List[Tuple[toil.provisioners.node.Node, toil.batchSystems.abstractBatchSystem.NodeInfo]]

getNodes(preemptible=None)[source]

Returns a dictionary mapping node identifiers of preemptible or non-preemptible nodes to NodeInfo objects, one for each node.

This method is the definitive source on nodes in the cluster, and is responsible for consolidating cluster state between the provisioner and the batch system.

Parameters:

preemptible (bool) – If True, only preemptible nodes will be returned; if False, only non-preemptible nodes; if None, all nodes will be returned.

Return type:

Dict[toil.provisioners.node.Node, toil.batchSystems.abstractBatchSystem.NodeInfo]

shutDown()[source]
Return type:

None

exception toil.provisioners.clusterScaler.JobTooBigError(job=None, shape=None, constraints=None)[source]

Bases: Exception

Raised in the scaler thread when a job cannot fit in any available node type and is likely to lock up the workflow.

Parameters:
job
shape
constraints
parts
msg
__str__()[source]

Stringify the exception, including the message.

Return type:

str

class toil.provisioners.clusterScaler.ScalerThread(provisioner, leader, config, stop_on_exception=False)[source]

Bases: toil.lib.threading.ExceptionalThread

A thread that automatically scales the number of either preemptible or non-preemptible worker nodes according to the resource requirements of the queued jobs.

The scaling calculation is essentially as follows: start with 0 estimated worker nodes. For each queued job, check whether we expect it can be scheduled into a worker node before a certain time (currently one hour); if not, attempt to add a single new node of the smallest type that can fit that job.

At each scaling decision point a comparison between the current, C, and newly estimated number of nodes is made. If the absolute difference is less than beta * C then no change is made, else the size of the cluster is adapted. The beta factor is an inertia parameter that prevents continual fluctuations in the number of nodes.
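
As a worked illustration of the inertia rule (the numbers are made up):

beta = 0.2      # inertia parameter
current = 10    # current cluster size C
estimate = 11   # newly estimated node count

# |11 - 10| = 1 < beta * C = 2, so the cluster size is left alone;
# an estimate of 13 or more would fall outside the band and trigger a resize.
new_size = current if abs(estimate - current) < beta * current else estimate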

Parameters:
provisioner
leader
config
stop_on_exception
scaler
stop = False
stop_on_exception
stats = None
check()[source]

Attempt to join any existing scaler threads that may have died or finished.

This ensures any exceptions raised in the threads are propagated in a timely fashion.

Return type:

None

shutdown()[source]

Shut down the cluster.

Return type:

None

addCompletedJob(job, wallTime)[source]
Parameters:
  • job (toil.job.JobDescription)

  • wallTime (int)

Return type:

None

tryRun()[source]
Return type:

None

class toil.provisioners.clusterScaler.ClusterStats(path, batchSystem, clusterName)[source]
Parameters:
path
batchSystem
clusterName
stats: Dict[str, Dict[str, List[Dict[str, Any]]]]
statsThreads: List[toil.lib.threading.ExceptionalThread] = []
statsPath
stop = False
clusterName
batchSystem
scaleable
shutDownStats()[source]
Return type:

None

startStats(preemptible)[source]
Parameters:

preemptible (bool)

Return type:

None

checkStats()[source]
Return type:

None