toil.provisioners.clusterScaler

Module Contents

Classes

BinPackedFit

If jobShapes is a set of tasks with run requirements (mem/disk/cpu), and nodeShapes is a sorted

NodeReservation

The amount of resources that we expect to be available on a given node at each point in time.

ClusterScaler

ScalerThread

A thread that automatically scales the number of either preemptible or non-preemptible worker

ClusterStats

Functions

adjustEndingReservationForJob(reservation, jobShape, ...)

Add a job to an ending reservation that ends at wallTime.

split(nodeShape, jobShape, wallTime)

Partition a node allocation into two to fit the job.

binPacking(nodeShapes, jobShapes, goalTime)

Using the given node shape bins, pack the given job shapes into nodes to

Attributes

logger

EVICTION_THRESHOLD

RESERVE_SMALL_LIMIT

RESERVE_SMALL_AMOUNT

RESERVE_BREAKPOINTS

RESERVE_FRACTIONS

OS_SIZE

FailedConstraint

toil.provisioners.clusterScaler.logger
toil.provisioners.clusterScaler.EVICTION_THRESHOLD
toil.provisioners.clusterScaler.RESERVE_SMALL_LIMIT
toil.provisioners.clusterScaler.RESERVE_SMALL_AMOUNT
toil.provisioners.clusterScaler.RESERVE_BREAKPOINTS: List[Union[int, float]]
toil.provisioners.clusterScaler.RESERVE_FRACTIONS = [0.25, 0.2, 0.1, 0.06, 0.02]
toil.provisioners.clusterScaler.OS_SIZE
toil.provisioners.clusterScaler.FailedConstraint
class toil.provisioners.clusterScaler.BinPackedFit(nodeShapes, targetTime=defaultTargetTime)[source]

If jobShapes is a set of tasks with run requirements (mem/disk/cpu), and nodeShapes is a sorted list of available computers to run these jobs on, this class attempts to compute a dictionary representing the minimum set of nodes needed to run the tasks in jobShapes.

Uses a first-fit-decreasing (FFD) bin-packing-like algorithm to calculate an approximate minimum number of nodes that will fit the given list of jobs. BinPackedFit assumes the list nodeShapes has already been ordered by "node preference" by the caller. So when virtually "creating" nodes, the first node shape within nodeShapes that fits the job is the one that's added.

Parameters
  • nodeShapes (list) – The properties of an atomic node allocation, in terms of wall-time, memory, cores, disk, and whether it is preemptible or not.

  • targetTime (float) – The time before which all jobs should at least be started.

Returns

The minimum number of minimal node allocations estimated to be required to run all the jobs in jobShapes.

nodeReservations: Dict[toil.provisioners.abstractProvisioner.Shape, List[NodeReservation]]
binPack(jobShapes)[source]

Pack a list of jobShapes into the fewest nodes reasonable.

Can be run multiple times.

Returns any distinct Shapes that did not fit, mapping to reasons they did not fit.

Parameters

jobShapes (List[toil.provisioners.abstractProvisioner.Shape]) –

Return type

Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]

addJobShape(jobShape)[source]

Add the job to the first node reservation in which it will fit. (This is the bin-packing aspect).

Returns the job shape again, and a list of failed constraints, if it did not fit.

Parameters

jobShape (toil.provisioners.abstractProvisioner.Shape) –

Return type

Optional[Tuple[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]

getRequiredNodes()[source]

Return a dict from node shape to number of nodes required to run the packed jobs.

Return type

Dict[toil.provisioners.abstractProvisioner.Shape, int]
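A minimal usage sketch of the class above (not from the Toil source); it assumes Shape's constructor takes the wall-time, memory, cores, disk, and preemptibility fields described earlier, with memory and disk in bytes, and that the caller has already ordered nodeShapes by preference.

from toil.provisioners.abstractProvisioner import Shape
from toil.provisioners.clusterScaler import BinPackedFit

# Node types in preference order; the values are illustrative.
nodeShapes = [
    Shape(wallTime=3600, memory=32 * 2**30, cores=8, disk=100 * 2**30, preemptible=False),
    Shape(wallTime=3600, memory=64 * 2**30, cores=16, disk=200 * 2**30, preemptible=False),
]

# Twenty identical small jobs to pack.
jobShapes = [Shape(wallTime=600, memory=4 * 2**30, cores=2, disk=10 * 2**30, preemptible=False)] * 20

packer = BinPackedFit(nodeShapes, targetTime=3600)
couldNotFit = packer.binPack(jobShapes)   # job Shape -> list of FailedConstraint
required = packer.getRequiredNodes()      # node Shape -> number of nodes needed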

class toil.provisioners.clusterScaler.NodeReservation(shape)[source]

The amount of resources that we expect to be available on a given node at each point in time.

A reservation is represented as a linked list of NodeReservations, each giving the resources free within a single timeslice.

Parameters

shape (toil.provisioners.abstractProvisioner.Shape) –

__str__()[source]

Return str(self).

Return type

str

get_failed_constraints(job_shape)[source]

Check if a job shape’s resource requirements will fit within this allocation.

If the job does not fit, returns the failing constraints: the resources that can't be accommodated, and the limits that were hit.

If the job does fit, returns an empty list.

Must always agree with fits()! This codepath is slower and used for diagnosis.

Parameters

job_shape (toil.provisioners.abstractProvisioner.Shape) –

Return type

List[FailedConstraint]

fits(jobShape)[source]

Check if a job shape’s resource requirements will fit within this allocation.

Parameters

jobShape (toil.provisioners.abstractProvisioner.Shape) –

Return type

bool

shapes()[source]

Get all time-slice shapes, in order, from this reservation on.

Return type

List[toil.provisioners.abstractProvisioner.Shape]

subtract(jobShape)[source]

Subtract the resources necessary to run a jobShape from the reservation.

Parameters

jobShape (toil.provisioners.abstractProvisioner.Shape) –

Return type

None

attemptToAddJob(jobShape, nodeShape, targetTime)[source]

Attempt to pack a job into this reservation timeslice and/or the reservations after it.

jobShape is the Shape of the job requirements, nodeShape is the Shape of the node this is a reservation for, and targetTime is the maximum time to wait before starting this job.

Parameters
Return type

bool
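A brief sketch of how the methods above fit together (illustrative only, using the same assumed Shape fields as before): start a reservation from a node's shape, test a job against the first timeslice, deduct its resources if it fits, and walk the remaining free capacity per timeslice.

from toil.provisioners.abstractProvisioner import Shape
from toil.provisioners.clusterScaler import NodeReservation

nodeShape = Shape(wallTime=3600, memory=32 * 2**30, cores=8, disk=100 * 2**30, preemptible=False)
jobShape = Shape(wallTime=900, memory=4 * 2**30, cores=2, disk=10 * 2**30, preemptible=False)

reservation = NodeReservation(nodeShape)
if reservation.fits(jobShape):
    # Deduct the job's resources from this timeslice.
    reservation.subtract(jobShape)
else:
    # Slower diagnostic path; must agree with fits().
    for failed in reservation.get_failed_constraints(jobShape):
        print(failed)

# Free resources remaining in each timeslice, from this reservation onwards.
for free in reservation.shapes():
    print(free)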

toil.provisioners.clusterScaler.adjustEndingReservationForJob(reservation, jobShape, wallTime)[source]

Add a job to an ending reservation that ends at wallTime.

(splitting the reservation if the job doesn’t fill the entire timeslice)

Parameters
Return type

None

toil.provisioners.clusterScaler.split(nodeShape, jobShape, wallTime)[source]

Partition a node allocation into two to fit the job.

Returns the modified shape of the node and a new node reservation for the extra time that the job didn't fill.

Parameters
Return type

Tuple[toil.provisioners.abstractProvisioner.Shape, NodeReservation]

toil.provisioners.clusterScaler.binPacking(nodeShapes, jobShapes, goalTime)[source]

Using the given node shape bins, pack the given job shapes into nodes to get them done in the given amount of time.

Returns a dict saying how many nodes of each shape will be needed, and a dict mapping job shapes that could not fit to the reasons why.

Parameters
Return type

Tuple[Dict[toil.provisioners.abstractProvisioner.Shape, int], Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]
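For example, a sketch of consuming the returned pair, reusing the nodeShapes and jobShapes from the BinPackedFit sketch above (illustrative, not from the Toil source):

from toil.provisioners.clusterScaler import binPacking

nodeCounts, tooBig = binPacking(nodeShapes, jobShapes, goalTime=3600)

for nodeShape, count in nodeCounts.items():
    print(f"{count} node(s) of shape {nodeShape}")
for jobShape, reasons in tooBig.items():
    print(f"{jobShape} fits no node type: {reasons}")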

class toil.provisioners.clusterScaler.ClusterScaler(provisioner, leader, config)[source]
Parameters
getAverageRuntime(jobName, service=False)[source]
Parameters
  • jobName (str) –

  • service (bool) –

Return type

float

addCompletedJob(job, wallTime)[source]

Adds the shape of a completed job to the queue, allowing the scaler to use the last N completed jobs when estimating how many nodes are required in the cluster.

Parameters
  • job (toil.job.JobDescription) – The description of the completed job.

  • wallTime (int) – The wall-time taken to complete the job, in seconds.

Return type

None

setStaticNodes(nodes, preemptible)[source]

Used to track statically provisioned nodes. This method must be called before any auto-scaled nodes are provisioned.

These nodes are treated differently than auto-scaled nodes in that they should not be automatically terminated.

Parameters
Return type

None

getStaticNodes(preemptible)[source]

Returns nodes set in setStaticNodes().

Parameters

preemptible (bool) –

Returns

Statically provisioned nodes.

Return type

Dict[str, toil.provisioners.node.Node]

smoothEstimate(nodeShape, estimatedNodeCount)[source]

Smooth out fluctuations in the estimate for this node shape compared to previous runs.

Returns an integer.

Parameters
Return type

int

getEstimatedNodeCounts(queuedJobShapes, currentNodeCounts)[source]

Given the resource requirements of queued jobs and the current size of the cluster, returns a dict mapping each node shape to the number of nodes we want in the cluster right now, and a dict mapping job shapes that are too big to run on any node to the reasons why.

Parameters
Return type

Tuple[Dict[toil.provisioners.abstractProvisioner.Shape, int], Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]

updateClusterSize(estimatedNodeCounts)[source]

Given the desired and current size of the cluster, attempts to launch/remove instances to get to the desired size.

Also attempts to remove ignored nodes that were marked for graceful removal.

Returns the new size of the cluster.

Parameters

estimatedNodeCounts (Dict[toil.provisioners.abstractProvisioner.Shape, int]) –

Return type

Dict[toil.provisioners.abstractProvisioner.Shape, int]
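A sketch of how the two calls above combine into one scaling pass (illustrative; `scaler` is assumed to be a configured ClusterScaler, and the queued job shapes and current node counts to come from the leader and batch system):

from typing import Dict, List
from toil.provisioners.abstractProvisioner import Shape

def scaling_pass(scaler, queuedJobShapes: List[Shape],
                 currentNodeCounts: Dict[Shape, int]) -> Dict[Shape, int]:
    # Estimate how many nodes of each shape we want, and which jobs fit nowhere.
    estimated, couldNotFit = scaler.getEstimatedNodeCounts(queuedJobShapes, currentNodeCounts)
    for jobShape, reasons in couldNotFit.items():
        print(f"{jobShape} cannot be scheduled on any node type: {reasons}")
    # Launch or remove instances to approach the estimate; returns the new sizes.
    return scaler.updateClusterSize(estimated)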

setNodeCount(instance_type, numNodes, preemptible=False, force=False)[source]

Attempt to grow or shrink the number of preemptible or non-preemptible worker nodes in the cluster to the given value, or as close to it as possible. After performing the necessary additions or removals of worker nodes, return the resulting number of preemptible or non-preemptible nodes currently in the cluster.

Parameters
  • instance_type (str) – The instance type to add or remove.

  • numNodes (int) – Desired size of the cluster

  • preemptible (bool) – whether the added nodes will be preemptible, i.e. whether they may be removed spontaneously by the underlying platform at any time.

  • force (bool) – If False, the provisioner is allowed to deviate from the given number of nodes. For example, when downsizing a cluster, a provisioner might leave nodes running if they have active jobs running on them.

Returns

the number of worker nodes in the cluster after making the necessary adjustments. This value should be, but is not guaranteed to be, close to or equal to the numNodes argument. It represents the closest possible approximation of the actual cluster size at the time this method returns.

Return type

int
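For instance, a hypothetical call (the instance type name is illustrative and `scaler` is assumed to be a ClusterScaler): request five preemptible workers without forcing the provisioner to terminate busy nodes, and note the count actually reached.

# "r5.xlarge" is just an example type name, not a recommendation.
actual = scaler.setNodeCount("r5.xlarge", numNodes=5, preemptible=True, force=False)
print(f"cluster now has {actual} preemptible worker(s) of that type")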

filter_out_static_nodes(nodes, preemptible=False)[source]
Parameters
Return type

List[Tuple[toil.provisioners.node.Node, toil.batchSystems.abstractBatchSystem.NodeInfo]]

getNodes(preemptible=None)[source]

Returns a dictionary mapping node identifiers of preemptible or non-preemptible nodes to NodeInfo objects, one for each node.

This method is the definitive source of information on the nodes in the cluster and is responsible for consolidating cluster state between the provisioner and the batch system.

Parameters

preemptible (bool) – If True, only preemptible nodes are returned; if False, only non-preemptible nodes; if None, all nodes are returned.

Return type

Dict[toil.provisioners.node.Node, toil.batchSystems.abstractBatchSystem.NodeInfo]

shutDown()[source]
Return type

None

exception toil.provisioners.clusterScaler.JobTooBigError(job=None, shape=None, constraints=None)[source]

Bases: Exception

Raised in the scaler thread when a job cannot fit in any available node type and is likely to lock up the workflow.

Parameters
__str__()[source]

Stringify the exception, including the message.

Return type

str

class toil.provisioners.clusterScaler.ScalerThread(provisioner, leader, config, stop_on_exception=False)[source]

Bases: toil.lib.threading.ExceptionalThread

A thread that automatically scales the number of either preemptible or non-preemptible worker nodes according to the resource requirements of the queued jobs.

The scaling calculation is essentially as follows: start with 0 estimated worker nodes. For each queued job, check whether we expect it can be scheduled onto a worker node before a certain time (currently one hour); if not, attempt to add a single new node of the smallest type that can fit that job.

At each scaling decision point, a comparison is made between the current number of nodes, C, and the newly estimated number. If the absolute difference is less than beta * C, no change is made; otherwise the size of the cluster is adapted. The beta factor is an inertia parameter that prevents continual fluctuations in the number of nodes.
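A minimal sketch of that inertia rule (names are illustrative, not the thread's actual implementation):

def apply_inertia(current: int, estimated: int, beta: float) -> int:
    # Ignore changes smaller than beta * current to avoid continual fluctuation.
    if abs(estimated - current) < beta * current:
        return current
    # Otherwise adopt the new estimate.
    return estimated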

Parameters
check()[source]

Attempt to join any existing scaler threads that may have died or finished.

This ensures any exceptions raised in the threads are propagated in a timely fashion.

Return type

None

shutdown()[source]

Shut down the cluster.

Return type

None

addCompletedJob(job, wallTime)[source]
Parameters
Return type

None

tryRun()[source]
Return type

None

class toil.provisioners.clusterScaler.ClusterStats(path, batchSystem, clusterName)[source]
Parameters
shutDownStats()[source]
Return type

None

startStats(preemptible)[source]
Parameters

preemptible (bool) –

Return type

None

checkStats()[source]
Return type

None