toil.provisioners.clusterScaler¶
Attributes¶
Exceptions¶
Raised in the scaler thread when a job cannot fit in any available node
Classes¶
An abstract base class to represent the interface the batch system must provide to Toil.
A batch system that supports a variable number of worker nodes.
The coresUsed attribute is a floating point value between 0 (all cores idle) and 1 (all cores
Produced by the Toil-integrated autoscaler to describe the number of
Produced by the Toil-integrated autoscaler to describe the number of
Class to represent configuration operations for a toil workflow run.
Stores all the information that the Toil Leader ever needs to know about a Job.
A description of a job that hosts a service.
A thread whose join() method re-raises exceptions raised during run(). While join() is
A context manager for ensuring that the execution of its body takes at least a given amount
Interface for provisioning worker nodes to use in a Toil cluster.
Represents a job or a node's "shape", in terms of the dimensions of memory, cores, disk and
If jobShapes is a set of tasks with run requirements (mem/disk/cpu), and nodeShapes is a sorted
The amount of resources that we expect to be available on a given node at each point in time.
A thread that automatically scales the number of either preemptible or non-preemptible worker
Functions¶
Return a binary value as a human readable string with units.
Given a string representation of some memory (e.g. '1024 Mib'), return the
Deprecated.
Add a job to an ending reservation that ends at wallTime.
Partition a node allocation into two to fit the job.
Using the given node shape bins, pack the given job shapes into nodes to
Module Contents¶
- class toil.provisioners.clusterScaler.AbstractBatchSystem[source]¶
Bases: abc.ABC
An abstract base class to represent the interface the batch system must provide to Toil.
- classmethod supportsAutoDeployment()[source]¶
- Abstractmethod:
- Return type:
Whether this batch system supports auto-deployment of the user script itself.
If it does, the setUserScript() can be invoked to set the resource object representing the user script.
Note to implementors: If your implementation returns True here, it should also override
- classmethod supportsWorkerCleanup()[source]¶
- Abstractmethod:
- Return type:
Whether this batch system supports worker cleanup.
Indicates whether this batch system invokes BatchSystemSupport.workerCleanup() after the last job for a particular workflow invocation finishes. Note that the term worker refers to an entire node, not just a worker process. A worker process may run more than one job sequentially, and more than one concurrent worker process may exist on a worker node, for the same workflow. The batch system is said to shut down after the last worker process terminates.
- abstract setUserScript(userScript)[source]¶
Set the user script for this workflow.
This method must be called before the first job is issued to this batch system, and only if supportsAutoDeployment() returns True, otherwise it will raise an exception.
- Parameters:
userScript (toil.resource.Resource) – the resource object representing the user script or module and the modules it depends on.
- Return type:
None
- set_message_bus(message_bus)[source]¶
Give the batch system an opportunity to connect directly to the message bus, so that it can send informational messages about the jobs it is running to other Toil components.
- Parameters:
message_bus (toil.bus.MessageBus)
- Return type:
None
- abstract issueBatchJob(command, job_desc, job_environment=None)[source]¶
Issues a job with the specified command to the batch system and returns a unique job ID number.
- Parameters:
command (str) – the command to execute somewhere to run the Toil worker process
job_desc (toil.job.JobDescription) – the JobDescription for the job being run
job_environment (Optional[Dict[str, str]]) – a collection of job-specific environment variables to be set on the worker.
- Returns:
a unique job ID number that can be used to reference the newly issued job
- Return type:
- abstract killBatchJobs(jobIDs)[source]¶
Kills the given job IDs. After returning, the killed jobs will not appear in the results of getRunningBatchJobIDs. The killed job will not be returned from getUpdatedBatchJob.
- Parameters:
jobIDs (List[int]) – list of IDs of jobs to kill
- Return type:
None
- abstract getIssuedBatchJobIDs()[source]¶
Gets all currently issued jobs
- Returns:
A list of jobs (as job ID numbers) currently issued (may be running, or may be waiting to be run). Despite the result being a list, the ordering should not be depended upon.
- Return type:
List[int]
- abstract getRunningBatchJobIDs()[source]¶
Gets a map of jobs as job ID numbers that are currently running (not just waiting) and how long they have been running, in seconds.
- abstract getUpdatedBatchJob(maxWait)[source]¶
Returns information about a job that has updated its status (i.e. ceased running, either successfully or with an error). Each such job will be returned exactly once.
Does not return info for jobs killed by killBatchJobs, although they may cause None to be returned earlier than maxWait.
- Parameters:
maxWait (int) – the number of seconds to block, waiting for a result
- Returns:
If a result is available, returns UpdatedBatchJobInfo. Otherwise it returns None. wallTime is the number of seconds (a strictly positive float) in wall-clock time the job ran for, or None if this batch system does not support tracking wall time.
- Return type:
Optional[UpdatedBatchJobInfo]
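The issue-then-poll contract described by issueBatchJob, killBatchJobs and getUpdatedBatchJob can be sketched with a toy, in-memory stand-in. ToyBatchSystem, UpdatedJob and drain are hypothetical names used only for illustration; they are not Toil classes, and the real UpdatedBatchJobInfo carries more information than the two fields assumed here.

```python
from collections import deque
from typing import Deque, Dict, List, NamedTuple, Optional


class UpdatedJob(NamedTuple):
    """Hypothetical stand-in for UpdatedBatchJobInfo."""
    job_id: int
    exit_status: int


class ToyBatchSystem:
    """Toy stand-in that 'finishes' each job instantly; not a Toil class."""

    def __init__(self) -> None:
        self._next_id = 0
        self._issued: Dict[int, str] = {}
        self._finished: Deque[UpdatedJob] = deque()

    def issueBatchJob(self, command: str) -> int:
        job_id = self._next_id
        self._next_id += 1
        self._issued[job_id] = command
        # Pretend the job finished successfully right away.
        self._finished.append(UpdatedJob(job_id, 0))
        return job_id

    def killBatchJobs(self, jobIDs: List[int]) -> None:
        for job_id in jobIDs:
            self._issued.pop(job_id, None)
        # Killed jobs must never be reported as updated.
        self._finished = deque(u for u in self._finished if u.job_id not in jobIDs)

    def getUpdatedBatchJob(self, maxWait: int) -> Optional[UpdatedJob]:
        # A real implementation would block for up to maxWait seconds.
        if not self._finished:
            return None
        update = self._finished.popleft()
        self._issued.pop(update.job_id, None)
        return update


def drain(batch_system: ToyBatchSystem) -> List[int]:
    """Collect finished job IDs until no more updates are available."""
    done = []
    while (update := batch_system.getUpdatedBatchJob(maxWait=0)) is not None:
        done.append(update.job_id)
    return done


bs = ToyBatchSystem()
ids = [bs.issueBatchJob(f"echo {i}") for i in range(3)]
bs.killBatchJobs([ids[1]])
finished = drain(bs)  # each surviving job is reported exactly once
```

Because every update is handed out exactly once, a second call to drain(bs) finds nothing left to report.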
- getSchedulingStatusMessage()[source]¶
Get a log message fragment for the user about anything that might be going wrong in the batch system, if available.
If no useful message is available, return None.
This can be used to report what resource is the limiting factor when scheduling jobs, for example. If the leader thinks the workflow is stuck, the message can be displayed to the user to help them diagnose why it might be stuck.
- Returns:
User-directed message about scheduling state.
- Return type:
Optional[str]
- abstract shutdown()[source]¶
Called at the completion of a toil invocation. Should cleanly terminate all worker threads.
- Return type:
None
- abstract setEnv(name, value=None)[source]¶
Set an environment variable for the worker process before it is launched.
The worker process will typically inherit the environment of the machine it is running on but this method makes it possible to override specific variables in that inherited environment before the worker is launched. Note that this mechanism is different to the one used by the worker internally to set up the environment of a job. A call to this method affects all jobs issued after this method returns. Note to implementors: This means that you would typically need to copy the variables before enqueuing a job.
If no value is provided it will be looked up from the current environment.
- classmethod add_options(parser)[source]¶
If this batch system provides any command line options, add them to the given parser.
- Parameters:
parser (Union[argparse.ArgumentParser, argparse._ArgumentGroup])
- Return type:
None
- classmethod setOptions(setOption)[source]¶
Process command line or configuration options relevant to this batch system.
- Parameters:
setOption (toil.batchSystems.options.OptionSetter) – A function with signature setOption(option_name, parsing_function=None, check_function=None, default=None, env=None) returning nothing, used to update run configuration as a side effect.
- Return type:
None
- getWorkerContexts()[source]¶
Get a list of picklable context manager objects to wrap worker work in, in order.
Can be used to ask the Toil worker to do things in-process (such as configuring environment variables, hot-deploying user scripts, or cleaning up a node) that would otherwise require a wrapping “executor” process.
- Return type:
List[ContextManager[Any]]
- class toil.provisioners.clusterScaler.AbstractScalableBatchSystem[source]¶
Bases: AbstractBatchSystem
A batch system that supports a variable number of worker nodes.
Used by toil.provisioners.clusterScaler.ClusterScaler to scale the number of worker nodes in the cluster up or down depending on overall load.
- abstract getNodes(preemptible=None, timeout=600)[source]¶
Returns a dictionary mapping node identifiers of preemptible or non-preemptible nodes to NodeInfo objects, one for each node.
- abstract nodeInUse(nodeIP)[source]¶
Can be used to determine if a worker node is running any tasks. If the node doesn’t exist, this function should simply return False.
- abstract ignoreNode(nodeAddress)[source]¶
Stop sending jobs to this node. Used in autoscaling when the autoscaler is ready to terminate a node, but jobs are still running. This allows the node to be terminated after the current jobs have finished.
- Parameters:
nodeAddress (str) – IP address of node to ignore.
- Return type:
None
- class toil.provisioners.clusterScaler.NodeInfo(coresUsed, memoryUsed, coresTotal, memoryTotal, requestedCores, requestedMemory, workers)[source]¶
The coresUsed attribute is a floating point value between 0 (all cores idle) and 1 (all cores busy), reflecting the CPU load of the node.
The memoryUsed attribute is a floating point value between 0 (no memory used) and 1 (all memory used), reflecting the memory pressure on the node.
The coresTotal and memoryTotal attributes are the node’s total resources, not just the used resources.
The requestedCores and requestedMemory attributes are all the resources that Toil Jobs have reserved on the node, regardless of whether the resources are actually being used by the Jobs.
The workers attribute is an integer reflecting the number of currently active workers on the node.
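As a minimal sketch of how these fields might be read together, the following uses a local named tuple that mirrors the documented attribute names. NodeInfoSketch, is_idle and unreserved_cores are hypothetical helpers, and the units in the comments (bytes for memory, a plain count for cores) are assumptions rather than something stated above.

```python
from typing import NamedTuple


class NodeInfoSketch(NamedTuple):
    """Hypothetical mirror of the NodeInfo fields documented above."""
    coresUsed: float       # fraction of cores busy, 0..1
    memoryUsed: float      # fraction of memory in use, 0..1
    coresTotal: float      # total cores on the node (assumed to be a count)
    memoryTotal: int       # total memory on the node (assumed to be bytes)
    requestedCores: float  # cores reserved by Toil jobs
    requestedMemory: int   # memory reserved by Toil jobs
    workers: int           # currently active workers


def is_idle(node: NodeInfoSketch) -> bool:
    """Treat a node as idle when it has no workers and no reservations."""
    return node.workers == 0 and node.requestedCores == 0 and node.requestedMemory == 0


def unreserved_cores(node: NodeInfoSketch) -> float:
    """Cores not yet reserved by any job (never negative)."""
    return max(0.0, node.coresTotal - node.requestedCores)


busy = NodeInfoSketch(0.5, 0.25, 8, 32 * 2**30, 6, 16 * 2**30, 3)
idle = NodeInfoSketch(0.0, 0.1, 8, 32 * 2**30, 0, 0, 0)
```

Note that the idle node still reports some memoryUsed: load and reservations are separate measurements, which is why the check looks at reservations and workers rather than at the usage fractions.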
- class toil.provisioners.clusterScaler.ClusterDesiredSizeMessage[source]¶
Bases: NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type that it thinks will be needed.
- class toil.provisioners.clusterScaler.ClusterSizeMessage[source]¶
Bases: NamedTuple
Produced by the Toil-integrated autoscaler to describe the number of instances of a certain type in a cluster.
- class toil.provisioners.clusterScaler.Config[source]¶
Class to represent configuration operations for a toil workflow run.
- batch_logs_dir: str | None¶
The backing scheduler will be instructed, if possible, to save logs to this directory, where the leader can read them.
- workflowID: str | None¶
This attribute uniquely identifies the job store and therefore the workflow. It is necessary in order to distinguish between two consecutive workflows for which self.jobStore is the same, e.g. when a job store name is reused after a previous run has finished successfully and its job store has been cleaned up.
- defaultAccelerators: List[toil.job.AcceleratorRequirement]¶
- prepare_start()[source]¶
After options are set, prepare for initial start of workflow.
- Return type:
None
- prepare_restart()[source]¶
Before restart options are set, prepare for a restart of a workflow. Set up any execution-specific parameters and clear out any stale ones.
- Return type:
None
- setOptions(options)[source]¶
Creates a config object from the options object.
- Parameters:
options (argparse.Namespace)
- Return type:
None
- toil.provisioners.clusterScaler.defaultTargetTime = 1800¶
- class toil.provisioners.clusterScaler.JobDescription(requirements, jobName, unitName='', displayName='', local=None)[source]¶
Bases: Requirer
Stores all the information that the Toil Leader ever needs to know about a Job.
- This includes:
Resource requirements.
Which jobs are children or follow-ons or predecessors of this job.
A reference to the Job object in the job store.
Can be obtained from an actual (i.e. executable) Job object, and can be used to obtain the Job object from the JobStore.
Never contains other Jobs or JobDescriptions: all reference is by ID.
Subclassed into variants for checkpoint jobs and service jobs that have their specific parameters.
- Parameters:
- get_chain()[source]¶
Get all the jobs that executed in this job’s chain, in order.
For each job, produces a named tuple with its various names and its original job store ID. The jobs in the chain are in execution order.
If the job hasn’t run yet or it didn’t chain, produces a one-item list.
- Return type:
List[toil.bus.Names]
- serviceHostIDsInBatches()[source]¶
Find all batches of service host job IDs that can be started at the same time.
(in the order they need to start in)
- Return type:
Iterator[List[str]]
- successorsAndServiceHosts()[source]¶
Get an iterator over all child, follow-on, and service job IDs.
- Return type:
Iterator[str]
- allSuccessors()[source]¶
Get an iterator over all child, follow-on, and chained, inherited successor job IDs.
Follow-ons will come before children.
- Return type:
Iterator[str]
- successors_by_phase()[source]¶
Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase number on the stack.
Phases execute from higher numbers to lower numbers.
- property services¶
- Get a collection of the IDs of service host jobs for this job, in arbitrary order.
Will be empty if the job has no unfinished services.
- has_body()[source]¶
Returns True if we have a job body associated, and False otherwise.
- Return type:
- attach_body(file_store_id, user_script)[source]¶
Attach a job body to this JobDescription.
Takes the file store ID that the body is stored at, and the required user script module.
The file store ID can also be “firstJob” for the root job, stored as a shared file instead.
- Parameters:
file_store_id (str)
user_script (toil.resource.ModuleDescriptor)
- Return type:
None
- get_body()[source]¶
Get the information needed to load the job body.
- Returns:
a file store ID (or magic shared file name “firstJob”) and a user script module.
- Return type:
Tuple[str, toil.resource.ModuleDescriptor]
Fails if no body is attached; check has_body() first.
- nextSuccessors()[source]¶
Return the collection of job IDs for the successors of this job that are ready to run.
If those jobs have multiple predecessor relationships, they may still be blocked on other jobs.
Returns None when at the final phase (all successors done), and an empty collection if there are more phases but they can’t be entered yet (e.g. because we are waiting for the job itself to run).
- Return type:
Optional[Set[str]]
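The phase ordering described for allSuccessors(), successors_by_phase() and nextSuccessors() can be pictured with a toy phase stack. This is a sketch under assumptions, not the JobDescription implementation: the layout (follow-ons at index 0, children at index 1) is chosen only to match the statements that follow-ons come first in iteration and that higher-numbered phases execute first, and the case where a phase cannot be entered yet is left out.

```python
from typing import Iterator, List, Optional, Set, Tuple

# Toy phase stack: index 0 holds follow-ons, index 1 holds children.
# This layout is an assumption made for illustration.
phases: List[Set[str]] = [{"followOn-1"}, {"child-1", "child-2"}]


def successors_by_phase(stack: List[Set[str]]) -> Iterator[Tuple[int, str]]:
    """Yield (phase number, job ID) pairs for every successor."""
    for number, phase in enumerate(stack):
        for job_id in sorted(phase):
            yield number, job_id


def next_successors(stack: List[Set[str]]) -> Optional[Set[str]]:
    """Return the highest-numbered phase, or None when no phases remain."""
    if not stack:
        return None
    return stack[-1]
```

With this stack, next_successors(phases) returns the children; once that phase is popped, the follow-on phase becomes the next one, and an empty stack yields None.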
- filterSuccessors(predicate)[source]¶
Keep only successor jobs for which the given predicate function approves.
The predicate function is called with the job’s ID.
Treats all other successors as complete and forgets them.
- filterServiceHosts(predicate)[source]¶
Keep only services for which the given predicate approves.
The predicate function is called with the service host job’s ID.
Treats all other services as complete and forgets them.
- clear_nonexistent_dependents(job_store)[source]¶
Remove all references to child, follow-on, and associated service jobs that do not exist.
That is to say, all those that have been completed and removed.
- Parameters:
job_store (toil.jobStores.abstractJobStore.AbstractJobStore)
- Return type:
None
- is_subtree_done()[source]¶
Check if the subtree is done.
- Returns:
True if the job appears to be done, and all related child, follow-on, and service jobs appear to be finished and removed.
- Return type:
- replace(other)[source]¶
Take on the ID of another JobDescription, retaining our own state and type.
When updated in the JobStore, we will save over the other JobDescription.
Useful for chaining jobs: the chained-to job can replace the parent job.
Merges cleanup state and successors other than this job from the job being replaced into this one.
- Parameters:
other (JobDescription) – Job description to replace.
- Return type:
None
- assert_is_not_newer_than(other)[source]¶
Make sure this JobDescription is not newer than a prospective new version of the JobDescription.
- Parameters:
other (JobDescription)
- Return type:
None
- is_updated_by(other)[source]¶
Return True if the passed JobDescription is a distinct, newer version of this one.
- Parameters:
other (JobDescription)
- Return type:
- addChild(childID)[source]¶
Make the job with the given ID a child of the described job.
- Parameters:
childID (str)
- Return type:
None
- addFollowOn(followOnID)[source]¶
Make the job with the given ID a follow-on of the described job.
- Parameters:
followOnID (str)
- Return type:
None
- addServiceHostJob(serviceID, parentServiceID=None)[source]¶
Make the ServiceHostJob with the given ID a service of the described job.
If a parent ServiceHostJob ID is given, that parent service will be started first, and must have already been added.
- hasChild(childID)[source]¶
Return True if the job with the given ID is a child of the described job.
- hasFollowOn(followOnID)[source]¶
Test if the job with the given ID is a follow-on of the described job.
- hasServiceHostJob(serviceID)[source]¶
Test if the ServiceHostJob is a service of the described job.
- Return type:
- renameReferences(renames)[source]¶
Apply the given dict of ID renames to all references to jobs.
Does not modify our own ID or those of finished predecessors. IDs not present in the renames dict are left as-is.
- Parameters:
renames (Dict[TemporaryID, str]) – Rename operations to apply.
- Return type:
None
- addPredecessor()[source]¶
Notify the JobDescription that a predecessor has been added to its Job.
- Return type:
None
- onRegistration(jobStore)[source]¶
Perform setup work that requires the JobStore.
Called by the Job saving logic when this JobDescription meets the JobStore and has its ID assigned.
Overridden to perform setup work (like hooking up flag files for service jobs) that requires the JobStore.
- Parameters:
jobStore (toil.jobStores.abstractJobStore.AbstractJobStore) – The job store we are being placed into
- Return type:
None
- setupJobAfterFailure(exit_status=None, exit_reason=None)[source]¶
Configure job after a failure.
Reduce the remainingTryCount if greater than zero and set the memory to be at least as big as the default memory (in case of exhaustion of memory, which is common).
Requires a configuration to have been assigned (see toil.job.Requirer.assignConfig()).
- Parameters:
exit_status (Optional[int]) – The exit code from the job.
exit_reason (Optional[toil.batchSystems.abstractBatchSystem.BatchJobExitReason]) – The reason the job stopped, if available from the batch system.
- Return type:
None
- getLogFileHandle(jobStore)[source]¶
Create a context manager that yields a file handle to the log file.
Assumes logJobStoreFileID is set.
- property remainingTryCount¶
- Get the number of tries remaining.
The try count set on the JobDescription, or the default based on the retry count from the config if none is set.
- clearRemainingTryCount()[source]¶
Clear remainingTryCount and set it back to its default value.
- Returns:
True if a modification to the JobDescription was made, and False otherwise.
- Return type:
- class toil.provisioners.clusterScaler.ServiceJobDescription(*args, **kwargs)[source]¶
Bases: JobDescription
A description of a job that hosts a service.
- toil.provisioners.clusterScaler.bytes2human(n)[source]¶
Return a binary value as a human readable string with units.
- Parameters:
n (SupportsInt)
- Return type:
- toil.provisioners.clusterScaler.human2bytes(string)[source]¶
Given a string representation of some memory (e.g. ‘1024 Mib’), return the integer number of bytes.
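The round trip between a byte count and a human-readable string can be sketched as follows. to_human and to_bytes are hypothetical helpers written for this example; they handle binary (power-of-1024) units only, and their output format is an assumption that may differ from what bytes2human and human2bytes actually produce or accept.

```python
import re

# Binary (power-of-1024) units only; an assumption made to keep the sketch short.
UNITS = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]


def to_human(n: int) -> str:
    """Format a byte count using the largest binary unit that keeps the value >= 1."""
    value = float(int(n))
    for unit in UNITS:
        if value < 1024 or unit == UNITS[-1]:
            return f"{value:.1f} {unit}"
        value /= 1024


def to_bytes(text: str) -> int:
    """Parse strings such as '1024 MiB' (case-insensitive) into a byte count."""
    match = re.fullmatch(r"\s*([0-9]*\.?[0-9]+)\s*([A-Za-z]+)\s*", text)
    if match is None:
        raise ValueError(f"cannot parse {text!r}")
    number, unit = match.groups()
    lookup = {u.lower(): 1024 ** i for i, u in enumerate(UNITS)}
    if unit.lower() not in lookup:
        raise ValueError(f"unknown unit {unit!r}")
    return int(float(number) * lookup[unit.lower()])
```

Parsing is case-insensitive here so that the spelling used in the docstring example, '1024 Mib', is accepted.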
- toil.provisioners.clusterScaler.old_retry(delays=DEFAULT_DELAYS, timeout=DEFAULT_TIMEOUT, predicate=lambda e: ...)[source]¶
Deprecated.
Retry an operation while the failure matches a given predicate and until a given timeout expires, waiting a given amount of time in between attempts. This function is a generator that yields context managers. See doctests below for example usage.
- Parameters:
delays (Iterable[float]) – an iterable yielding the time in seconds to wait before each retried attempt, the last element of the iterable will be repeated.
timeout (float) – an overall timeout that should not be exceeded for all attempts together. This is a best-effort mechanism only and it won’t abort an ongoing attempt, even if the timeout expires during that attempt.
predicate (Callable[[Exception],bool]) – a unary callable returning True if another attempt should be made to recover from the given exception. The default value for this parameter will prevent any retries!
- Returns:
a generator yielding context managers, one per attempt
- Return type:
Iterator
Retry for a limited amount of time:
>>> true = lambda _:True
>>> false = lambda _:False
>>> i = 0
>>> for attempt in old_retry( delays=[0], timeout=.1, predicate=true ):
...     with attempt:
...         i += 1
...         raise RuntimeError('foo')
Traceback (most recent call last):
...
RuntimeError: foo
>>> i > 1
True
If timeout is 0, do exactly one attempt:
>>> i = 0
>>> for attempt in old_retry( timeout=0 ):
...     with attempt:
...         i += 1
...         raise RuntimeError( 'foo' )
Traceback (most recent call last):
...
RuntimeError: foo
>>> i
1
Don’t retry on success:
>>> i = 0
>>> for attempt in old_retry( delays=[0], timeout=.1, predicate=true ):
...     with attempt:
...         i += 1
>>> i
1
Don’t retry unless predicate returns True:
>>> i = 0
>>> for attempt in old_retry( delays=[0], timeout=.1, predicate=false):
...     with attempt:
...         i += 1
...         raise RuntimeError( 'foo' )
Traceback (most recent call last):
...
RuntimeError: foo
>>> i
1
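The "generator that yields context managers" pattern used above can be sketched in a few lines. retry_sketch is a hypothetical, simplified stand-in and not the old_retry implementation: it omits logging and uses its own default values, but it follows the documented behaviour (retry only while the predicate approves and the timeout has not expired, repeat the last delay, stop on success).

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterable, Iterator


def retry_sketch(
    delays: Iterable[float] = (0.0,),
    timeout: float = 1.0,
    predicate: Callable[[Exception], bool] = lambda e: False,
) -> Iterator:
    """Yield one context manager per attempt until success, timeout, or a rejected error."""
    expiration = time.time() + timeout
    state = {"done": False}

    @contextmanager
    def attempt(delay: float):
        try:
            yield
        except Exception as e:
            # Swallow the error and retry only if the predicate approves
            # and there is still time left; otherwise let it propagate.
            if predicate(e) and time.time() + delay < expiration:
                time.sleep(delay)
            else:
                raise
        else:
            state["done"] = True

    delay_iter = iter(delays)
    delay = next(delay_iter)
    while not state["done"]:
        yield attempt(delay)
        # Repeat the last delay once the iterable is exhausted.
        delay = next(delay_iter, delay)


calls = 0
for attempt in retry_sketch(
    delays=[0.0], timeout=5.0, predicate=lambda e: isinstance(e, ConnectionError)
):
    with attempt:
        calls += 1
        if calls < 3:
            raise ConnectionError("transient failure")
```

The loop body fails twice and succeeds on the third attempt, after which the generator stops producing attempts. Yielding a context manager per attempt keeps the retried code inline in the caller's for loop instead of forcing it into a callback.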
- class toil.provisioners.clusterScaler.ExceptionalThread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)[source]¶
Bases: threading.Thread
A thread whose join() method re-raises exceptions raised during run(). While join() is idempotent, the exception is only re-raised during the first invocation of join() that successfully joined the thread. If join() times out, no exception will be re-raised even though an exception might already have occurred in run().
When subclassing this thread, override tryRun() instead of run().
>>> def f():
...     assert 0
>>> t = ExceptionalThread(target=f)
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
>>> class MyThread(ExceptionalThread):
...     def tryRun( self ):
...         assert 0
>>> t = MyThread()
>>> t.start()
>>> t.join()
Traceback (most recent call last):
...
AssertionError
- exc_info = None¶
- run()[source]¶
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- Return type:
None
- join(*args, **kwargs)[source]¶
Wait until the thread terminates.
This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be join()ed many times.
join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
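The re-raise-on-join behaviour can be approximated with a small threading.Thread subclass. ReraisingThread is a hypothetical stand-in written for this example; it does not reproduce the tryRun() hook or the exc_info attribute of ExceptionalThread, only the idea of capturing the failure in run() and surfacing it from the first successful join().

```python
import threading
from typing import Optional


class ReraisingThread(threading.Thread):
    """Hypothetical stand-in: join() re-raises an exception captured in run()."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._error: Optional[BaseException] = None

    def run(self) -> None:
        try:
            super().run()
        except BaseException as e:
            # Remember the failure instead of letting the thread report it.
            self._error = e

    def join(self, *args, **kwargs) -> None:
        super().join(*args, **kwargs)
        if not self.is_alive() and self._error is not None:
            # Hand the error over once, then forget it.
            error, self._error = self._error, None
            raise error


def fail() -> None:
    raise ValueError("boom")


t = ReraisingThread(target=fail)
t.start()
try:
    t.join()
    outcome = "no error"
except ValueError as e:
    outcome = f"re-raised: {e}"
t.join()  # A second join() returns quietly.
```

Checking is_alive() before raising mirrors the rule that a join() which timed out must not re-raise, since the thread has not actually been joined yet.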
- class toil.provisioners.clusterScaler.throttle(min_interval)[source]¶
A context manager for ensuring that the execution of its body takes at least a given amount of time, sleeping if necessary. It is a simpler version of LocalThrottle if used as a decorator.
Ensures that body takes at least the given amount of time.
>>> start = time.time()
>>> with throttle(1):
...     pass
>>> 1 <= time.time() - start <= 1.1
True
Ditto when used as a decorator.
>>> @throttle(1)
... def f():
...     pass
>>> start = time.time()
>>> f()
>>> 1 <= time.time() - start <= 1.1
True
If the body takes longer by itself, don’t throttle.
>>> start = time.time()
>>> with throttle(1):
...     time.sleep(2)
>>> 2 <= time.time() - start <= 2.1
True
Ditto when used as a decorator.
>>> @throttle(1)
... def f():
...     time.sleep(2)
>>> start = time.time()
>>> f()
>>> 2 <= time.time() - start <= 2.1
True
If an exception occurs, don’t throttle.
>>> start = time.time()
>>> try:
...     with throttle(1):
...         raise ValueError('foo')
... except ValueError:
...     end = time.time()
...     raise
Traceback (most recent call last):
...
ValueError: foo
>>> 0 <= end - start <= 0.1
True
Ditto when used as a decorator.
>>> @throttle(1)
... def f():
...     raise ValueError('foo')
>>> start = time.time()
>>> try:
...     f()
... except ValueError:
...     end = time.time()
...     raise
Traceback (most recent call last):
...
ValueError: foo
>>> 0 <= end - start <= 0.1
True
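A minimal sketch of the same idea, built on contextlib.ContextDecorator so that one class works both in a with statement and as a decorator. min_duration is a hypothetical name and this is not the throttle implementation; it only reproduces the three documented behaviours: pad short bodies, leave long bodies alone, and do not delay exceptions.

```python
import time
from contextlib import ContextDecorator


class min_duration(ContextDecorator):
    """Hypothetical stand-in: make the body take at least min_interval seconds."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval

    def __enter__(self) -> "min_duration":
        self.start = time.monotonic()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        if exc_type is None:
            # Sleep only for the part of the interval the body did not use.
            remaining = self.min_interval - (time.monotonic() - self.start)
            if remaining > 0:
                time.sleep(remaining)
        # Returning False lets any exception propagate without delay.
        return False


start = time.monotonic()
with min_duration(0.05):
    pass
elapsed = time.monotonic() - start  # roughly 0.05 seconds or slightly more


@min_duration(0.05)
def quick() -> None:
    pass
```

The sketch uses time.monotonic() rather than time.time() so that wall-clock adjustments cannot shorten or lengthen the enforced interval. Because the decorator reuses a single instance, this version is not safe for concurrent or re-entrant calls.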
- class toil.provisioners.clusterScaler.AbstractProvisioner(clusterName=None, clusterType='mesos', zone=None, nodeStorage=50, nodeStorageOverrides=None, enable_fuse=False)[source]¶
Bases: abc.ABC
Interface for provisioning worker nodes to use in a Toil cluster.
- Parameters:
- LEADER_HOME_DIR = '/root/'¶
- abstract supportedClusterTypes()[source]¶
Get all the cluster types that this provisioner implementation supports.
- Return type:
Set[str]
- abstract createClusterSettings()[source]¶
Initialize class for a new cluster, to be deployed, when running outside the cloud.
- abstract readClusterSettings()[source]¶
Initialize class from an existing cluster. This method assumes that the instance we are running on is the leader.
Implementations must call _setLeaderWorkerAuthentication().
- setAutoscaledNodeTypes(nodeTypes)[source]¶
Set node types, shapes and spot bids for Toil-managed autoscaling.
- Parameters:
nodeTypes – A list of node types, as parsed with parse_node_types.
- hasAutoscaledNodeTypes()[source]¶
Check if node types have been configured on the provisioner (via setAutoscaledNodeTypes).
- Returns:
True if node types are configured for autoscaling, and false otherwise.
- Return type:
- getAutoscaledInstanceShapes()[source]¶
Get all the node shapes and their named instance types that the Toil autoscaler should manage.
- static retryPredicate(e)[source]¶
Return true if the exception e should be retried by the cluster scaler. For example, should return true if the exception was due to exceeding an API rate limit. The error will be retried with exponential backoff.
- Parameters:
e – exception raised during execution of setNodeCount
- Returns:
boolean indicating whether the exception e should be retried
- abstract launchCluster(*args, **kwargs)[source]¶
Initialize a cluster and create a leader node.
Implementations must call _setLeaderWorkerAuthentication() with the leader so that workers can be launched.
- Parameters:
leaderNodeType – The leader instance.
leaderStorage – The amount of disk to allocate to the leader in gigabytes.
owner – Tag identifying the owner of the instances.
- abstract addNodes(nodeTypes, numNodes, preemptible, spotBid=None)[source]¶
Used to add worker nodes to the cluster
- Parameters:
- Returns:
number of nodes successfully added
- Return type:
- addManagedNodes(nodeTypes, minNodes, maxNodes, preemptible, spotBid=None)[source]¶
Add a group of managed nodes of the given type, up to the given maximum. The nodes will automatically be launched and terminated depending on cluster load.
Raises ManagedNodesNotSupportedException if the provisioner implementation or cluster configuration can’t have managed nodes.
- Parameters:
minNodes – The minimum number of nodes to scale to
maxNodes – The maximum number of nodes to scale to
preemptible – whether or not the nodes will be preemptible
spotBid – The bid for preemptible nodes if applicable (this can be set in config, also).
nodeTypes (Set[str])
- Return type:
None
- abstract terminateNodes(nodes)[source]¶
Terminate the nodes represented by given Node objects
- Parameters:
nodes (List[toil.provisioners.node.Node]) – list of Node objects
- Return type:
None
- abstract getProvisionedWorkers(instance_type=None, preemptible=None)[source]¶
Gets all nodes, optionally of the given instance type or preemptability, from the provisioner. Includes both static and autoscaled nodes.
- Parameters:
- Returns:
list of Node objects
- Return type:
- abstract getNodeShape(instance_type, preemptible=False)[source]¶
The shape of a preemptible or non-preemptible node managed by this provisioner. The node shape defines key properties of a machine, such as its number of cores or the time between billing intervals.
- abstract destroyCluster()[source]¶
Terminates all nodes in the specified cluster and cleans up all resources associated with the cluster.
- Parameters:
clusterName – identifier of the cluster to terminate.
- Return type:
None
- class InstanceConfiguration[source]¶
Allows defining the initial setup for an instance and then turning it into an Ignition configuration for instance user data.
- addFile(path, filesystem='root', mode='0755', contents='', append=False)[source]¶
Make a file on the instance with the given filesystem, mode, and contents.
See the storage.files section: https://github.com/kinvolk/ignition/blob/flatcar-master/doc/configuration-v2_2.md
- addUnit(name, enabled=True, contents='')[source]¶
Make a systemd unit on the instance with the given name (including .service), and content. Units will be enabled by default.
- Unit logs can be investigated with:
systemctl status whatever.service
- or:
journalctl -xe
- getBaseInstanceConfiguration()[source]¶
Get the base configuration for both leader and worker instances for all cluster types.
- Return type:
- addVolumesService(config)[source]¶
Add a service to prepare and mount local scratch volumes.
- Parameters:
config (InstanceConfiguration)
- addNodeExporterService(config)[source]¶
Add the node exporter service for Prometheus to an instance configuration.
- Parameters:
config (InstanceConfiguration)
- add_toil_service(config, role, keyPath=None, preemptible=False)[source]¶
Add the Toil leader or worker service to an instance configuration.
Will run Mesos master or agent as appropriate in Mesos clusters. For Kubernetes clusters, will just sleep to provide a place to shell into on the leader, and shouldn’t run on the worker.
- Parameters:
role (str) – Should be ‘leader’ or ‘worker’. Will not work for ‘worker’ until leader credentials have been collected.
keyPath (str) – path on the node to a server-side encryption key that will be added to the node after it starts. The service will wait until the key is present before starting.
preemptible (bool) – Whether a worker should identify itself as preemptible or not to the scheduler.
config (InstanceConfiguration)
- getKubernetesValues(architecture='amd64')[source]¶
Returns a dict of Kubernetes component versions and paths for formatting into Kubernetes-related templates.
- Parameters:
architecture (str)
- addKubernetesServices(config, architecture='amd64')[source]¶
Add installing Kubernetes and Kubeadm and setting up the Kubelet to run when configured to an instance configuration. The same process applies to leaders and workers.
- Parameters:
config (InstanceConfiguration)
architecture (str)
- abstract getKubernetesAutoscalerSetupCommands(values)[source]¶
Return Bash commands that set up the Kubernetes cluster autoscaler for provisioning from the environment supported by this provisioner.
Should only be implemented if Kubernetes clusters are supported.
- getKubernetesCloudProvider()[source]¶
Return the Kubernetes cloud provider (for example, ‘aws’), to pass to the kubelets in a Kubernetes cluster provisioned using this provisioner.
Defaults to None if not overridden, in which case no cloud provider integration will be used.
- Returns:
Cloud provider name, or None
- Return type:
Optional[str]
- addKubernetesLeader(config)[source]¶
Add services to configure as a Kubernetes leader, if Kubernetes is already set to be installed.
- Parameters:
config (InstanceConfiguration)
- addKubernetesWorker(config, authVars, preemptible=False)[source]¶
Add services to configure as a Kubernetes worker, if Kubernetes is already set to be installed.
Authenticate back to the leader using the JOIN_TOKEN, JOIN_CERT_HASH, and JOIN_ENDPOINT set in the given authentication data dict.
- Parameters:
config (InstanceConfiguration) – The configuration to add services to
preemptible (bool) – Whether the worker should be labeled as preemptible or not
- class toil.provisioners.clusterScaler.Shape(wallTime, memory, cores, disk, preemptible)[source]¶
Represents a job or a node’s “shape”, in terms of the dimensions of memory, cores, disk and wall-time allocation.
The wallTime attribute stores the number of seconds of a node allocation, e.g. 3600 for AWS. FIXME: and for jobs?
The memory and disk attributes store the number of bytes required by a job (or provided by a node) in RAM or on disk (SSD or HDD), respectively.
- Parameters:
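As a rough illustration of the dimensions described above, the following sketch uses a hypothetical stand-in named `ShapeSketch`. It is not Toil's actual `Shape` class, and the "fits" rule shown is an assumption made for the example only.

```python
from typing import NamedTuple


class ShapeSketch(NamedTuple):
    """Hypothetical stand-in for Shape; not Toil's actual class."""
    wallTime: float    # seconds of node allocation (or expected job run time)
    memory: int        # bytes of RAM
    cores: float       # number of cores
    disk: int          # bytes of disk (SSD or HDD)
    preemptible: bool  # whether the node or job is preemptible


GIB = 2 ** 30

# A node allocated for one hour with 16 GiB RAM, 4 cores and 100 GiB disk.
node = ShapeSketch(wallTime=3600, memory=16 * GIB, cores=4,
                   disk=100 * GIB, preemptible=False)
# A job that needs ten minutes, 2 GiB RAM, 1 core and 10 GiB disk.
job = ShapeSketch(wallTime=600, memory=2 * GIB, cores=1,
                  disk=10 * GIB, preemptible=False)

# Assumed rule for this sketch: a job fits when no dimension exceeds the node's.
job_fits = (job.memory <= node.memory and job.cores <= node.cores
            and job.disk <= node.disk and job.wallTime <= node.wallTime)
```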
- toil.provisioners.clusterScaler.logger¶
- toil.provisioners.clusterScaler.EVICTION_THRESHOLD¶
- toil.provisioners.clusterScaler.RESERVE_SMALL_LIMIT¶
- toil.provisioners.clusterScaler.RESERVE_SMALL_AMOUNT¶
- toil.provisioners.clusterScaler.RESERVE_FRACTIONS = [0.25, 0.2, 0.1, 0.06, 0.02]¶
- toil.provisioners.clusterScaler.OS_SIZE¶
- toil.provisioners.clusterScaler.FailedConstraint¶
- class toil.provisioners.clusterScaler.BinPackedFit(nodeShapes, targetTime=defaultTargetTime)[source]¶
If jobShapes is a set of tasks with run requirements (mem/disk/cpu), and nodeShapes is a sorted list of available computers to run these jobs on, this class attempts to produce a dictionary representing the minimum set of nodes needed to run the tasks in jobShapes.
Uses a first-fit-decreasing (FFD) bin-packing-like algorithm to calculate an approximate minimum number of nodes that will fit the given list of jobs. BinPackedFit assumes that the list nodeShapes has already been ordered by “node preference” before it is passed in. So when virtually “creating” nodes, the first node within nodeShapes that fits the job is the one that’s added.
- Parameters:
- Returns:
The minimum number of minimal node allocations estimated to be required to run all the jobs in jobShapes.
- nodeReservations: Dict[toil.provisioners.abstractProvisioner.Shape, List[NodeReservation]]¶
- binPack(jobShapes)[source]¶
Pack a list of jobShapes into the fewest nodes reasonable.
Can be run multiple times.
Returns any distinct Shapes that did not fit, mapping to reasons they did not fit.
- Parameters:
jobShapes (List[toil.provisioners.abstractProvisioner.Shape])
- Return type:
Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]
- addJobShape(jobShape)[source]¶
Add the job to the first node reservation in which it will fit. (This is the bin-packing aspect).
Returns the job shape again, and a list of failed constraints, if it did not fit.
- Parameters:
jobShape (toil.provisioners.abstractProvisioner.Shape)
- Return type:
Optional[Tuple[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]
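The first-fit-decreasing idea can be sketched in one dimension. The code below is a simplified illustration, not Toil's implementation: it packs plain integer sizes rather than Shape objects, ignores wall time, and the function name `pack_first_fit_decreasing` is hypothetical.

```python
from typing import Dict, List, Tuple


def pack_first_fit_decreasing(
    job_sizes: List[int], node_capacities: List[int]
) -> Tuple[Dict[int, int], List[int]]:
    """Pack jobs into virtual nodes; node_capacities is in preference order."""
    open_nodes: List[List[int]] = []  # each entry is [capacity, free space]
    unfit: List[int] = []             # jobs too big for every node type
    # "Decreasing": consider the largest jobs first.
    for size in sorted(job_sizes, reverse=True):
        # "First fit": use the first already-open node with enough room.
        for node in open_nodes:
            if node[1] >= size:
                node[1] -= size
                break
        else:
            # No open node fits, so virtually create the first preferred
            # node type that is large enough.
            for capacity in node_capacities:
                if capacity >= size:
                    open_nodes.append([capacity, capacity - size])
                    break
            else:
                unfit.append(size)
    counts: Dict[int, int] = {}
    for capacity, _free in open_nodes:
        counts[capacity] = counts.get(capacity, 0) + 1
    return counts, unfit


counts, unfit = pack_first_fit_decreasing([4, 8, 1, 4, 2, 1, 20], [10, 16])
```

In this example the job of size 20 fits no node type and is reported separately, which loosely mirrors how binPack() returns the shapes that did not fit.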
- class toil.provisioners.clusterScaler.NodeReservation(shape)[source]¶
The amount of resources that we expect to be available on a given node at each point in time.
To represent the resources available in a reservation, we represent a reservation as a linked list of NodeReservations, each giving the resources free within a single timeslice.
- Parameters:
- get_failed_constraints(job_shape)[source]¶
Check if a job shape’s resource requirements will fit within this allocation.
If the job does not fit, returns the failing constraints: the resources that can’t be accommodated, and the limits that were hit.
If the job does fit, returns an empty list.
Must always agree with fits()! This codepath is slower and used for diagnosis.
- Parameters:
job_shape (toil.provisioners.abstractProvisioner.Shape)
- Return type:
List[FailedConstraint]
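The relationship between a fast boolean check and a slower diagnostic check can be sketched as follows. The tuple layout used for a failed constraint here (resource name, amount requested, amount available) is an assumption for illustration and may not match Toil's `FailedConstraint` type; both function names are hypothetical.

```python
from typing import Dict, List, Tuple

# Hypothetical layout: (resource name, amount requested, amount available).
FailedConstraintSketch = Tuple[str, float, float]


def fits_sketch(job: Dict[str, float], free: Dict[str, float]) -> bool:
    """Fast path: True when every requested amount is available."""
    return all(needed <= free[name] for name, needed in job.items())


def failed_constraints_sketch(
    job: Dict[str, float], free: Dict[str, float]
) -> List[FailedConstraintSketch]:
    """Diagnostic path: list each resource that cannot be accommodated."""
    return [(name, needed, free[name])
            for name, needed in job.items() if needed > free[name]]


free = {"memory": 8, "cores": 2, "disk": 50}
small_job = {"memory": 4, "cores": 1, "disk": 10}
big_job = {"memory": 16, "cores": 4, "disk": 10}

small_failures = failed_constraints_sketch(small_job, free)
big_failures = failed_constraints_sketch(big_job, free)
```

The two functions agree by construction: the list is empty exactly when the boolean check passes.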
- fits(jobShape)[source]¶
Check if a job shape’s resource requirements will fit within this allocation.
- Parameters:
jobShape (toil.provisioners.abstractProvisioner.Shape)
- Return type:
- subtract(jobShape)[source]¶
Subtract the resources necessary to run a jobShape from the reservation.
- Parameters:
jobShape (toil.provisioners.abstractProvisioner.Shape)
- Return type:
None
- attemptToAddJob(jobShape, nodeShape, targetTime)[source]¶
Attempt to pack a job into this reservation timeslice and/or the reservations after it.
jobShape is the Shape of the job requirements, nodeShape is the Shape of the node this is a reservation for, and targetTime is the maximum time to wait before starting this job.
- Parameters:
jobShape (toil.provisioners.abstractProvisioner.Shape)
nodeShape (toil.provisioners.abstractProvisioner.Shape)
targetTime (float)
- Return type:
- toil.provisioners.clusterScaler.adjustEndingReservationForJob(reservation, jobShape, wallTime)[source]¶
Add a job to an ending reservation that ends at wallTime.
(splitting the reservation if the job doesn’t fill the entire timeslice)
- Parameters:
reservation (NodeReservation)
jobShape (toil.provisioners.abstractProvisioner.Shape)
wallTime (float)
- Return type:
None
- toil.provisioners.clusterScaler.split(nodeShape, jobShape, wallTime)[source]¶
Partition a node allocation into two to fit the job.
Returning the modified shape of the node and a new node reservation for the extra time that the job didn’t fill.
- Parameters:
nodeShape (toil.provisioners.abstractProvisioner.Shape)
jobShape (toil.provisioners.abstractProvisioner.Shape)
wallTime (float)
- Return type:
Tuple[toil.provisioners.abstractProvisioner.Shape, NodeReservation]
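One way to picture the split is as cutting a timeslice in two: a first slice with reduced resources while the job runs, followed by a slice with the node's full resources for the remaining time. The sketch below assumes that reading; `SliceSketch` and `split_sketch` are hypothetical names, and only memory and cores are tracked.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SliceSketch:
    """Hypothetical timeslice: resources free for wall_time seconds."""
    wall_time: float
    memory: int
    cores: float
    next: Optional["SliceSketch"] = None  # following slice in the linked list


def split_sketch(node: SliceSketch, job_memory: int, job_cores: float,
                 job_time: float) -> Tuple[SliceSketch, SliceSketch]:
    """Split a slice into the part the job occupies and the time left over."""
    # While the job runs, the node has fewer free resources.
    during = SliceSketch(job_time, node.memory - job_memory,
                         node.cores - job_cores)
    # After the job ends, the full resources are free for the remaining time.
    after = SliceSketch(node.wall_time - job_time, node.memory, node.cores,
                        node.next)
    during.next = after
    return during, after


node_slice = SliceSketch(wall_time=3600, memory=16, cores=4)
during, after = split_sketch(node_slice, job_memory=4, job_cores=1,
                             job_time=600)
```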
- toil.provisioners.clusterScaler.binPacking(nodeShapes, jobShapes, goalTime)[source]¶
Using the given node shape bins, pack the given job shapes into nodes to get them done in the given amount of time.
Returns a dict saying how many of each node will be needed, a dict from job shapes that could not fit to reasons why.
- Parameters:
nodeShapes (List[toil.provisioners.abstractProvisioner.Shape])
jobShapes (List[toil.provisioners.abstractProvisioner.Shape])
goalTime (float)
- Return type:
Tuple[Dict[toil.provisioners.abstractProvisioner.Shape, int], Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]
- class toil.provisioners.clusterScaler.ClusterScaler(provisioner, leader, config)[source]¶
- Parameters:
provisioner (toil.provisioners.abstractProvisioner.AbstractProvisioner)
leader (toil.leader.Leader)
config (toil.common.Config)
- addCompletedJob(job, wallTime)[source]¶
Adds the shape of a completed job to the queue, allowing the scaler to use the last N completed jobs in factoring how many nodes are required in the cluster.
- Parameters:
job (toil.job.JobDescription) – The description of the completed job
wallTime (int) – The wall-time taken to complete the job in seconds.
- Return type:
None
- setStaticNodes(nodes, preemptible)[source]¶
Used to track statically provisioned nodes. This method must be called before any auto-scaled nodes are provisioned.
These nodes are treated differently than auto-scaled nodes in that they should not be automatically terminated.
- Parameters:
nodes (List[toil.provisioners.node.Node]) – list of Node objects
preemptible (bool)
- Return type:
None
- getStaticNodes(preemptible)[source]¶
Returns nodes set in setStaticNodes().
- Parameters:
preemptible (bool)
- Returns:
Statically provisioned nodes.
- Return type:
Dict[str, toil.provisioners.node.Node]
- smoothEstimate(nodeShape, estimatedNodeCount)[source]¶
Smooth out fluctuations in the estimate for this node compared to previous runs.
Returns an integer.
- Parameters:
nodeShape (toil.provisioners.abstractProvisioner.Shape)
estimatedNodeCount (int)
- Return type:
- getEstimatedNodeCounts(queuedJobShapes, currentNodeCounts)[source]¶
Given the resource requirements of queued jobs and the current size of the cluster, estimate how many nodes of each shape are needed.
Returns a dict mapping from nodeShape to the number of nodes we want in the cluster right now, and a dict from job shapes that are too big to run on any node to reasons why.
- Parameters:
queuedJobShapes (List[toil.provisioners.abstractProvisioner.Shape])
currentNodeCounts (Dict[toil.provisioners.abstractProvisioner.Shape, int])
- Return type:
Tuple[Dict[toil.provisioners.abstractProvisioner.Shape, int], Dict[toil.provisioners.abstractProvisioner.Shape, List[FailedConstraint]]]
- updateClusterSize(estimatedNodeCounts)[source]¶
Given the desired and current size of the cluster, attempts to launch/remove instances to get to the desired size.
Also attempts to remove ignored nodes that were marked for graceful removal.
Returns the new size of the cluster.
- Parameters:
estimatedNodeCounts (Dict[toil.provisioners.abstractProvisioner.Shape, int])
- Return type:
- setNodeCount(instance_type, numNodes, preemptible=False, force=False)[source]¶
Attempt to grow or shrink the number of preemptible or non-preemptible worker nodes in the cluster to the given value, or as close a value as possible, and, after performing the necessary additions or removals of worker nodes, return the resulting number of preemptible or non-preemptible nodes currently in the cluster.
- Parameters:
instance_type (str) – The instance type to add or remove.
numNodes (int) – Desired size of the cluster
preemptible (bool) – whether the added nodes will be preemptible, i.e. whether they may be removed spontaneously by the underlying platform at any time.
force (bool) – If False, the provisioner is allowed to deviate from the given number of nodes. For example, when downsizing a cluster, a provisioner might leave nodes running if they have active jobs running on them.
- Returns:
the number of worker nodes in the cluster after making the necessary adjustments. This value should be, but is not guaranteed to be, close or equal to the numNodes argument. It represents the closest possible approximation of the actual cluster size at the time this method returns.
- Return type:
- filter_out_static_nodes(nodes, preemptible=False)[source]¶
- Parameters:
nodes (Dict[toil.provisioners.node.Node, toil.batchSystems.abstractBatchSystem.NodeInfo])
preemptible (bool)
- Return type:
List[Tuple[toil.provisioners.node.Node, toil.batchSystems.abstractBatchSystem.NodeInfo]]
- getNodes(preemptible=None)[source]¶
Returns a dictionary mapping node identifiers of preemptible or non-preemptible nodes to NodeInfo objects, one for each node.
This method is the definitive source on nodes in the cluster, and is responsible for consolidating cluster state between the provisioner and the batch system.
- Parameters:
preemptible (bool) – If True (False) only (non-)preemptible nodes will be returned. If None, all nodes will be returned.
- Return type:
Dict[toil.provisioners.node.Node, toil.batchSystems.abstractBatchSystem.NodeInfo]
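The three-way meaning of the preemptible argument (True, False, or None) can be sketched with a plain dictionary. This is an illustration only: the function name is hypothetical, and nodes are reduced to a name mapped to a preemptibility flag rather than Node and NodeInfo objects.

```python
from typing import Dict, Optional


def filter_by_preemptibility(
    nodes: Dict[str, bool], preemptible: Optional[bool] = None
) -> Dict[str, bool]:
    """Select nodes by preemptibility; None means return every node."""
    if preemptible is None:
        return dict(nodes)
    return {name: flag for name, flag in nodes.items() if flag == preemptible}


# Hypothetical cluster: node name mapped to whether it is preemptible.
cluster = {"node-a": True, "node-b": False, "node-c": True}

all_nodes = filter_by_preemptibility(cluster)
spot_nodes = filter_by_preemptibility(cluster, preemptible=True)
stable_nodes = filter_by_preemptibility(cluster, preemptible=False)
```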
- exception toil.provisioners.clusterScaler.JobTooBigError(job=None, shape=None, constraints=None)[source]¶
Bases: Exception
Raised in the scaler thread when a job cannot fit in any available node type and is likely to lock up the workflow.
- Parameters:
job (Optional[toil.job.JobDescription])
shape (Optional[toil.provisioners.abstractProvisioner.Shape])
constraints (Optional[List[FailedConstraint]])
- class toil.provisioners.clusterScaler.ScalerThread(provisioner, leader, config, stop_on_exception=False)[source]¶
Bases: toil.lib.threading.ExceptionalThread
A thread that automatically scales the number of either preemptible or non-preemptible worker nodes according to the resource requirements of the queued jobs.
The scaling calculation is essentially as follows: start with 0 estimated worker nodes. For each queued job, check if we expect it can be scheduled into a worker node before a certain time (currently one hour). Otherwise, attempt to add a single new node of the smallest type that can fit that job.
At each scaling decision point a comparison between the current, C, and newly estimated number of nodes is made. If the absolute difference is less than beta * C then no change is made, else the size of the cluster is adapted. The beta factor is an inertia parameter that prevents continual fluctuations in the number of nodes.
- Parameters:
provisioner (toil.provisioners.abstractProvisioner.AbstractProvisioner)
leader (toil.leader.Leader)
config (toil.common.Config)
stop_on_exception (bool)
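The inertia rule described above, where no change is made if the absolute difference is less than beta * C, can be written as a small function. This is a minimal sketch of that comparison only; the function name `apply_inertia` is hypothetical, and how Toil obtains beta or the estimate is not shown.

```python
def apply_inertia(current: int, estimated: int, beta: float) -> int:
    """Return the node count to aim for after applying the inertia rule."""
    # Small fluctuations (less than beta * current) are ignored.
    if abs(estimated - current) < beta * current:
        return current
    # Otherwise the cluster is adapted to the new estimate.
    return estimated


# With 10 nodes and beta = 0.2, the threshold is 2 nodes.
unchanged = apply_inertia(current=10, estimated=11, beta=0.2)
resized = apply_inertia(current=10, estimated=13, beta=0.2)
# With no nodes running, the threshold is 0, so any estimate is adopted.
from_empty = apply_inertia(current=0, estimated=3, beta=0.2)
```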
- check()[source]¶
Attempt to join any existing scaler threads that may have died or finished.
This ensures any exceptions raised in the threads are propagated in a timely fashion.
- Return type:
None
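Propagating an exception through join() can be sketched with the standard library alone. The class below is a simplified stand-in written for this example, not Toil's `ExceptionalThread`; the names `ReRaisingThread` and `failing_task` are hypothetical.

```python
import threading
from typing import Callable, Optional


class ReRaisingThread(threading.Thread):
    """Thread whose join() re-raises any exception raised by its task."""

    def __init__(self, task: Callable[[], None]) -> None:
        super().__init__()
        self._task = task
        self.error: Optional[BaseException] = None

    def run(self) -> None:
        try:
            self._task()
        except BaseException as exc:
            # Remember the failure so the joining thread can see it.
            self.error = exc

    def join(self, timeout: Optional[float] = None) -> None:
        super().join(timeout)
        if self.error is not None:
            raise self.error


def failing_task() -> None:
    raise RuntimeError("scaler failed")


worker = ReRaisingThread(failing_task)
worker.start()
try:
    worker.join()
    caught_message = None
except RuntimeError as exc:
    caught_message = str(exc)
```

Calling join() periodically, as check() is described as doing, is what lets a failure in a background thread surface in the thread that is watching it.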
- addCompletedJob(job, wallTime)[source]¶
- Parameters:
job (toil.job.JobDescription)
wallTime (int)
- Return type:
None
- class toil.provisioners.clusterScaler.ClusterStats(path, batchSystem, clusterName)[source]¶
- Parameters:
path (str)
batchSystem (toil.batchSystems.abstractBatchSystem.AbstractBatchSystem)
clusterName (Optional[str])