Source code for toil.utils.toilLaunchCluster

# Copyright (C) 2015-2021 Regents of the University of California
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Launches a toil leader instance with the specified provisioner."""

import logging
import os
from typing import Union

from toil import applianceSelf
from toil.common import parser_with_common_options

try:
    from toil.lib.aws import build_tag_dict_from_env
except ModuleNotFoundError:
    build_tag_dict_from_env: dict[str, str] = lambda _: {}  # type: ignore[no-redef]
from toil.lib.conversions import opt_strtobool
from toil.provisioners import check_valid_node_types, cluster_factory, parse_node_types
from toil.statsAndLogging import set_logging_from_options

logger = logging.getLogger(__name__)


[docs] def create_tags_dict(tags: list[str]) -> dict[str, str]: tags_dict = dict() for tag in tags: try: key, value = tag.split("=") except ValueError: logger.error("Tag specification '%s' must contain '='", tag) raise tags_dict[key] = value return tags_dict
[docs] def main() -> None: parser = parser_with_common_options( provisioner_options=True, jobstore_option=False, prog="toil launch-cluster" ) parser.add_argument( "-T", "--clusterType", dest="clusterType", choices=["mesos", "kubernetes"], default=None, # TODO: change default to "kubernetes" when we are ready. help="Cluster scheduler to use.", ) parser.add_argument( "--leaderNodeType", dest="leaderNodeType", required=True, help="Non-preemptible node type to use for the cluster leader.", ) parser.add_argument( "--keyPairName", dest="keyPairName", help="On AWS, the name of the AWS key pair to include on the instance." " On Google/GCE, this is the ssh key pair.", ) parser.add_argument( "--owner", dest="owner", help="The owner tag for all instances. If not given, the value in" "TOIL_OWNER_TAG will be used, or else the value of --keyPairName.", ) parser.add_argument( "--boto", dest="botoPath", help="The path to the boto credentials directory. This is transferred " "to all nodes in order to access the AWS jobStore from non-AWS instances.", ) parser.add_argument( "-t", "--tag", metavar="NAME=VALUE", dest="tags", default=[], action="append", help="Tags are added to the AWS cluster for this node and all of its " "children. Tags are of the form:\n" " -t key1=value1 --tag key2=value2\n" "Multiple tags are allowed and each tag needs its own flag. By " "default the cluster is tagged with " " {\n" ' "Name": clusterName,\n' ' "Owner": IAM username\n' " }. ", ) parser.add_argument( "--network", help="GCE cloud network to use. default: 'default'" ) parser.add_argument( "--vpcSubnet", help="VPC subnet ID to launch cluster leader in. Uses default subnet " "if not specified. This subnet needs to have auto assign IPs turned on.", ) parser.add_argument( "--use_private_ip", dest="use_private_ip", action="store_true", default=False, help="if specified, ignore the public ip of the nodes", ) parser.add_argument( "--nodeTypes", dest="nodeTypes", default=None, type=str, help="Specifies a list of comma-separated node types, each of which is " "composed of slash-separated instance types, and an optional spot " "bid set off by a colon, making the node type preemptible. Instance " "types may appear in multiple node types, and the same node type " "may appear as both preemptible and non-preemptible.\n" "Valid argument specifying two node types:\n" "\tc5.4xlarge/c5a.4xlarge:0.42,t2.large\n" "Node types:\n" "\tc5.4xlarge/c5a.4xlarge:0.42 and t2.large\n" "Instance types:\n" "\tc5.4xlarge, c5a.4xlarge, and t2.large\n" "Semantics:\n" "\tBid $0.42/hour for either c5.4xlarge or c5a.4xlarge instances,\n" "\ttreated interchangeably, while they are available at that price,\n" "\tand buy t2.large instances at full price\n" "Must also provide the --workers argument to specify how many " "workers of each node type to create.", ) parser.add_argument( "-w", "--workers", dest="workers", default=None, type=str, help="Comma-separated list of the ranges of numbers of workers of each " "node type to launch, such as '0-2,5,1-3'. If a range is given, " "workers will automatically be launched and terminated by the cluster " "to auto-scale to the workload.", ) parser.add_argument( "--leaderStorage", dest="leaderStorage", type=int, default=50, help="Specify the size (in gigabytes) of the root volume for the leader " "instance. This is an EBS volume.", ) parser.add_argument( "--nodeStorage", dest="nodeStorage", type=int, default=50, help="Specify the size (in gigabytes) of the root volume for any worker " "instances created when using the -w flag. This is an EBS volume.", ) parser.add_argument( "--forceDockerAppliance", dest="forceDockerAppliance", action="store_true", default=False, help="Disables sanity checking the existence of the docker image specified " "by TOIL_APPLIANCE_SELF, which Toil uses to provision mesos for " "autoscaling.", ) parser.add_argument( "--awsEc2ProfileArn", dest="awsEc2ProfileArn", default=None, type=str, help="If provided, the specified ARN is used as the instance profile for EC2 instances." "Useful for setting custom IAM profiles. If not specified, a new IAM role is created " "by default with sufficient access to perform basic cluster operations.", ) parser.add_argument( "--awsEc2ExtraSecurityGroupId", dest="awsEc2ExtraSecurityGroupIds", default=[], action="append", help="Any additional security groups to attach to EC2 instances. Note that a security group " "with its name equal to the cluster name will always be created, thus ensure that " "the extra security groups do not have the same name as the cluster name.", ) parser.add_argument( "--allowFuse", type=opt_strtobool, default=True, help="Enable both the leader and worker nodes to be able to run Singularity with FUSE. For " "Kubernetes, this will make the leader privileged and ask workers to run as privileged. " "(default: %(default)s)", ) # TODO Set Aws Profile in CLI options options = parser.parse_args() set_logging_from_options(options) tags = create_tags_dict(options.tags) if options.tags else build_tag_dict_from_env() # Get worker node types worker_node_types = parse_node_types(options.nodeTypes) check_valid_node_types( options.provisioner, worker_node_types + [({options.leaderNodeType}, None)] ) # Holds string ranges, like "5", or "3-10" worker_node_ranges = options.workers.split(",") if options.workers else [] # checks the validity of TOIL_APPLIANCE_SELF before proceeding applianceSelf(forceDockerAppliance=options.forceDockerAppliance) # This holds either ints to launch static nodes, or tuples of ints # specifying ranges to launch managed auto-scaling nodes, for each type. nodeCounts: list[Union[int, tuple[int, int]]] = [] if (worker_node_types != [] or worker_node_ranges != []) and not ( worker_node_types != [] and worker_node_ranges != [] ): raise RuntimeError("The --nodeTypes option requires --workers, and visa versa.") if worker_node_types and worker_node_ranges: if not len(worker_node_types) == len(worker_node_ranges): raise RuntimeError( "List of worker count ranges must be the same length as the list of node types." ) for spec in worker_node_ranges: if "-" in spec: # Provision via autoscaling parts = spec.split("-") if len(parts) != 2: raise RuntimeError("Unacceptable range: " + spec) nodeCounts.append((int(parts[0]), int(parts[1]))) else: # Provision fixed nodes nodeCounts.append(int(spec)) owner = ( options.owner or os.getenv("TOIL_OWNER_TAG") or options.keyPairName or "toil" ) # Check to see if the user specified a zone. If not, see if one is stored in an environment variable. options.zone = options.zone or os.environ.get( f"TOIL_{options.provisioner.upper()}_ZONE" ) if not options.zone: raise RuntimeError( f"Please provide a value for --zone or set a default in the " f"TOIL_{options.provisioner.upper()}_ZONE environment variable." ) if options.clusterType == "mesos": logger.warning( "You are using a Mesos cluster, which is no longer recommended as Toil is " "transitioning to Kubernetes-based clusters. Consider switching to " "--clusterType=kubernetes instead." ) if options.clusterType is None: logger.warning( 'Argument --clusterType is not set... using "mesos". ' "In future versions of Toil, the default cluster scheduler will be " 'set to "kubernetes" if the cluster type is not specified.' ) options.clusterType = "mesos" logger.info("Creating cluster %s...", options.clusterName) cluster = cluster_factory( provisioner=options.provisioner, clusterName=options.clusterName, clusterType=options.clusterType, zone=options.zone, nodeStorage=options.nodeStorage, enable_fuse=options.allowFuse, ) cluster.launchCluster( leaderNodeType=options.leaderNodeType, leaderStorage=options.leaderStorage, owner=owner, keyName=options.keyPairName, botoPath=options.botoPath, userTags=tags, network=options.network, vpcSubnet=options.vpcSubnet, awsEc2ProfileArn=options.awsEc2ProfileArn, awsEc2ExtraSecurityGroupIds=options.awsEc2ExtraSecurityGroupIds, ) for typeNum, spec in enumerate(nodeCounts): # For each batch of workers to make wanted = worker_node_types[typeNum] if isinstance(spec, int): # Make static nodes if spec == 0: # Don't make anything continue if wanted[1] is None: # Make non-spot instances cluster.addNodes(nodeTypes=wanted[0], numNodes=spec, preemptible=False) else: # We have a spot bid cluster.addNodes( nodeTypes=wanted[0], numNodes=spec, preemptible=True, spotBid=wanted[1], ) elif isinstance(spec, tuple): # Make a range of auto-scaling nodes max_count, min_count = spec if max_count < min_count: # Flip them around min_count, max_count = max_count, min_count if max_count == 0: # Don't want any continue if wanted[1] is None: # Make non-spot instances cluster.addManagedNodes( nodeTypes=wanted[0], minNodes=min_count, maxNodes=max_count, preemptible=False, ) else: # Bid at the given price. cluster.addManagedNodes( nodeTypes=wanted[0], minNodes=min_count, maxNodes=max_count, preemptible=True, spotBid=wanted[1], ) logger.info("Cluster created successfully.")