scripts/autoscaling/aws/nodeup.py (1,388 lines of code) (raw):

# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import base64 import socket import random from datetime import datetime, timedelta from time import sleep import boto3 from botocore.config import Config import pykube import logging import os import pytz from botocore.exceptions import ClientError from pipeline import Logger, TaskStatus, PipelineAPI, pack_script_contents, pack_powershell_script_contents from itertools import groupby from operator import itemgetter from random import randint import json from distutils.version import LooseVersion import fnmatch import sys import math import socket import jwt SPOT_UNAVAILABLE_EXIT_CODE = 5 LIMIT_EXCEEDED_EXIT_CODE = 6 INSUFFICIENT_CAPACITY_EXIT_CODE = 7 RUNNING = 16 PENDING = 0 EBS_TYPE_PARAM = "cluster.aws.ebs.type" NETWORKS_PARAM = "cluster.networks.config" NODE_WAIT_TIME_SEC = "cluster.nodeup.wait.sec" NODEUP_TASK = "InitializeNode" LIMIT_EXCEEDED_ERROR_MASSAGE = 'Instance limit exceeded. A new one will be launched as soon as free space will be available.' INSUFFICIENT_CAPACITY_ERROR_MASSAGE = 'Insufficient instance capacity.' BOTO3_RETRY_COUNT = 6 MIN_SWAP_DEVICE_SIZE = 5 LOCAL_NVME_INSTANCE_TYPES = [ 'c5d.' , 'm5d.', 'r5d.' ] DEFAULT_FS_TYPE = 'btrfs' SUPPORTED_FS_TYPES = [DEFAULT_FS_TYPE, 'ext4'] POOL_ID_KEY = 'pool_id' KUBE_CONFIG_PATH = '~/.kube/config' current_run_id = 0 api_url = None api_token = None script_path = None def is_run_id_numerical(run_id): try: int(run_id) return True except ValueError: return False def is_api_logging_enabled(): global api_token global api_url global current_run_id return is_run_id_numerical(current_run_id) and api_url and api_token def pipe_log_init(run_id): global api_token global api_url global current_run_id current_run_id = run_id api_url = os.environ["API"] api_token = os.environ["API_TOKEN"] if not is_api_logging_enabled(): logging.basicConfig(filename='nodeup.log', level=logging.INFO, format='%(asctime)s %(message)s') def pipe_log_warn(message): global api_token global api_url global script_path global current_run_id if is_api_logging_enabled(): Logger.warn('[{}] {}'.format(current_run_id, message), task_name=NODEUP_TASK, run_id=current_run_id, api_url=api_url, log_dir=script_path, omit_console=True) else: logging.warn(message) def pipe_log(message, status=TaskStatus.RUNNING): global api_token global api_url global script_path global current_run_id if is_api_logging_enabled(): Logger.log_task_event(NODEUP_TASK, '[{}] {}'.format(current_run_id, message), run_id=current_run_id, instance=str(current_run_id), log_dir=script_path, api_url=api_url, status=status, omit_console=True) else: # Log as always logging.info(message) ############################# __CLOUD_METADATA__ = None __CLOUD_TAGS__ = None def get_preference(preference_name): pipe_api = PipelineAPI(api_url, None) try: preference = pipe_api.get_preference(preference_name) if 'value' in preference: return preference['value'] else: return None except: pipe_log('An error occured while getting preference {}, empty value is going to be used'.format(preference_name)) return None def get_run_info(run_id): pipe_api = PipelineAPI(api_url, None) try: return pipe_api.load_run_efficiently(run_id) except: pipe_log('An error occured while getting info for run id {}'.format(run_id)) return None def load_cloud_config(): global __CLOUD_METADATA__ global __CLOUD_TAGS__ if not __CLOUD_METADATA__: pipe_api = PipelineAPI(api_url, None) preference = pipe_api.get_preference(NETWORKS_PARAM) if preference: data = json.loads(preference['value']) if 'regions' not in data: pipe_log('Malformed networks config file: missing "regions" section. Update config file.') raise RuntimeError('Malformed networks config file: missing "regions" section. Update config file.') __CLOUD_METADATA__ = data['regions'] if 'tags' in data: __CLOUD_TAGS__ = data['tags'] return __CLOUD_METADATA__, __CLOUD_TAGS__ def get_region_settings(aws_region): full_settings, tags = load_cloud_config() for region_settings in full_settings: if 'name' in region_settings and region_settings['name'] == aws_region: return region_settings pipe_log('Failed to find networks settings for region: %s.' % aws_region) return None def get_cloud_config_section(aws_region, section_name): cloud_metadata = get_region_settings(aws_region) if cloud_metadata and section_name in cloud_metadata and len(cloud_metadata[section_name]) > 0: return cloud_metadata[section_name] else: return None # FIXME: this method shall be synced with the pipe-common/autoscaling/awsprovider.py def get_networks_config(ec2, aws_region, instance_type): allowed_networks = get_cloud_config_section(aws_region, "networks") valid_networks = {} if allowed_networks and len(allowed_networks) > 0: try: allowed_networks_details = ec2.describe_subnets(SubnetIds=allowed_networks.values())['Subnets'] # Get the list of AZs, which offer the "instance_type", as some of the AZs can't provide certain types and an error is thrown: # Your requested instance type (xxxxx) is not supported in your requested Availability Zone (xxxxx) # The list of valid AZs will be placed into "instance_type_offerings_az_list" instance_type_offerings = ec2.describe_instance_type_offerings( LocationType='availability-zone', Filters=[ { 'Name': 'instance-type', 'Values': [ instance_type ] } ] ) instance_type_offerings_az_list = [x['Location'] for x in instance_type_offerings['InstanceTypeOfferings']] instance_type_offerings_az_list_empty = len(instance_type_offerings_az_list) == 0 if instance_type_offerings_az_list_empty: pipe_log('Empty list for the instance type offerings. Considering this as "All AZs offer this type"') else: pipe_log('Instance type {} is available only in the following AZs: {}'.format(instance_type, instance_type_offerings_az_list)) for network in allowed_networks_details: subnet_id = network['SubnetId'] az_name = network['AvailabilityZone'] subnet_ips = int(network['AvailableIpAddressCount']) az_provides_instance_type = az_name in instance_type_offerings_az_list or instance_type_offerings_az_list_empty pipe_log('Subnet {} in {} zone has {} available IP addresses. Offers {} instance type: {}'.format(subnet_id, az_name, str(subnet_ips), instance_type, az_provides_instance_type)) if subnet_ips > 0 and az_provides_instance_type: valid_networks.update({ az_name: subnet_id }) except Exception as allowed_networks_details_e: pipe_log_warn('Cannot get the details of the subnets, so we do not validate subnet usage:\n' + str(allowed_networks_details_e)) valid_networks = allowed_networks return valid_networks def get_instance_images_config(aws_region): return get_cloud_config_section(aws_region, "amis") def get_security_groups(aws_region, security_groups): if security_groups: return security_groups.split(",") config = get_cloud_config_section(aws_region, "security_group_ids") if not config: raise RuntimeError('Security group setting is required to run an instance') return config def get_well_known_hosts(aws_region): return get_cloud_config_section(aws_region, "well_known_hosts") def get_allowed_instance_image(cloud_region, instance_type, instance_platform, default_image, api_token, run_id): default_init_script = os.path.dirname(os.path.abspath(__file__)) + '/init.sh' default_embedded_scripts = None default_object = { "instance_mask_ami": default_image, "instance_mask": None, "init_script": default_init_script, "embedded_scripts": default_embedded_scripts, "fs_type": DEFAULT_FS_TYPE, "additional_spec": None } instance_images_config = get_instance_images_config(cloud_region) if not instance_images_config: return default_object for image_config in instance_images_config: permissions = set(image_config["permissions"]) if "permissions" in image_config else None try: if permissions: pipe_log('Image config with restricted roles found ({}), checking permissions'.format(permissions)) api_token_data = api_token.split(".")[1] api_token_data = api_token_data + "="*divmod(len(api_token_data),4)[1] api_token_data = json.loads(base64.urlsafe_b64decode(api_token_data)) api_token_roles = set(api_token_data["roles"]) if not (permissions & api_token_roles): continue except: # If something is wrong with the permissions check - do not use a restricted image continue docker_image_list = image_config["docker_image"] if "docker_image" in image_config else None try: if docker_image_list: pipe_log('Image config with restricted docker image found ({}), checking for match with a current run'.format(docker_image_list)) run_info = get_run_info(run_id) if not run_info or \ not 'dockerImage' in run_info or \ not run_info['dockerImage'] in docker_image_list: continue except: # If something is wrong with the permissions check - do not use a restricted image continue image_platform = image_config["platform"] instance_mask = image_config["instance_mask"] instance_mask_ami = image_config["ami"] init_script = image_config.get("init_script", default_object["init_script"]) embedded_scripts = image_config.get("embedded_scripts", default_object["embedded_scripts"]) fs_type = image_config.get("fs_type", DEFAULT_FS_TYPE) additional_spec = image_config.get("additional_spec", None) if image_platform == instance_platform and fnmatch.fnmatch(instance_type, instance_mask): return { "instance_mask_ami": instance_mask_ami, "instance_mask": instance_mask, "init_script": init_script, "embedded_scripts": embedded_scripts, "fs_type": fs_type, "additional_spec": additional_spec} return default_object def get_possible_kube_node_names(ec2, ins_id): try: response = ec2.describe_instances(InstanceIds=[ins_id]) nodename_full = response['Reservations'][0]['Instances'][0]['PrivateDnsName'] nodename = nodename_full.split('.', 1)[0] return [nodename, nodename_full, ins_id] except: return [] ############################# ROOT_DEVICE_DEFAULT = { "DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": 40} } def root_device(ec2, ins_img, kms_encyr_key_id): try: pipe_log('- Getting image {} block device mapping details'.format(ins_img)) img_details = ec2.describe_images(ImageIds=[ins_img]) pipe_log('- Block device mapping details received. Proceeding with validation'.format(ins_img)) if len(img_details["Images"]) == 0: raise RuntimeError("No images found for {}".format(ins_img)) img_obj = img_details["Images"][0] if "BlockDeviceMappings" not in img_obj or len(img_obj["BlockDeviceMappings"]) == 0: raise RuntimeError("No BlockDeviceMappings found for {}".format(ins_img)) block_device_obj = img_obj["BlockDeviceMappings"][0] block_device_name = block_device_obj["DeviceName"] if "Ebs" not in block_device_obj: raise RuntimeError("No Ebs definition found for device {} in image {}".format(block_device_name, ins_img)) ebs_type_param = get_preference(EBS_TYPE_PARAM) device_spec = { "DeviceName": block_device_name, "Ebs": { "VolumeSize": block_device_obj["Ebs"]["VolumeSize"], "VolumeType": ebs_type_param} } pipe_log('- The requested EBS volume type for {} device is {}'.format(block_device_name, ebs_type_param)) if kms_encyr_key_id: device_spec["Ebs"]["Encrypted"] = True device_spec["Ebs"]["KmsKeyId"] = kms_encyr_key_id return device_spec except Exception as e: pipe_log('Error while getting image {} root device, using default device: {}\n{}'.format(ins_img, ROOT_DEVICE_DEFAULT["DeviceName"], str(e))) return ROOT_DEVICE_DEFAULT def block_device(ins_hdd, kms_encyr_key_id, name="/dev/sdb"): ebs_type_param = get_preference(EBS_TYPE_PARAM) block_device_spec = { "DeviceName": name, "Ebs": { "VolumeSize": ins_hdd, "VolumeType": ebs_type_param, "DeleteOnTermination": True, "Encrypted": True } } pipe_log('- The requested EBS volume type for {} device is {}'.format(name, ebs_type_param)) if kms_encyr_key_id: block_device_spec["Ebs"]["KmsKeyId"] = kms_encyr_key_id return block_device_spec def resource_tags(cloud_region): tags = [] region_tags = get_cloud_config_section(cloud_region, "tags") config_regions, config_tags = load_cloud_config() merged_tags = merge_tags(region_tags, config_tags) if merged_tags is None: return tags for key, value in merged_tags.iteritems(): tags.append({"Key": key, "Value": value}) return tags def merge_tags(region_tags, global_tags): if region_tags is None: return global_tags if global_tags is None: return region_tags merged = {} for key, value in global_tags.iteritems(): merged[key] = value for key, value in region_tags.iteritems(): merged[key] = value return merged def run_id_tag(run_id, pool_id): tags = [{ 'Value': run_id, 'Key': 'Name' }] if pool_id: tags.append({ 'Value': pool_id, 'Key': POOL_ID_KEY }) return tags def get_tags(run_id, cloud_region, pool_id, input_tags): tags = run_id_tag(run_id, pool_id) res_tags = resource_tags(cloud_region) if res_tags: tags.extend(res_tags) if input_tags: tags.extend(input_tags) return tags def run_id_filter(run_id): return { 'Name': 'tag:Name', 'Values': [run_id] } def get_specified_subnet(subnet, availability_zone): pipe_log('- Desired subnet id {} was specified, trying to use it'.format(subnet)) if availability_zone: pipe_log('- Desired AZ {} will be ignored'.format(availability_zone)) return subnet def get_random_subnet(ec2): subnets = ec2.describe_subnets() if "Subnets" in subnets: return random.choice(subnets['Subnets'])['SubnetId'] return None def run_instance(api_url, api_token, api_user, bid_price, ec2, aws_region, ins_hdd, kms_encyr_key_id, ins_img, ins_platform, ins_key, ins_type, is_spot, num_rep, run_id, pool_id, time_rep, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, kube_cluster_name, kube_client, global_distribution_url, pre_pull_images, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network, input_tags, docker_data_root, docker_storage_driver, skip_system_images_load): swap_size = get_swap_size(aws_region, ins_type, is_spot) user_data_script = get_user_data_script(api_url, api_token, api_user, aws_region, ins_type, ins_img, ins_platform, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, kube_cluster_name, global_distribution_url, swap_size, pre_pull_images, node_ssh_port, run_id, docker_data_root, docker_storage_driver, skip_system_images_load) if is_spot: ins_id, ins_ip = find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins_type, ins_key, ins_hdd, kms_encyr_key_id, user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, input_tags) else: ins_id, ins_ip = run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd, kms_encyr_key_id, run_id, pool_id, user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, input_tags) return ins_id, ins_ip def run_on_demand_instance(ec2, aws_region, ins_img, ins_key, ins_type, ins_hdd, kms_encyr_key_id, run_id, pool_id, user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, input_tags): pipe_log('Creating on demand instance') allowed_networks = get_networks_config(ec2, aws_region, ins_type) additional_args = instance_additional_spec if instance_additional_spec else {} subnet_id = None az_name = None if subnet: subnet_id = get_specified_subnet(subnet, availability_zone) elif allowed_networks and len(allowed_networks) > 0: if availability_zone: pipe_log('- Desired availability zone {} was specified, trying to use it'.format(availability_zone)) for az_name, az_subnet_id in allowed_networks.iteritems(): if az_name == availability_zone: az_name = availability_zone subnet_id = az_subnet_id break if subnet_id is None: az_num = randint(0, len(allowed_networks)-1) az_name = allowed_networks.items()[az_num][0] subnet_id = allowed_networks.items()[az_num][1] pipe_log('- Networks list found, subnet {} in AZ {} will be used'.format(subnet_id, az_name)) if network_interface: if subnet_id: pipe_log('- Network interface specified. Desired subnet id {} and performance network config will be ignored.'.format(subnet_id)) network_interface, subnet_id, az_name = fetch_network_interface_info(ec2, network_interface, availability_zone, allowed_networks) additional_args.update({ "NetworkInterfaces": [ { "DeviceIndex": 0, "NetworkInterfaceId": network_interface } ] }) elif performance_network: pipe_log('- Performance network requested.') if not subnet or not subnet_id: pipe_log('- Subnet is not specified, trying to get a random one...') subnet_id = get_random_subnet(ec2) pipe_log('- Subnet: {} will be used.'.format(subnet_id)) if subnet_id: additional_args.update({ "NetworkInterfaces": [ { 'DeleteOnTermination': True, 'DeviceIndex': 0, 'SubnetId': subnet_id, 'Groups': get_security_groups(aws_region, security_groups), 'InterfaceType': 'efa' } ] }) else: pipe_log('- Cannot define subnet to be launched in, will skip performance network setup and continue with default options...') pipe_log('- Default subnet in random AZ will be used') additional_args.update({'SecurityGroupIds': get_security_groups(aws_region, security_groups)}) elif subnet_id: additional_args.update({ 'SubnetId': subnet_id, 'SecurityGroupIds': get_security_groups(aws_region, security_groups) }) else: pipe_log('- Networks list NOT found, default subnet in random AZ will be used') additional_args.update({'SecurityGroupIds': get_security_groups(aws_region, security_groups)}) if is_dedicated: additional_args.update({ "Placement": { 'Tenancy': "dedicated" } }) if 'MetadataOptions' not in additional_args: additional_args.update({'MetadataOptions': { 'HttpTokens': 'optional', 'HttpPutResponseHopLimit': 2, 'HttpEndpoint': 'enabled' }}) response = {} try: response = ec2.run_instances( ImageId=ins_img, MinCount=1, MaxCount=1, KeyName=ins_key, InstanceType=ins_type, UserData=user_data_script, BlockDeviceMappings=get_block_devices(ec2, ins_img, ins_type, ins_hdd, kms_encyr_key_id, swap_size), TagSpecifications=[ { 'ResourceType': 'instance', "Tags": get_tags(run_id, aws_region, pool_id, input_tags) } ], **additional_args ) except ClientError as client_error: if 'InstanceLimitExceeded' in client_error.message: pipe_log_warn(LIMIT_EXCEEDED_ERROR_MASSAGE) sys.exit(LIMIT_EXCEEDED_EXIT_CODE) elif 'InsufficientInstanceCapacity' in client_error.message: pipe_log_warn(INSUFFICIENT_CAPACITY_ERROR_MASSAGE) sys.exit(INSUFFICIENT_CAPACITY_EXIT_CODE) else: raise client_error ins_id = response['Instances'][0]['InstanceId'] ins_ip = response['Instances'][0]['PrivateIpAddress'] pipe_log('- Instance created. ID: {}, IP: {}'.format(ins_id, ins_ip)) status_code = get_current_status(ec2, ins_id) rep = 0 while status_code != RUNNING: pipe_log('- Waiting for status checks completion...') sleep(time_rep) status_code = get_current_status(ec2, ins_id) rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for instance ({}) status check'.format(num_rep, ins_id), ec2_client=ec2, kill_instance_id_on_fail=ins_id, kube_client=kube_client) pipe_log('Instance created. ID: {}, IP: {}\n-'.format(ins_id, ins_ip)) ebs_tags = resource_tags(aws_region) if input_tags: ebs_tags.extend(input_tags) if ebs_tags: instance_description = ec2.describe_instances(InstanceIds=[ins_id])['Reservations'][0]['Instances'][0] volumes = instance_description['BlockDeviceMappings'] for volume in volumes: ec2.create_tags( Resources=[volume['Ebs']['VolumeId']], Tags=ebs_tags) return ins_id, ins_ip def get_block_devices(ec2, ins_img, ins_type, ins_hdd, kms_encyr_key_id, swap_size): # Add root device block_devices = [root_device(ec2, ins_img, kms_encyr_key_id)] # Check if this is one of 'x5d' instance types (e.g. c5d), which support super fast local SSD # For these instance - we don't create "general" EBS-SSDs local_nvme_family = next(iter([ x for x in LOCAL_NVME_INSTANCE_TYPES if ins_type.startswith(x) ]), None) if local_nvme_family: pipe_log('Instance type family supports local NVME ({}). Will NOT create EBS volume'.format(ins_type)) else: block_devices.append(block_device(ins_hdd, kms_encyr_key_id)) # Add SWAP, if requested if swap_size is not None and swap_size > 0: block_devices.append(block_device(swap_size, kms_encyr_key_id, name="/dev/sdc")) return block_devices def fetch_network_interface_info(ec2, network_interface, availability_zone, allowed_networks): pipe_log('- Specific network interface was provided {}, trying to use it'.format(network_interface)) described_enis = ec2.describe_network_interfaces( NetworkInterfaceIds=[ network_interface ] ) if described_enis is None or described_enis["NetworkInterfaces"] is None or len(described_enis["NetworkInterfaces"]) == 0: raise RuntimeError('- Cannot describe network interface {}, operation failed.'.format(network_interface)) described_eni = described_enis["NetworkInterfaces"][0] eni_az_name = described_eni["AvailabilityZone"] eni_subnet_id = described_eni["SubnetId"] eni_status = described_eni["Status"] if availability_zone is not None and availability_zone != eni_az_name: raise RuntimeError('- Specified network interface {} is located in az {}, but explicitly configured az is {}, operation failed.'.format(network_interface, eni_az_name, availability_zone)) if allowed_networks and len(allowed_networks) > 0 and eni_az_name not in [az_name for az_name, _ in allowed_networks.iteritems()]: raise RuntimeError('- Specified network interface {} is located in az {}, but this az is not in allowed list, operation failed.'.format(network_interface, eni_az_name)) subnet_id = eni_subnet_id az_name = eni_az_name pipe_log('- Subnet {} in az {} will be used'.format(subnet_id, az_name)) if eni_status is None or eni_status != "available": raise RuntimeError('- Status of provided network interface {} is {}, but should be "available", operation failed.'.format(network_interface, eni_status)) pipe_log('- Network Interface {} was specified and all criteria are met, subnet {} in AZ {} will be used'.format(network_interface, subnet_id, az_name)) return network_interface,subnet_id, az_name def get_certs_string(): global api_token global api_url command_pattern = 'mkdir -p /etc/docker/certs.d/{url} && echo "{cert}" >> /etc/docker/certs.d/{url}/ca.crt' if api_url and api_token: pipe_api = PipelineAPI(api_url, None) result = pipe_api.load_certificates() if not result: return "", "" else: repo_urls = [] entries = [] for url, cert in result.iteritems(): repo_urls.append(url) entries.append(command_pattern.format(url=url, cert=cert)) return ",".join(repo_urls), " && ".join(entries) return "", "" def get_well_known_hosts_string(aws_region): pipe_log('Setting well-known hosts an instance in {} region'.format(aws_region)) command_pattern = 'echo {well_known_ip} {well_known_host} >> /etc/hosts' well_known_list = get_well_known_hosts(aws_region) if not well_known_list or len(well_known_list) == 0: return '' entries = [] for well_known_item in well_known_list: if not 'ip' in well_known_item or not 'host' in well_known_item: continue well_known_ip = well_known_item['ip'] well_known_host = well_known_item['host'] if not well_known_ip or not well_known_host: continue entries.append(command_pattern.format(well_known_ip=well_known_ip, well_known_host=well_known_host)) pipe_log('-> {}={}'.format(well_known_ip, well_known_host)) if len(entries) == 0: return '' return ' && '.join(entries) def replace_common_params(aws_region, init_script, config_section): pipe_log('Configuring {} settings for an instance in {} region'.format(config_section, aws_region)) common_list = get_cloud_config_section(aws_region, config_section) if not common_list: return init_script for common_item in common_list: if not 'name' in common_item or not 'path' in common_item: continue item_name = common_item['name'] item_path = common_item['path'] if not item_name: continue if item_path == None: item_path = '' init_script = init_script.replace('@' + item_name + '@', item_path) pipe_log('-> {}={}'.format(item_name, item_path)) return init_script def replace_proxies(aws_region, init_script): return replace_common_params(aws_region, init_script, "proxies") def get_swap_size(aws_region, ins_type, is_spot): pipe_log('Configuring swap settings for an instance in {} region'.format(aws_region)) swap_params = get_cloud_config_section(aws_region, "swap") if swap_params is None: return None swap_ratio = get_swap_ratio(swap_params) if swap_ratio is None: pipe_log("Swap ratio is not configured. Swap configuration will be skipped.") return None ram = get_instance_ram(aws_region, ins_type, is_spot) if ram is None: pipe_log("Failed to determine instance RAM. Swap configuration will be skipped.") return None swap_size = int(math.ceil(swap_ratio * ram)) if swap_size >= MIN_SWAP_DEVICE_SIZE: pipe_log("Swap device will be configured with size %d." % swap_size) return swap_size return None def replace_swap(swap_size, init_script): if swap_size is not None: return init_script.replace('@swap_size@', str(swap_size)) return init_script def get_instance_ram(aws_region, ins_type, is_spot): api = PipelineAPI(api_url, None) region_id = get_region_id(aws_region, api) if region_id is None: return None instance_types = api.get_allowed_instance_types(region_id, spot=is_spot) ram = get_ram_from_group(instance_types, 'cluster.allowed.instance.types', ins_type) if ram is None: ram = get_ram_from_group(instance_types, 'cluster.allowed.instance.types.docker', ins_type) return ram def get_ram_from_group(instance_types, group, instance_type): if group in instance_types: for current_type in instance_types[group]: if current_type['name'] == instance_type: return current_type['memory'] return None def get_region_id(aws_region, api): regions = api.get_regions() if regions is None: return None for region in regions: if region.provider == 'AWS' and region.region_id == aws_region: return region.id return None def get_swap_ratio(swap_params): for swap_param in swap_params: if not 'name' in swap_param or not 'path' in swap_param: continue item_name = swap_param['name'] if item_name == 'swap_ratio': item_value = swap_param['path'] if item_value: try: return float(item_value) except ValueError: pipe_log("Unexpected swap_ratio value: {}".format(item_value)) return None def replace_docker_images(pre_pull_images, user_data_script): global api_token payload = jwt.decode(api_token, verify=False) if 'sub' in payload: subject = payload['sub'] user_data_script = user_data_script\ .replace("@PRE_PULL_DOCKERS@", ",".join(pre_pull_images))\ .replace("@API_USER@", subject) return user_data_script else: raise RuntimeError("Pre-pulled docker initialization failed: unable to parse JWT token for docker auth.") def get_user_data_script(api_url, api_token, api_user, aws_region, ins_type, ins_img, ins_platform, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, kube_cluster_name, global_distribution_url, swap_size, pre_pull_images, node_ssh_port, run_id, docker_data_root, docker_storage_driver, skip_system_images_load): allowed_instance = get_allowed_instance_image(aws_region, ins_type, ins_platform, ins_img, api_token, run_id) if allowed_instance and allowed_instance["init_script"]: init_script = open(allowed_instance["init_script"], 'r') user_data_script = init_script.read() repo_urls_string, certs_string = get_certs_string() well_known_string = get_well_known_hosts_string(aws_region) init_script.close() user_data_script = replace_proxies(aws_region, user_data_script) user_data_script = replace_swap(swap_size, user_data_script) user_data_script = replace_docker_images(pre_pull_images, user_data_script) fs_type = allowed_instance.get('fs_type', DEFAULT_FS_TYPE) if fs_type not in SUPPORTED_FS_TYPES: pipe_log_warn('Unsupported filesystem type is specified: %s. Falling back to default value %s.' % (fs_type, DEFAULT_FS_TYPE)) fs_type = DEFAULT_FS_TYPE user_data_script = user_data_script.replace('@DOCKER_CERTS@', certs_string) \ .replace('@DOCKER_REGISTRY_URLS@', repo_urls_string) \ .replace('@WELL_KNOWN_HOSTS@', well_known_string) \ .replace('@KUBE_IP@', kube_ip) \ .replace('@KUBE_TOKEN@', kubeadm_token) \ .replace('@KUBE_CERT_HASH@', kubeadm_cert_hash) \ .replace('@KUBE_NODE_TOKEN@', kube_node_token) \ .replace('@KUBE_CLUSTER_NAME@', kube_cluster_name) \ .replace('@API_URL@', api_url) \ .replace('@API_TOKEN@', api_token) \ .replace('@API_USER@', api_user) \ .replace('@FS_TYPE@', fs_type) \ .replace('@NODE_SSH_PORT@', node_ssh_port) \ .replace('@DOCKER_DATA_ROOT@', docker_data_root) \ .replace('@DOCKER_STORAGE_DRIVER@', docker_storage_driver) \ .replace('@SKIP_SYSTEM_IMAGES_LOAD@', skip_system_images_load) \ .replace('@GLOBAL_DISTRIBUTION_URL@', global_distribution_url) \ .replace('@KUBE_RESERVED_MEM@', os.getenv('KUBE_RESERVED_MEM', '')) \ .replace('@SYSTEM_RESERVED_MEM@', os.getenv('SYSTEM_RESERVED_MEM', '')) embedded_scripts = {} if allowed_instance["embedded_scripts"]: for embedded_name, embedded_path in allowed_instance["embedded_scripts"].items(): embedded_scripts[embedded_name] = open(embedded_path, 'r').read() if ins_platform == 'windows': return pack_powershell_script_contents(user_data_script, embedded_scripts) else: return pack_script_contents(user_data_script, embedded_scripts) else: raise RuntimeError('Unable to get init.sh path') def get_current_status(ec2, ins_id): try: response = ec2.describe_instance_status(InstanceIds=[ins_id]) if len(response['InstanceStatuses']) > 0: return response['InstanceStatuses'][0]['InstanceState']['Code'] else: return -1 except ClientError as client_error: if 'does not exist' in client_error.message: pipe_log_warn('Get status request for instance %s returned error %s.' % (ins_id, client_error.message)) return -1 else: raise client_error def poll_instance(sock, timeout, ip, port): result = -1 sock.settimeout(float(timeout)) try: result = sock.connect_ex((ip, port)) except Exception as e: pass sock.settimeout(None) return result def check_instance(ec2, ins_id, run_id, num_rep, time_rep, kube_client): pipe_log('Checking instance ({}) boot state'.format(ins_id)) port=8888 response = ec2.describe_instances(InstanceIds=[ins_id]) ipaddr = response['Reservations'][0]['Instances'][0]['PrivateIpAddress'] sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) pipe_log('- Waiting for instance boot up...') result = poll_instance(sock, time_rep, ipaddr, port) rep = 0 active = False while result != 0 or not active: sleep(time_rep) active = instance_is_active(ec2, ins_id) result = poll_instance(sock, time_rep, ipaddr, port) rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for instance ({}) network check on port {}'.format(num_rep, ins_id, port), ec2_client=ec2, kill_instance_id_on_fail=ins_id, kube_client=kube_client) pipe_log('Instance is booted. ID: {}, IP: {}\n-'.format(ins_id, ipaddr)) def label_node(nodename, run_id, api, cluster_name, cluster_role, aws_region, additional_labels): pipe_log('Assigning instance {} to RunID: {}'.format(nodename, run_id)) obj = { "apiVersion": "v1", "kind": "Node", "metadata": { "name": nodename, "labels": { "runid": run_id, "cloud_region": aws_region } } } if additional_labels: obj["metadata"]["labels"].update(additional_labels) if cluster_name: obj["metadata"]["labels"]["cp-cluster-name"] = cluster_name if cluster_role: obj["metadata"]["labels"]["cp-cluster-role"] = cluster_role pykube.Node(api, obj).update() pipe_log('Instance {} is assigned to RunID: {}\n-'.format(nodename, run_id)) def instance_is_active(ec2, instance_id): status = get_current_status(ec2, instance_id) return status == RUNNING or status == PENDING def verify_run_id(ec2, run_id): pipe_log('Checking if instance already exists for RunID {}'.format(run_id)) response = ec2.describe_instances( Filters=[ run_id_filter(run_id), { 'Name': 'instance-state-name', 'Values': ['pending', 'running'] } ] ) ins_id = '' ins_ip = '' if len(response['Reservations']) > 0 and instance_is_active(ec2, response['Reservations'][0]['Instances'][0]['InstanceId']): ins_id = response['Reservations'][0]['Instances'][0]['InstanceId'] ins_ip = response['Reservations'][0]['Instances'][0]['PrivateIpAddress'] pipe_log('Found existing instance (ID: {}, IP: {}) for RunID {}\n-'.format(ins_id, ins_ip, run_id)) else: pipe_log('No existing instance found for RunID {}\n-'.format(run_id)) return ins_id, ins_ip def verify_regnode(ec2, ins_id, num_rep, time_rep, run_id, api): nodenames = get_possible_kube_node_names(ec2, ins_id) pipe_log('Waiting for instance {} registration in cluster with name(s) {}'.format(ins_id, nodenames)) ret_namenode = '' rep = 0 while rep <= num_rep: ret_namenode = find_node(nodenames, api) if ret_namenode: break rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for instance (ID: {}, NodeName: {}) cluster registration'.format(num_rep, ins_id, nodenames), ec2_client=ec2, kill_instance_id_on_fail=ins_id, kube_client=api) sleep(time_rep) if ret_namenode: # useless? pipe_log('- Node registered in cluster as {}'.format(ret_namenode)) rep = 0 while rep <= num_rep: node = pykube.Node.objects(api).filter(field_selector={'metadata.name': ret_namenode}) status = node.response['items'][0]['status']['conditions'][3]['status'] if status == u'True': pipe_log('- Node ({}) status is READY'.format(ret_namenode)) break rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for instance (ID: {}, NodeName: {}) kube node READY check'.format(num_rep, ins_id, ret_namenode), ec2_client=ec2, kill_instance_id_on_fail=ins_id, kube_client=api) sleep(time_rep) rep = 0 pipe_log('- Waiting for system agents initialization...') while rep <= num_rep: pods = pykube.objects.Pod.objects(api).filter(namespace="kube-system", field_selector={"spec.nodeName": ret_namenode}) count_pods = len(pods.response['items']) ready_pods = len([p for p in pods if p.ready]) if count_pods == ready_pods: break pipe_log('- {} of {} agents initialized. Still waiting...'.format(ready_pods, count_pods)) rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for instance (ID: {}, NodeName: {}) kube system pods check'.format(num_rep, ins_id, ret_namenode), ec2_client=ec2, kill_instance_id_on_fail=ins_id, kube_client=api) sleep(time_rep) pipe_log('Instance {} successfully registred in cluster with name {}\n-'.format(ins_id, ret_namenode)) return ret_namenode def terminate_instance(ec2_client, instance_id, spot_request_id=None, kube_client=None): # Kill AWS instance if not instance_id or len(instance_id) == 0: pipe_log('[ERROR] None or empty string specified when calling terminate_instance, nothing will be done') return try: if spot_request_id: pipe_log('Cancel Spot request ({})'.format(spot_request_id)) ec2_client.cancel_spot_instance_requests(SpotInstanceRequestIds=[spot_request_id]) response = ec2_client.terminate_instances( InstanceIds=[ instance_id ] ) except Exception as terminate_exception: pipe_log('[ERROR] Error during instance {} termination:\n{}'.format(instance_id, str(terminate_exception))) return if 'TerminatingInstances' not in response or len(response['TerminatingInstances']) == 0: pipe_log('[ERROR] Unable to parse response of the {} instance termination request. ' 'TerminatingInstances entry not found or it contains 0 elements') return termination_state=response['TerminatingInstances'][0] prev_state=termination_state['PreviousState']['Name'] current_state=termination_state['CurrentState']['Name'] pipe_log('Instance {} state is changed from {} to {}'.format(instance_id, prev_state, current_state)) # Kill kube node as well (if it was able to register) if kube_client: kube_node_names = get_possible_kube_node_names(ec2_client, instance_id) kube_node_real_name = find_node(kube_node_names, kube_client) if kube_node_real_name: pipe_log('Node {} has been found in the kube cluster - it will be deleted'.format(kube_node_real_name)) try: kube_node = pykube.Node.objects(kube_client).get(name=kube_node_real_name) kube_node.delete() pipe_log('Node {} has been deleted from the kube cluster'.format(kube_node_real_name)) except Exception as node_delete_exception: pipe_log('[ERROR] Cannot delete node {} from the kube cluster:\n{}'.format(kube_node_real_name, str(node_delete_exception))) else: pipe_log('Node {} was not found in the kube cluster'.format(kube_node_real_name)) def increment_or_fail(num_rep, rep, error_message, ec2_client=None, kill_instance_id_on_fail=None, kube_client=None): rep = rep + 1 if rep > num_rep: if kill_instance_id_on_fail: spot_request_id = None pipe_log('[ERROR] Operation timed out and an instance {} will be terminated\n' 'See more details below'.format(kill_instance_id_on_fail)) instance = ec2_client.describe_instances(InstanceIds=[kill_instance_id_on_fail])['Reservations'][0]['Instances'][0] if 'SpotInstanceRequestId' in instance and instance['SpotInstanceRequestId']: spot_request_id = instance['SpotInstanceRequestId'] terminate_instance(ec2_client, kill_instance_id_on_fail, spot_request_id, kube_client) raise RuntimeError(error_message) return rep def find_node(nodes, api): for nodename in nodes: ret_namenode = get_nodename(api, nodename) if ret_namenode: return ret_namenode return '' def get_nodename(api, nodename): node = pykube.Node.objects(api).filter(field_selector={'metadata.name': nodename}) if len(node.response['items']) > 0: return nodename else: return '' def get_mean(values): n = 0 sum = 0.0 for v in values: sum += v n += 1 return sum / n def get_availability_zones(ec2): zones = ec2.describe_availability_zones() return [zone['ZoneName'] for zone in zones['AvailabilityZones']] def get_spot_prices(ec2, aws_region, instance_type, hours=3): prices = [] allowed_networks = get_networks_config(ec2, aws_region, instance_type) if allowed_networks and len(allowed_networks) > 0: zones = list(allowed_networks.keys()) else: zones = get_availability_zones(ec2) history = ec2.describe_spot_price_history( StartTime=datetime.today() - timedelta(hours=hours), EndTime=datetime.today(), InstanceTypes=[instance_type], ProductDescriptions=['Linux/UNIX'], Filters=[{'Name': 'availability-zone', 'Values': zones}], ) history = history['SpotPriceHistory'] grouper = itemgetter('AvailabilityZone') for zone, items in groupby(sorted(history, key=grouper), key=grouper): price = get_mean([float(i['SpotPrice']) for i in items]) prices.append((zone, price)) return sorted(prices, key=lambda t: t[1]) def exit_if_spot_unavailable(run_id, last_status): # will exit with code '5' if a spot request can't be fulfilled if last_status in ['capacity-not-available', 'capacity-oversubscribed', 'constraint-not-fulfillable']: pipe_log('[ERROR] Could not fulfill spot request for run {}, status: {}'.format(run_id, last_status), status=TaskStatus.FAILURE) sys.exit(SPOT_UNAVAILABLE_EXIT_CODE) def find_spot_instance(ec2, aws_region, bid_price, run_id, pool_id, ins_img, ins_type, ins_key, ins_hdd, kms_encyr_key_id, user_data_script, num_rep, time_rep, swap_size, kube_client, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, performance_network, input_tags): pipe_log('Creating spot request') pipe_log('- Checking spot prices for current region...') spot_prices = get_spot_prices(ec2, aws_region, ins_type) allowed_networks = get_networks_config(ec2, aws_region, ins_type) cheapest_zone = '' if len(spot_prices) == 0: pipe_log('- Unable to get prices for a spot of type {}, cheapest zone can not be determined'.format(ins_type)) else: if availability_zone: pipe_log('- Desired availability zone {} was specified, trying to use it'.format(availability_zone)) for cheapest_zone, lowest_price in spot_prices: if cheapest_zone == availability_zone: cheapest_zone = availability_zone break if not cheapest_zone: cheapest_zone, lowest_price = spot_prices[0] pipe_log('- Prices for {} spots:\n'.format(ins_type) + '\n'.join('{0}: {1:.5f}'.format(zone, price) for zone, price in spot_prices) + '\n' + '{} zone will be used'.format(cheapest_zone)) specifications = { 'ImageId': ins_img, 'InstanceType': ins_type, 'KeyName': ins_key, 'UserData': base64.b64encode(user_data_script.encode('utf-8')).decode('utf-8'), 'BlockDeviceMappings': get_block_devices(ec2, ins_img, ins_type, ins_hdd, kms_encyr_key_id, swap_size), } subnet_id = None if subnet: subnet_id = get_specified_subnet(subnet, availability_zone) elif allowed_networks and cheapest_zone in allowed_networks: subnet_id = allowed_networks[cheapest_zone] pipe_log('- Networks list found, subnet {} in AZ {} will be used'.format(subnet_id, cheapest_zone)) if network_interface: if subnet_id: pipe_log('- Network interface specified. Desired subnet id {} will be ignored'.format(subnet_id)) network_interface, subnet_id, az_name = fetch_network_interface_info(ec2, network_interface, availability_zone, allowed_networks) specifications.update({ "NetworkInterfaces": [ { "DeviceIndex": 0, "NetworkInterfaceId": network_interface } ], }) elif performance_network: pipe_log('- Performance network requested.') if not subnet or not subnet_id: pipe_log('- Subnet is not specified, trying to get a random one...') subnet_id = get_random_subnet(ec2) pipe_log('- Subnet: {} will be used.'.format(subnet_id)) if subnet_id: specifications.update({ "NetworkInterfaces": [ { 'DeleteOnTermination': True, 'DeviceIndex': 0, 'SubnetId': subnet_id, 'Groups': get_security_groups(aws_region, security_groups), 'InterfaceType': 'efa' } ] }) else: pipe_log('- Cannot define subnet to be launched in, will skip performance network setup and continue with default options...') pipe_log('- Default subnet in random AZ will be used') specifications.update({'SecurityGroupIds': get_security_groups(aws_region, security_groups)}) elif subnet_id: specifications.update({ 'SubnetId': subnet_id, 'SecurityGroupIds': get_security_groups(aws_region, security_groups) }) if cheapest_zone: specifications['Placement'] = { 'AvailabilityZone': cheapest_zone } else: pipe_log('- Networks list NOT found or cheapest zone can not be determined, default subnet in a random AZ will be used') specifications['SecurityGroupIds'] = get_security_groups(aws_region, security_groups) if instance_additional_spec: specifications.update(instance_additional_spec) if is_dedicated: specifications.update({ "Placement": { 'Tenancy': "dedicated" } }) current_time = datetime.now(pytz.utc) + timedelta(seconds=10) response = None try: response = ec2.request_spot_instances( SpotPrice=str(bid_price), InstanceCount=1, Type='one-time', ValidFrom=current_time, ValidUntil=current_time + timedelta(seconds=num_rep * time_rep), LaunchSpecification=specifications, ) except ClientError as client_error: if 'Max spot instance count exceeded' in client_error.message or \ 'InstanceLimitExceeded' in client_error.message: pipe_log_warn(LIMIT_EXCEEDED_ERROR_MASSAGE) sys.exit(LIMIT_EXCEEDED_EXIT_CODE) else: raise client_error rep = 0 ins_id = '' ins_ip = '' request_id = response['SpotInstanceRequests'][0]['SpotInstanceRequestId'] if not request_id: raise RuntimeError('Spot instance request did not return a SpotInstanceRequestId') pipe_log('- Spot request was sent. SpotInstanceRequestId: {}. Waiting for spot request registration...'.format(request_id)) # Await for spot request registration (sometimes SpotInstanceRequestId is not returned immediately) while rep <= num_rep: try: requests_list = ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id]) if len(requests_list['SpotInstanceRequests']) > 0: break except Exception as e: if e.response['Error']['Code'] != "InvalidSpotInstanceRequestID.NotFound": raise e rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) while waiting for spot request {}'.format(num_rep, request_id)) pipe_log('- Spot request {} is not yet available. Still waiting...'.format(request_id)) sleep(time_rep) # pipe_log('- Spot request {} is registered'.format(request_id)) ec2.create_tags( Resources=[request_id], Tags=run_id_tag(run_id, pool_id), ) pipe_log('- Spot request {} was tagged with RunID {}. Waiting for request fulfillment...'.format(request_id, run_id)) last_status = '' while rep <= num_rep: current_request = ec2.describe_spot_instance_requests(SpotInstanceRequestIds=[request_id])['SpotInstanceRequests'][0] status = current_request['Status']['Code'] last_status = status if status == 'fulfilled': ins_id = current_request['InstanceId'] instance = None try: instance = ec2.describe_instances(InstanceIds=[ins_id]) except Exception as describe_ex: if describe_ex.response['Error']['Code'] == "InvalidInstanceID.NotFound": pipe_log('- Spot request {} is already fulfilled but instance id {} can not be found yet. Still waiting...'.format(request_id, ins_id)) rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.' .format(num_rep, status)) sleep(time_rep) continue else: raise describe_ex instance_reservation = instance['Reservations'][0]['Instances'][0] if instance else None if not instance_reservation or 'PrivateIpAddress' not in instance_reservation or not instance_reservation['PrivateIpAddress']: pipe_log('- Spot request {} is already fulfilled but PrivateIpAddress is not yet assigned. Still waiting...'.format(request_id)) rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.' .format(num_rep, status), ec2_client=ec2, kill_instance_id_on_fail=ins_id, kube_client=kube_client) sleep(time_rep) continue ins_ip = instance_reservation['PrivateIpAddress'] ec2.create_tags( Resources=[ins_id], Tags=get_tags(run_id, aws_region, pool_id, input_tags), ) ebs_tags = resource_tags(aws_region) if input_tags: ebs_tags.extend(input_tags) if ebs_tags: volumes = instance_reservation['BlockDeviceMappings'] for volume in volumes: ec2.create_tags( Resources=[volume['Ebs']['VolumeId']], Tags=ebs_tags) # FIXME: 'modify_instance_metadata_options' shall be added to the pipe-common/autoscaling/awsprovider.py try: pipe_log('- Waiting for instance {} (spot request {}) to become RUNNING before setting IMDSv2'.format(ins_id, request_id)) ins_status = PENDING ins_status_rep = 0 while ins_status_rep <= num_rep and ins_status != RUNNING: ins_status = get_current_status(ec2, ins_id) ins_status_rep += 1 sleep(time_rep) if ins_status == RUNNING: pipe_log('- Tying to set IMDSv2 for instance {} (spot request {})'.format(ins_id, request_id)) ec2.modify_instance_metadata_options( InstanceId=ins_id, HttpTokens='optional', HttpPutResponseHopLimit=2, HttpEndpoint='enabled' ) else: raise RuntimeError('Time out error while waiting for the instance transition to RUNNING state') except Exception as modify_metadata_ex: pipe_log_warn('- [WARN] Cannot set IMDSv2 for instance {} (spot request {}):\n{}'.format(ins_id, request_id, str(modify_metadata_ex))) pipe_log('Instance is successfully created for spot request {}. ID: {}, IP: {}\n-'.format(request_id, ins_id, ins_ip)) break pipe_log('- Spot request {} is not yet fulfilled. Still waiting...'.format(request_id)) # TODO: review all this logic, it is difficult to read and maintain if rep >= num_rep: exit_if_spot_unavailable(run_id, last_status) rep = increment_or_fail(num_rep, rep, 'Exceeded retry count ({}) for spot instance. Spot instance request status code: {}.' .format(num_rep, status)) sleep(time_rep) exit_if_spot_unavailable(run_id, last_status) return ins_id, ins_ip def tag_name_is_present(instance): return 'Tags' in instance and len([tag for tag in instance['Tags'] if tag['Key'] == 'Name']) > 0 def wait_for_fulfilment(status): return status == 'not-scheduled-yet' or status == 'pending-evaluation' \ or status == 'pending-fulfillment' or status == 'fulfilled' def check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id, input_tags): pipe_log('Checking if spot request for RunID {} already exists...'.format(run_id)) for interation in range(0, 5): spot_req = get_spot_req_by_run_id(ec2, run_id) if spot_req: request_id = spot_req['SpotInstanceRequestId'] status = spot_req['Status']['Code'] pipe_log('- Spot request for RunID {} already exists: SpotInstanceRequestId: {}, Status: {}'.format(run_id, request_id, status)) rep = 0 if status == 'request-canceled-and-instance-running' and instance_is_active(ec2, spot_req['InstanceId']): return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, input_tags) if wait_for_fulfilment(status): while status != 'fulfilled': pipe_log('- Spot request ({}) is not yet fulfilled. Waiting...'.format(request_id)) sleep(time_rep) spot_req = ec2.describe_spot_instance_requests( SpotInstanceRequestIds=[request_id])['SpotInstanceRequests'][0] status = spot_req['Status']['Code'] pipe_log('Exceeded retry count ({}) for spot instance (SpotInstanceRequestId: {}). Spot instance request status code: {}.' .format(num_rep, request_id, status)) rep = rep + 1 if rep > num_rep: exit_if_spot_unavailable(run_id, status) return '', '' if instance_is_active(ec2, spot_req['InstanceId']): return tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, input_tags) sleep(5) pipe_log('No spot request for RunID {} found\n-'.format(run_id)) return '', '' def get_spot_req_by_run_id(ec2, run_id): response = ec2.describe_spot_instance_requests(Filters=[run_id_filter(run_id)]) for spot_req in response['SpotInstanceRequests']: status = spot_req['Status']['Code'] if wait_for_fulfilment(status) or status == 'request-canceled-and-instance-running': return spot_req return None def tag_and_get_instance(ec2, spot_req, run_id, aws_region, pool_id, input_tags): ins_id = spot_req['InstanceId'] pipe_log('Setting \"Name={}\" tag for instance {}'.format(run_id, ins_id)) instance = ec2.describe_instances(InstanceIds=[ins_id]) ins_ip = instance['Reservations'][0]['Instances'][0]['PrivateIpAddress'] if not tag_name_is_present(instance): # create tag name if not presents ec2.create_tags( Resources=[ins_id], Tags=get_tags(run_id, aws_region, pool_id, input_tags), ) pipe_log('Tag ({}) created for instance ({})\n-'.format(run_id, ins_id)) else: pipe_log('Tag ({}) is already set for instance ({}). Skip tagging\n-'.format(run_id, ins_id)) return ins_id, ins_ip def get_aws_region(region_id): if region_id is not None: return region_id regions, tags = load_cloud_config() for region in regions: if 'default' in region and region['default']: return region['name'] pipe_log('Failed to determine region for EC2 instance') raise RuntimeError('Failed to determine region for EC2 instance') def map_labels_to_dict(additional_labels_list): additional_labels_dict = dict() for label in additional_labels_list: label_parts = label.split("=") if len(label_parts) == 1: additional_labels_dict[label_parts[0]] = None else: additional_labels_dict[label_parts[0]] = label_parts[1] return additional_labels_dict def build_tags_from_input(input_tags): if not input_tags: return [] instance_tags = [] for input_tag in input_tags: tag_parts = input_tag.split("=") if len(tag_parts) == 1: instance_tags.append({'Key': tag_parts[0]}) else: instance_tags.append({ 'Key': tag_parts[0], 'Value': tag_parts[1] }) return instance_tags def main(): parser = argparse.ArgumentParser() parser.add_argument("--ins_key", type=str, required=True) parser.add_argument("--run_id", type=str, required=True) parser.add_argument("--cluster_name", type=str, required=False) parser.add_argument("--cluster_role", type=str, required=False) parser.add_argument("--ins_type", type=str, default='m4.large') parser.add_argument("--ins_hdd", type=int, default=30) parser.add_argument("--ins_img", type=str, default='ami-f68f3899') parser.add_argument("--ins_platform", type=str, default='linux') parser.add_argument("--num_rep", type=int, default=250) # 250 x 3s = 12.5m parser.add_argument("--time_rep", type=int, default=3) parser.add_argument("--is_spot", type=bool, default=False) parser.add_argument("--bid_price", type=float, default=1.0) parser.add_argument("--kube_ip", type=str, required=True) parser.add_argument("--kubeadm_token", type=str, required=True) parser.add_argument("--kubeadm_cert_hash", type=str, required=True) parser.add_argument("--kube_node_token", type=str, required=True) parser.add_argument("--kube_cluster_name", type=str, required=False) parser.add_argument("--kms_encyr_key_id", type=str, required=False) parser.add_argument("--region_id", type=str, default=None) parser.add_argument("--availability_zone", type=str, required=False) parser.add_argument("--network_interface", type=str, required=False) parser.add_argument("--performance_network", type=bool, required=False) parser.add_argument("--subnet_id", type=str, required=False) parser.add_argument("--security_groups", type=str, required=False) parser.add_argument("--dedicated", type=bool, required=False) parser.add_argument("--node_ssh_port", type=str, default='') parser.add_argument("--docker_data_root", type=str, default='/ebs/docker') parser.add_argument("--docker_storage_driver", type=str, default='') parser.add_argument("--skip_system_images_load", type=str, default='') parser.add_argument("--label", type=str, default=[], required=False, action='append') parser.add_argument("--image", type=str, default=[], required=False, action='append') parser.add_argument("--tags", type=str, default=[], required=False, action='append') args, unknown = parser.parse_known_args() ins_key = args.ins_key run_id = args.run_id ins_type = args.ins_type ins_hdd = args.ins_hdd ins_img = args.ins_img ins_platform = args.ins_platform # Java may pass 'null' (literally) instead of the empty parameter if ins_platform == 'null': ins_platform = 'linux' num_rep = args.num_rep time_rep = args.time_rep is_spot = args.is_spot bid_price = args.bid_price cluster_name = args.cluster_name cluster_role = args.cluster_role kube_ip = args.kube_ip kubeadm_token = args.kubeadm_token kubeadm_cert_hash = args.kubeadm_cert_hash kube_node_token = args.kube_node_token kube_cluster_name = args.kube_cluster_name kms_encyr_key_id = args.kms_encyr_key_id region_id = args.region_id availability_zone = args.availability_zone network_interface = args.network_interface performance_network = args.performance_network security_groups = args.security_groups subnet = args.subnet_id is_dedicated = args.dedicated if args.dedicated else False node_ssh_port = args.node_ssh_port docker_data_root = args.docker_data_root docker_storage_driver = args.docker_storage_driver skip_system_images_load = args.skip_system_images_load pre_pull_images = args.image additional_labels = map_labels_to_dict(args.label) pool_id = additional_labels.get(POOL_ID_KEY) input_tags = build_tags_from_input(args.tags) global_distribution_url = os.getenv('GLOBAL_DISTRIBUTION_URL', default='https://cloud-pipeline-oss-builds.s3.us-east-1.amazonaws.com/') if not kube_ip or not kubeadm_token: raise RuntimeError('Kubernetes configuration is required to create a new node') pipe_log_init(run_id) wait_time_sec = get_preference(NODE_WAIT_TIME_SEC) if wait_time_sec and wait_time_sec.isdigit(): num_rep = int(wait_time_sec) / time_rep aws_region = get_aws_region(region_id) boto3.setup_default_session(region_name=aws_region) pipe_log('Started initialization of new calculation node in AWS region {}:\n' '- RunID: {}\n' '- Type: {}\n' '- Disk: {}\n' '- Image: {}\n' '- Platform: {}\n' '- IsSpot: {}\n' '- BidPrice: {}\n' '- Repeat attempts: {}\n' '- Repeat timeout: {}\n-' '- Docker data root: {}\n-' '- Docker storage driver: {}\n-'.format(aws_region, run_id, ins_type, ins_hdd, ins_img, ins_platform, str(is_spot), str(bid_price), str(num_rep), str(time_rep), docker_data_root, docker_storage_driver)) try: # Hacking max max_attempts to get rid of # "An error occurred (RequestLimitExceeded) when calling the ... operation (reached max retries: 4)" # Official solution shall be provided with https://github.com/boto/botocore/pull/1260, waiting for release # This is applied to the old versions of botocore boto3_version = LooseVersion(boto3.__version__) boto3_version_retries = LooseVersion("1.7") pipe_log('Using boto3 version {}'.format(boto3.__version__)) ec2 = None if boto3_version < boto3_version_retries: try: ec2 = boto3.client('ec2') if hasattr(ec2.meta.events, "_unique_id_handlers"): ec2.meta.events._unique_id_handlers['retry-config-ec2']['handler']._checker.__dict__['_max_attempts'] = BOTO3_RETRY_COUNT except Exception as inner_exception: pipe_log('Unable to modify retry config:\n{}'.format(str(inner_exception))) else: ec2 = boto3.client('ec2', config=Config(retries={'max_attempts': BOTO3_RETRY_COUNT})) # Setup kubernetes client try: api = pykube.HTTPClient(pykube.KubeConfig.from_service_account()) except Exception: api = pykube.HTTPClient(pykube.KubeConfig.from_file(KUBE_CONFIG_PATH)) api.session.verify = False api_url = os.environ["API"] api_token = os.environ["API_TOKEN"] api_user = os.environ["API_USER"] instance_additional_spec = None allowed_instance = get_allowed_instance_image(aws_region, ins_type, ins_platform, ins_img, api_token, run_id) if allowed_instance and allowed_instance["instance_mask"]: pipe_log('Found matching rule {instance_mask} for requested instance type {instance_type}'.format(instance_mask=allowed_instance["instance_mask"], instance_type=ins_type)) instance_additional_spec = allowed_instance["additional_spec"] if instance_additional_spec: pipe_log('Additional custom instance configuration will be added: {}'.format(instance_additional_spec)) if not ins_img or ins_img == 'null': if allowed_instance and allowed_instance["instance_mask_ami"]: ins_img = allowed_instance["instance_mask_ami"] pipe_log('Instance image was not provided explicitly, {instance_image} will be used (retrieved for {instance_mask}/{instance_type} rule)'.format(instance_image=allowed_instance["instance_mask_ami"], instance_mask=allowed_instance["instance_mask"], instance_type=ins_type)) else: pipe_log('Specified in configuration image {ami} will be used'.format(ami=ins_img)) ins_id, ins_ip = verify_run_id(ec2, run_id) if not ins_id: ins_id, ins_ip = check_spot_request_exists(ec2, num_rep, run_id, time_rep, aws_region, pool_id, input_tags) if not ins_id: ins_id, ins_ip = run_instance(api_url, api_token, api_user, bid_price, ec2, aws_region, ins_hdd, kms_encyr_key_id, ins_img, ins_platform, ins_key, ins_type, is_spot, num_rep, run_id, pool_id, time_rep, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token, kube_cluster_name, api, global_distribution_url, pre_pull_images, instance_additional_spec, availability_zone, security_groups, subnet, network_interface, is_dedicated, node_ssh_port, performance_network, input_tags, docker_data_root, docker_storage_driver, skip_system_images_load) check_instance(ec2, ins_id, run_id, num_rep, time_rep, api) nodename = verify_regnode(ec2, ins_id, num_rep, time_rep, run_id, api) label_node(nodename, run_id, api, cluster_name, cluster_role, aws_region, additional_labels) pipe_log('Node created:\n' '- {}\n' '- {}'.format(ins_id, ins_ip)) # External process relies on this output print(ins_id + "\t" + ins_ip + "\t" + nodename) pipe_log('{} task finished'.format(NODEUP_TASK), status=TaskStatus.SUCCESS) except Exception as e: pipe_log('[ERROR] ' + str(e), status=TaskStatus.FAILURE) raise e if __name__ == '__main__': main()