scripts/autoscaling/azure/nodeup.py:

# Copyright 2017-2022 EPAM Systems, Inc. (https://www.epam.com/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import functools
import os
import math
import json
import sys
import re
import pykube
import logging
import fnmatch
import uuid
import base64
from time import sleep
from random import randint
from azure.common.client_factory import get_client_from_auth_file, get_client_from_cli_profile
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.network import NetworkManagementClient
from msrestazure.azure_exceptions import CloudError
from pipeline import Logger, TaskStatus, PipelineAPI, pack_script_contents, pack_powershell_script_contents
import jwt

VM_NAME_PREFIX = "az-"
UUID_LENGTH = 16
DISABLE_ACCESS = 'disable_external_access'
NETWORKS_PARAM = "cluster.networks.config"
NODE_WAIT_TIME_SEC = "cluster.nodeup.wait.sec"
NODEUP_TASK = "InitializeNode"
LIMIT_EXCEEDED_EXIT_CODE = 6
LIMIT_EXCEEDED_ERROR_MESSAGE = 'Instance limit exceeded. A new one will be launched as soon as free space becomes available.'
LOW_PRIORITY_INSTANCE_ID_TEMPLATE = '(az-[a-z0-9]{16})[0-9A-Z]{6}'
POOL_ID_KEY = 'pool_id'
DEFAULT_FS_TYPE = 'btrfs'
SUPPORTED_FS_TYPES = [DEFAULT_FS_TYPE, 'ext4']
MIN_SWAP_DEVICE_SIZE = 5

current_run_id = 0
api_url = None
api_token = None
script_path = None


def is_run_id_numerical(run_id):
    try:
        int(run_id)
        return True
    except ValueError:
        return False


def is_api_logging_enabled():
    global api_token
    global api_url
    global current_run_id
    return is_run_id_numerical(current_run_id) and api_url and api_token


def pipe_log_init(run_id):
    global api_token
    global api_url
    global current_run_id
    current_run_id = run_id
    api_url = os.environ["API"]
    api_token = os.environ["API_TOKEN"]
    if not is_api_logging_enabled():
        logging.basicConfig(filename='nodeup.log', level=logging.INFO, format='%(asctime)s %(message)s')


def pipe_log_warn(message):
    global api_token
    global api_url
    global script_path
    global current_run_id
    if is_api_logging_enabled():
        Logger.warn('[{}] {}'.format(current_run_id, message),
                    task_name=NODEUP_TASK,
                    run_id=current_run_id,
                    api_url=api_url,
                    log_dir=script_path,
                    omit_console=True)
    else:
        logging.warning(message)


def pipe_log(message, status=TaskStatus.RUNNING):
    global api_token
    global api_url
    global script_path
    global current_run_id
    if is_api_logging_enabled():
        Logger.log_task_event(NODEUP_TASK,
                              '[{}] {}'.format(current_run_id, message),
                              run_id=current_run_id,
                              instance=str(current_run_id),
                              log_dir=script_path,
                              api_url=api_url,
                              status=status,
                              omit_console=True)
    else:
        # Log as always
        logging.info(message)

#############################

#############################

__CLOUD_METADATA__ = None
__CLOUD_TAGS__ = None


def get_preference(preference_name):
    pipe_api = PipelineAPI(api_url, None)
    try:
        preference = pipe_api.get_preference(preference_name)
        if 'value' in preference:
            return preference['value']
        else:
            return None
    except:
        pipe_log('An error occurred while getting preference {}, empty value is going to be used'.format(preference_name))
        return None

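# Illustrative sketch (placeholder names, not a real deployment) of the JSON stored in the
# "cluster.networks.config" preference that the helpers below expect:
#
# {
#     "tags": {"owner": "cloud-pipeline"},
#     "regions": [{
#         "name": "eastus",
#         "default": true,
#         "networks": {"<res-grp>/<vnet>": "<subnet>"},
#         "security_group_ids": ["<res-grp>/<network-security-group>"],
#         "amis": [{"platform": "linux", "instance_mask": "Standard_D*", "ami": "<res-grp>/<image>",
#                   "init_script": "/path/to/init.sh", "fs_type": "btrfs"}],
#         "well_known_hosts": [{"ip": "10.0.0.10", "host": "cp-api-srv.internal"}],
#         "proxies": [{"name": "http_proxy", "path": ""}],
#         "swap": [{"name": "swap_ratio", "path": "0.5"}],
#         "access_config": {"disable_external_access": false}
#     }]
# }
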
def load_cloud_config():
    global __CLOUD_METADATA__
    global __CLOUD_TAGS__
    if not __CLOUD_METADATA__:
        pipe_api = PipelineAPI(api_url, None)
        preference = pipe_api.get_preference(NETWORKS_PARAM)
        if preference:
            data = json.loads(preference['value'])
            if 'regions' not in data:
                pipe_log('Malformed networks config file: missing "regions" section. Update config file.')
                raise RuntimeError('Malformed networks config file: missing "regions" section. Update config file.')
            __CLOUD_METADATA__ = data['regions']
            if 'tags' in data:
                __CLOUD_TAGS__ = data['tags']
    return __CLOUD_METADATA__, __CLOUD_TAGS__


def get_region_settings(cloud_region):
    full_settings, tags = load_cloud_config()
    for region_settings in full_settings:
        if 'name' in region_settings and region_settings['name'] == cloud_region:
            return region_settings
    pipe_log('Failed to find networks settings for region: %s.' % cloud_region)
    return None


def get_cloud_config_section(cloud_region, section_name):
    cloud_metadata = get_region_settings(cloud_region)
    if cloud_metadata and section_name in cloud_metadata and len(cloud_metadata[section_name]) > 0:
        return cloud_metadata[section_name]
    else:
        return None


def get_networks_config(cloud_region):
    return get_cloud_config_section(cloud_region, "networks")


def get_instance_images_config(cloud_region):
    return get_cloud_config_section(cloud_region, "amis")


def get_allowed_zones(cloud_region):
    return list(get_networks_config(cloud_region).keys())


def get_security_groups(cloud_region):
    config = get_cloud_config_section(cloud_region, "security_group_ids")
    if not config:
        raise RuntimeError('Security group setting is required to run an instance')
    return config


def get_well_known_hosts(cloud_region):
    return get_cloud_config_section(cloud_region, "well_known_hosts")


def get_allowed_instance_image(cloud_region, instance_type, instance_platform, default_image):
    default_init_script = os.path.dirname(os.path.abspath(__file__)) + '/init.sh'
    default_embedded_scripts = {"fsautoscale": os.path.dirname(os.path.abspath(__file__)) + '/fsautoscale.sh'}
    default_object = {"instance_mask_ami": default_image, "instance_mask": None, "init_script": default_init_script,
                      "embedded_scripts": default_embedded_scripts, "fs_type": DEFAULT_FS_TYPE}

    instance_images_config = get_instance_images_config(cloud_region)
    if not instance_images_config:
        return default_object

    for image_config in instance_images_config:
        image_platform = image_config["platform"]
        instance_mask = image_config["instance_mask"]
        instance_mask_ami = image_config["ami"]
        init_script = image_config.get("init_script", default_object["init_script"])
        embedded_scripts = image_config.get("embedded_scripts", default_object["embedded_scripts"])
        fs_type = image_config.get("fs_type", DEFAULT_FS_TYPE)
        if image_platform == instance_platform and fnmatch.fnmatch(instance_type, instance_mask):
            return {"instance_mask_ami": instance_mask_ami, "instance_mask": instance_mask, "init_script": init_script,
                    "embedded_scripts": embedded_scripts, "fs_type": fs_type}

    return default_object

#############################

SUBNET_ID_PARTS_NUMBER = 11
SUBNET_NAME_INDEX = 10
SUBNET_TYPE_INDEX = 9
RESOURCE_ID_PARTS_NUMBER = 9
RESOURCE_NAME_INDEX = 8
RESOURCE_TYPE_INDEX = 7
RESOURCE_GROUP_NAME_INDEX = 4
RESOURCE_GROUP_KEY_INDEX = 3

zone = None

auth_file = os.environ.get('AZURE_AUTH_LOCATION', None)
if auth_file:
    resource_client = get_client_from_auth_file(ResourceManagementClient, auth_path=auth_file)
    network_client = get_client_from_auth_file(NetworkManagementClient, auth_path=auth_file)
    compute_client = get_client_from_auth_file(ComputeManagementClient, auth_path=auth_file)
else:
    resource_client = get_client_from_cli_profile(ResourceManagementClient)
    network_client = get_client_from_cli_profile(NetworkManagementClient)
    compute_client = get_client_from_cli_profile(ComputeManagementClient)

resource_group_name = os.environ["AZURE_RESOURCE_GROUP"]


def run_instance(api_url, api_token, api_user, instance_name, instance_type, cloud_region, run_id, pool_id, ins_hdd,
                 ins_img, ins_platform, ssh_pub_key, user, ins_type, is_spot, kube_ip, kubeadm_token,
                 kubeadm_cert_hash, kube_node_token, global_distribution_url, pre_pull_images):
    ins_key = read_ssh_key(ssh_pub_key)
    swap_size = get_swap_size(cloud_region, ins_type, is_spot)
    user_data_script = get_user_data_script(api_url, api_token, api_user, cloud_region, ins_type, ins_img,
                                            ins_platform, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token,
                                            global_distribution_url, swap_size, pre_pull_images)
    access_config = get_access_config(cloud_region)
    disable_external_access = False
    if access_config is not None:
        disable_external_access = DISABLE_ACCESS in access_config and access_config[DISABLE_ACCESS]
    if not is_spot:
        create_nic(instance_name, run_id, disable_external_access)
        return create_vm(instance_name, run_id, pool_id, instance_type, ins_img, ins_hdd,
                         user_data_script, ins_key, user, swap_size)
    else:
        return create_low_priority_vm(instance_name, run_id, pool_id, instance_type, ins_img, ins_hdd,
                                      user_data_script, ins_key, user, swap_size, disable_external_access)


def read_ssh_key(ssh_pub_key):
    with open(ssh_pub_key) as f:
        content = f.readlines()
        if len(content) != 1 and not content[0].startswith("ssh-rsa"):
            raise RuntimeError("Wrong format of ssh pub key!")
    ins_key = content[0]
    return ins_key


def create_public_ip_address(instance_name, run_id):
    public_ip_address_params = {
        'location': zone,
        'public_ip_allocation_method': 'Dynamic',
        'dns_settings': {
            'domain_name_label': instance_name
        },
        'tags': get_tags(run_id)
    }
    creation_result = network_client.public_ip_addresses.create_or_update(
        resource_group_name,
        instance_name + '-ip',
        public_ip_address_params
    )
    return creation_result.result()


def create_nic(instance_name, run_id, disable_external_access):
    subnet_info = get_subnet_info()
    security_group_info = get_security_group_info()

    nic_params = {
        'location': zone,
        'ipConfigurations': [{
            'name': 'IPConfig',
            'subnet': {
                'id': subnet_info.id
            }
        }],
        "networkSecurityGroup": {
            'id': security_group_info.id
        },
        'tags': get_tags(run_id)
    }

    if not disable_external_access:
        create_public_ip_address(instance_name, run_id)
        public_ip_address = network_client.public_ip_addresses.get(
            resource_group_name,
            instance_name + '-ip'
        )
        nic_params["ipConfigurations"][0]["publicIpAddress"] = public_ip_address

    creation_result = network_client.network_interfaces.create_or_update(
        resource_group_name,
        instance_name + '-nic',
        nic_params
    )

    return creation_result.result()


def get_access_config(cloud_region):
    return get_cloud_config_section(cloud_region, "access_config")


def get_security_group_info():
    security_groups = get_security_groups(zone)
    if len(security_groups) != 1:
        raise AssertionError("Please specify only one security group!")
    resource_group, secur_grp = get_res_grp_and_res_name_from_string(security_groups[0], 'networkSecurityGroups')
    security_group_info = network_client.network_security_groups.get(resource_group, secur_grp)
    return security_group_info

def get_subnet_info():
    allowed_networks = get_networks_config(zone)
    if allowed_networks and len(allowed_networks) > 0:
        az_num = randint(0, len(allowed_networks) - 1)
        az_name = allowed_networks.items()[az_num][0]
        subnet_id = allowed_networks.items()[az_num][1]
        resource_group, network = get_res_grp_and_res_name_from_string(az_name, 'virtualNetworks')
        subnet = get_subnet_name_from_id(subnet_id)
        pipe_log('- Networks list found, subnet {} in VNET {} will be used'.format(subnet_id, az_name))
    else:
        pipe_log('- Networks list NOT found, trying to find network from region in the same resource group...')
        resource_group, network, subnet = get_any_network_from_location(zone)
        pipe_log('- Network found, subnet {} in VNET {} will be used'.format(subnet, network))
    if not resource_group or not network or not subnet:
        raise RuntimeError(
            "No networks with subnet found for location: {} in resourceGroup: {}".format(zone, resource_group_name))
    subnet_info = network_client.subnets.get(resource_group, network, subnet)
    return subnet_info


def get_any_network_from_location(location):
    resource_group, network, subnet = None, None, None

    for vnet in resource_client.resources.list(filter="resourceType eq 'Microsoft.Network/virtualNetworks' "
                                                      "and location eq '{}' "
                                                      "and resourceGroup eq '{}'".format(location, resource_group_name)):
        resource_group, network = get_res_grp_and_res_name_from_string(vnet.id, 'virtualNetworks')
        break

    if not resource_group or not network:
        return resource_group, network, subnet

    for subnet_res in network_client.subnets.list(resource_group, network):
        subnet = get_subnet_name_from_id(subnet_res.id)
        break

    return resource_group, network, subnet


def get_disk_type(instance_type):
    disk_type = None
    for sku in compute_client.resource_skus.list():
        if sku.locations[0].lower() == zone.lower() and sku.resource_type.lower() == "virtualmachines" \
                and sku.name.lower() == instance_type.lower():
            for capability in sku.capabilities:
                if capability.name.lower() == "premiumio":
                    disk_type = "Premium_LRS" if capability.value.lower() == "true" else "StandardSSD_LRS"
                    break
    return disk_type


def get_os_profile(instance_name, ssh_pub_key, user, user_data_script, computer_name_parameter):
    profile = {
        computer_name_parameter: instance_name,
        'admin_username': user,
        "linuxConfiguration": {
            "ssh": {
                "publicKeys": [
                    {
                        "path": "/home/" + user + "/.ssh/authorized_keys",
                        "key_data": "{key}".format(key=ssh_pub_key)
                    }
                ]
            },
            "disablePasswordAuthentication": True,
        },
        "custom_data": base64.b64encode(user_data_script)
    }
    return profile


def get_data_disk(size, disk_type, lun, disk_name=None):
    disk = {
        "diskSizeGB": size,
        "lun": lun,
        "createOption": "Empty",
        "managedDisk": {
            "storageAccountType": disk_type
        }
    }
    if disk_name is not None:
        disk["name"] = disk_name
    return disk


def get_storage_profile(disk, image, instance_type, instance_name=None, swap_size=None):
    disk_type = get_disk_type(instance_type)
    disk_name = None if instance_name is None else instance_name + "-data"
    disk_lun = 62
    data_disks = [get_data_disk(disk, disk_type, disk_lun, disk_name=disk_name)]
    if swap_size is not None and swap_size > 0:
        swap_name = None if instance_name is None else instance_name + "-swap"
        data_disks.append(get_data_disk(swap_size, disk_type, disk_lun + 1, disk_name=swap_name))
    return {
        'image_reference': {
            'id': image.id
        },
        "osDisk": {
            "caching": "ReadWrite",
            "managedDisk": {
                "storageAccountType": disk_type
            },
            "createOption": "FromImage"
        },
        "dataDisks": data_disks
    }

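# Illustrative note (not from the original sources): the storage profile assembled above attaches the
# data disk at LUN 62 and, when swap is configured, a swap disk at LUN 63. For a VM named
# "az-0123456789abcdef" created by create_vm() below the disks would be named
# "az-0123456789abcdef-data" / "az-0123456789abcdef-swap", and the disk type is Premium_LRS when the
# selected SKU reports the "PremiumIO" capability, StandardSSD_LRS otherwise.
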
def create_vm(instance_name, run_id, pool_id, instance_type, instance_image, disk, user_data_script,
              ssh_pub_key, user, swap_size):
    nic = network_client.network_interfaces.get(
        resource_group_name,
        instance_name + '-nic'
    )
    resource_group, image_name = get_res_grp_and_res_name_from_string(instance_image, 'images')
    image = compute_client.images.get(resource_group, image_name)
    storage_profile = get_storage_profile(disk, image, instance_type,
                                          instance_name=instance_name, swap_size=swap_size)
    vm_parameters = {
        'location': zone,
        'os_profile': get_os_profile(instance_name, ssh_pub_key, user, user_data_script, 'computer_name'),
        'hardware_profile': {
            'vm_size': instance_type
        },
        'storage_profile': storage_profile,
        'network_profile': {
            'network_interfaces': [{
                'id': nic.id
            }]
        },
        'tags': get_tags(run_id, pool_id)
    }

    create_node_resource(compute_client.virtual_machines, instance_name, vm_parameters)

    private_ip = network_client.network_interfaces.get(
        resource_group_name, instance_name + '-nic').ip_configurations[0].private_ip_address

    return instance_name, private_ip


def create_low_priority_vm(scale_set_name, run_id, pool_id, instance_type, instance_image, disk, user_data_script,
                           ssh_pub_key, user, swap_size, disable_external_access):
    pipe_log('Create VMScaleSet with low priority instance for run: {}'.format(run_id))

    resource_group, image_name = get_res_grp_and_res_name_from_string(instance_image, 'images')
    image = compute_client.images.get(resource_group, image_name)
    subnet_info = get_subnet_info()
    security_group_info = get_security_group_info()

    service = compute_client.virtual_machine_scale_sets

    vmss_parameters = {
        "location": zone,
        "sku": {
            "name": instance_type,
            "capacity": "1"
        },
        "upgradePolicy": {
            "mode": "Manual",
            "automaticOSUpgrade": False
        },
        "properties": {
            "overprovision": False,
            "virtualMachineProfile": {
                'priority': 'Low',
                'evictionPolicy': 'delete',
                'os_profile': get_os_profile(scale_set_name, ssh_pub_key, user, user_data_script,
                                             'computer_name_prefix'),
                'storage_profile': get_storage_profile(disk, image, instance_type, swap_size=swap_size),
                "network_profile": {
                    "networkInterfaceConfigurations": [
                        {
                            "name": scale_set_name + "-nic",
                            "properties": {
                                "primary": True,
                                "networkSecurityGroup": {
                                    "id": security_group_info.id
                                },
                                'dns_settings': {
                                    'domain_name_label': scale_set_name
                                },
                                "ipConfigurations": [
                                    {
                                        "name": scale_set_name + "-ip",
                                        "properties": {
                                            "subnet": {
                                                "id": subnet_info.id
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            }
        },
        'tags': get_tags(run_id, pool_id)
    }

    if not disable_external_access:
        vmss_parameters['properties']['virtualMachineProfile'] \
            ['network_profile']['networkInterfaceConfigurations'][0] \
            ['properties']['ipConfigurations'][0] \
            ['publicIPAddressConfiguration'] = {"name": scale_set_name + "-publicip"}

    create_node_resource(service, scale_set_name, vmss_parameters)

    return get_instance_name_and_private_ip_from_vmss(scale_set_name)


def create_node_resource(service, instance_name, node_parameters):
    try:
        creation_result = service.create_or_update(
            resource_group_name,
            instance_name,
            node_parameters
        )
        creation_result.result()
    except CloudError as client_error:
        delete_all_by_run_id(node_parameters['tags']['Name'])
        error_message = client_error.__str__()
        if 'OperationNotAllowed' in error_message or 'ResourceQuotaExceeded' in error_message:
            pipe_log_warn(LIMIT_EXCEEDED_ERROR_MESSAGE)
            sys.exit(LIMIT_EXCEEDED_EXIT_CODE)
        else:
            raise client_error

def get_instance_name_and_private_ip_from_vmss(scale_set_name):
    vm_vmss_id = None
    for vm in compute_client.virtual_machine_scale_set_vms.list(resource_group_name, scale_set_name):
        vm_vmss_id = vm.instance_id
        break
    if vm_vmss_id is None:
        pipe_log('Failed to find instance in ScaleSet: {}. Seems that instance was preempted.'.format(scale_set_name))
        raise RuntimeError('Failed to find instance in ScaleSet: {}. Seems that instance was preempted.'.format(scale_set_name))

    instance_name = compute_client.virtual_machine_scale_set_vms \
        .get_instance_view(resource_group_name, scale_set_name, vm_vmss_id) \
        .additional_properties["computerName"]
    private_ip = network_client.network_interfaces. \
        get_virtual_machine_scale_set_ip_configuration(resource_group_name, scale_set_name, vm_vmss_id,
                                                       scale_set_name + "-nic", scale_set_name + "-ip") \
        .private_ip_address
    return instance_name, private_ip


def get_cloud_region(region_id):
    if region_id is not None:
        return region_id
    regions, tags = load_cloud_config()
    for region in regions:
        if 'default' in region and region['default']:
            return region['name']
    pipe_log('Failed to determine region for Azure instance')
    raise RuntimeError('Failed to determine region for Azure instance')


def increment_or_fail(num_rep, rep, error_message, kill_instance_id_on_fail=None):
    rep = rep + 1
    if rep > num_rep:
        if kill_instance_id_on_fail:
            pipe_log('[ERROR] Operation timed out and an instance {} will be terminated'.format(kill_instance_id_on_fail))
            terminate_instance(kill_instance_id_on_fail)
        raise RuntimeError(error_message)
    return rep


def resource_tags():
    tags = {}
    config_regions, config_tags = load_cloud_config()
    if config_tags is None:
        return tags
    for key, value in config_tags.iteritems():
        tags.update({key: value})
    return tags


def run_id_tag(run_id):
    return {
        'Name': run_id,
    }


def get_tags(run_id, pool_id=None):
    tags = run_id_tag(run_id)
    res_tags = resource_tags()
    if res_tags:
        tags.update(res_tags)
    if pool_id:
        tags[POOL_ID_KEY] = pool_id
    return tags


def verify_run_id(run_id):
    pipe_log('Checking if instance already exists for RunID {}'.format(run_id))
    vm_name = None
    private_ip = None
    for resource in resource_client.resources.list(filter="tagName eq 'Name' and tagValue eq '" + run_id + "'"):
        if str(resource.type).split('/')[-1].lower() == "virtualmachines":
            vm_name = resource.name
            private_ip = network_client.network_interfaces\
                .get(resource_group_name, vm_name + '-nic').ip_configurations[0].private_ip_address
            break
        if str(resource.type).split('/')[-1].lower() == "virtualmachinescaleset":
            scale_set_name = resource.name
            vm_name, private_ip = get_instance_name_and_private_ip_from_vmss(scale_set_name)
            break
    return vm_name, private_ip


def generate_scale_set_vm_names(scale_set_name):
    nodes_to_delete = [scale_set_name + '%0*x' % (6, x) for x in range(0, 15)]
    return nodes_to_delete


def delete_scale_set_nodes_from_kube(kube_api, scale_set_name):
    nodes_to_delete = generate_scale_set_vm_names(scale_set_name)
    for node_to_delete in nodes_to_delete:
        delete_node_from_kube(kube_api, node_to_delete)


def delete_node_from_kube(kube_api, ins_id):
    nodes = pykube.Node.objects(kube_api).filter(field_selector={'metadata.name': ins_id})
    for node in nodes.response['items']:
        if any((condition['status'] == 'False' or condition['status'] == 'Unknown') and condition['type'] == "Ready"
               for condition in node["status"]["conditions"]):
            obj = {
                "apiVersion": "v1",
                "kind": "Node",
                "metadata": {
                    "name": node["metadata"]["name"]
                }
            }
            pykube.Node(kube_api, obj).delete()


def find_node(nodename, nodename_full, api):
    ret_namenode = get_nodename(api, nodename)
    if not ret_namenode:
        return get_nodename(api, nodename_full)
    else:
        return ret_namenode


def get_nodename(api, nodename):
    node = pykube.Node.objects(api).filter(field_selector={'metadata.name': nodename})
    if len(node.response['items']) > 0:
        return nodename
    else:
        return ''

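# Illustrative timing note (derived from the argparse defaults in main(), not from the original sources):
# each wait loop in verify_regnode() below retries up to num_rep times with time_rep seconds between
# attempts, i.e. 250 * 3 s = 12.5 minutes by default; when the cluster.nodeup.wait.sec preference is set,
# main() recomputes num_rep as wait_time_sec / time_rep instead.
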
def verify_regnode(ins_id, num_rep, time_rep, api):
    ret_namenode = ''
    rep = 0
    while rep <= num_rep:
        ret_namenode = find_node(ins_id, ins_id, api)
        if ret_namenode:
            break
        rep = increment_or_fail(num_rep, rep,
                                'Exceeded retry count ({}) for instance (ID: {}, NodeName: {}) cluster registration'.format(num_rep, ins_id, ins_id),
                                ins_id)
        sleep(time_rep)

    if ret_namenode:  # useless?
        pipe_log('- Node registered in cluster as {}'.format(ret_namenode))
        rep = 0
        while rep <= num_rep:
            node = pykube.Node.objects(api).filter(field_selector={'metadata.name': ret_namenode})
            status = node.response['items'][0]['status']['conditions'][3]['status']
            if status == u'True':
                pipe_log('- Node ({}) status is READY'.format(ret_namenode))
                break
            rep = increment_or_fail(num_rep, rep,
                                    'Exceeded retry count ({}) for instance (ID: {}, NodeName: {}) kube node READY check'.format(num_rep, ins_id, ret_namenode),
                                    ins_id)
            sleep(time_rep)

        rep = 0
        pipe_log('- Waiting for system agents initialization...')
        while rep <= num_rep:
            pods = pykube.objects.Pod.objects(api).filter(namespace="kube-system",
                                                          field_selector={"spec.nodeName": ret_namenode})
            count_pods = len(pods.response['items'])
            ready_pods = len([p for p in pods if p.ready])
            if count_pods == ready_pods:
                break
            pipe_log('- {} of {} agents initialized. Still waiting...'.format(ready_pods, count_pods))
            rep = increment_or_fail(num_rep, rep,
                                    'Exceeded retry count ({}) for instance (ID: {}, NodeName: {}) kube system pods check'.format(num_rep, ins_id, ret_namenode),
                                    ins_id)
            sleep(time_rep)
    pipe_log('Instance {} successfully registered in cluster with name {}\n-'.format(ins_id, ins_id))
    return ret_namenode


def label_node(nodename, run_id, api, cluster_name, cluster_role, cloud_region, additional_labels):
    pipe_log('Assigning instance {} to RunID: {}'.format(nodename, run_id))
    obj = {
        "apiVersion": "v1",
        "kind": "Node",
        "metadata": {
            "name": nodename,
            "labels": {
                "runid": run_id,
                "cloud_region": cloud_region
            }
        }
    }
    if additional_labels:
        obj["metadata"]["labels"].update(additional_labels)

    if cluster_name:
        obj["metadata"]["labels"]["cp-cluster-name"] = cluster_name
    if cluster_role:
        obj["metadata"]["labels"]["cp-cluster-role"] = cluster_role

    pykube.Node(api, obj).update()
    pipe_log('Instance {} is assigned to RunID: {}\n-'.format(nodename, run_id))


def get_certs_string():
    global api_token
    global api_url
    command_pattern = 'mkdir -p /etc/docker/certs.d/{url} && echo "{cert}" >> /etc/docker/certs.d/{url}/ca.crt'
    if api_url and api_token:
        pipe_api = PipelineAPI(api_url, None)
        result = pipe_api.load_certificates()
        if not result:
            return ""
        else:
            entries = []
            for url, cert in result.iteritems():
                entries.append(command_pattern.format(url=url, cert=cert))
            return " && ".join(entries)
    return ""


def get_well_known_hosts_string(cloud_region):
    pipe_log('Setting well-known hosts for an instance in {} region'.format(cloud_region))
    command_pattern = 'echo {well_known_ip} {well_known_host} >> /etc/hosts'
    well_known_list = get_well_known_hosts(cloud_region)
    if not well_known_list or len(well_known_list) == 0:
        return ''

    entries = []
    for well_known_item in well_known_list:
        if not 'ip' in well_known_item or not 'host' in well_known_item:
            continue

        well_known_ip = well_known_item['ip']
        well_known_host = well_known_item['host']
        if not well_known_ip or not well_known_host:
            continue

        entries.append(command_pattern.format(well_known_ip=well_known_ip, well_known_host=well_known_host))
        pipe_log('-> {}={}'.format(well_known_ip, well_known_host))

    if len(entries) == 0:
        return ''
    return ' && '.join(entries)

def resolve_azure_api(resource):
    """ This method retrieves the latest non-preview api version for
    the given resource (unless the preview version is the only available
    api version) """
    provider = resource_client.providers.get(resource.id.split('/')[6])
    rt = next((t for t in provider.resource_types
               if t.resource_type.lower() == '/'.join(resource.type.split('/')[1:]).lower()), None)
    if rt and 'api_versions' in rt.__dict__:
        api_version = [v for v in rt.__dict__['api_versions'] if 'preview' not in v.lower()]
        return api_version[0] if api_version else rt.__dict__['api_versions'][0]


def azure_resource_type_cmp(r1, r2):
    if str(r1.type).split('/')[-1].lower().startswith("virtualmachine"):
        return -1
    elif str(r1.type).split('/')[-1].lower() == "networkinterfaces" \
            and not str(r2.type).split('/')[-1].lower().startswith("virtualmachine"):
        return -1
    return 0


def terminate_instance(ins_id):
    instance = compute_client.virtual_machines.get(resource_group_name, ins_id)
    if 'Name' in instance.tags:
        delete_all_by_run_id(instance.tags['Name'])
    else:
        compute_client.virtual_machines.delete(resource_group_name, ins_id).wait()


def delete_all_by_run_id(run_id):
    resources = []
    resources.extend(resource_client.resources.list(filter="tagName eq 'Name' and tagValue eq '" + run_id + "'"))
    if len(resources) > 0:
        # we need to sort resources to be sure that vm and nic will be deleted first,
        # because they have attached resources (disks and ip)
        resources.sort(key=functools.cmp_to_key(azure_resource_type_cmp))
        vm_name = resources[0].name if str(resources[0].type).split('/')[-1].lower().startswith('virtualmachine') \
            else resources[0].name[0:len(VM_NAME_PREFIX) + UUID_LENGTH]
        if str(resources[0].type).split('/')[-1].lower() == 'virtualmachines':
            detach_disks_and_nic(vm_name)
        for resource in resources:
            resource_client.resources.delete(
                resource_group_name=resource.id.split('/')[4],
                resource_provider_namespace=resource.id.split('/')[6],
                parent_resource_path='',
                resource_type=str(resource.type).split('/')[-1],
                resource_name=resource.name,
                api_version=resolve_azure_api(resource),
                parameters=resource
            ).wait()


def detach_disks_and_nic(vm_name):
    compute_client.virtual_machines.delete(resource_group_name, vm_name).wait()
    try:
        nic = network_client.network_interfaces.get(resource_group_name, vm_name + '-nic')
        nic.ip_configurations[0].public_ip_address = None
        network_client.network_interfaces.create_or_update(resource_group_name, vm_name + '-nic', nic).wait()
    except Exception as e:
        print(e)


def replace_common_params(cloud_region, init_script, config_section):
    pipe_log('Configuring {} settings for an instance in {} region'.format(config_section, cloud_region))
    common_list = get_cloud_config_section(cloud_region, config_section)
    if not common_list:
        return init_script

    for common_item in common_list:
        if not 'name' in common_item or not 'path' in common_item:
            continue
        item_name = common_item['name']
        item_path = common_item['path']
        if not item_name:
            continue
        if item_path == None:
            item_path = ''
        init_script = init_script.replace('@' + item_name + '@', item_path)
        pipe_log('-> {}={}'.format(item_name, item_path))

    return init_script


def replace_proxies(cloud_region, init_script):
    return replace_common_params(cloud_region, init_script, "proxies")


def replace_swap(swap_size, init_script):
    if swap_size is not None:
        return init_script.replace('@swap_size@', str(swap_size))
    return init_script

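# Worked example (illustrative, not from the original sources): get_swap_size() below multiplies the
# configured swap_ratio by the instance RAM reported by the Cloud Pipeline API and rounds up, skipping
# devices smaller than MIN_SWAP_DEVICE_SIZE. E.g. swap_ratio=0.5 with a 16 GB instance gives
# ceil(0.5 * 16) = 8 GB of swap; with a 4 GB instance the computed 2 GB is below the 5 GB minimum,
# so no swap disk is attached.
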
def get_swap_size(cloud_region, ins_type, is_spot):
    pipe_log('Configuring swap settings for an instance in {} region'.format(cloud_region))
    swap_params = get_cloud_config_section(cloud_region, "swap")
    if swap_params is None:
        return None
    swap_ratio = get_swap_ratio(swap_params)
    if swap_ratio is None:
        pipe_log("Swap ratio is not configured. Swap configuration will be skipped.")
        return None
    ram = get_instance_ram(cloud_region, ins_type, is_spot)
    if ram is None:
        pipe_log("Failed to determine instance RAM. Swap configuration will be skipped.")
        return None
    swap_size = int(math.ceil(swap_ratio * ram))
    if swap_size >= MIN_SWAP_DEVICE_SIZE:
        pipe_log("Swap device will be configured with size %d." % swap_size)
        return swap_size
    return None


def get_instance_ram(cloud_region, ins_type, is_spot):
    api = PipelineAPI(api_url, None)
    region_id = get_region_id(cloud_region, api)
    if region_id is None:
        return None
    instance_types = api.get_allowed_instance_types(region_id, spot=is_spot)
    ram = get_ram_from_group(instance_types, 'cluster.allowed.instance.types', ins_type)
    if ram is None:
        ram = get_ram_from_group(instance_types, 'cluster.allowed.instance.types.docker', ins_type)
    return ram


def get_ram_from_group(instance_types, group, instance_type):
    if group in instance_types:
        for current_type in instance_types[group]:
            if current_type['name'] == instance_type:
                return current_type['memory']
    return None


def get_region_id(cloud_region, api):
    regions = api.get_regions()
    if regions is None:
        return None
    for region in regions:
        if region.provider == 'AZURE' and region.region_id == cloud_region:
            return region.id
    return None


def get_swap_ratio(swap_params):
    for swap_param in swap_params:
        if not 'name' in swap_param or not 'path' in swap_param:
            continue
        item_name = swap_param['name']
        if item_name == 'swap_ratio':
            item_value = swap_param['path']
            if item_value:
                try:
                    return float(item_value)
                except ValueError:
                    pipe_log("Unexpected swap_ratio value: {}".format(item_value))
    return None


def replace_docker_images(pre_pull_images, user_data_script):
    global api_token
    payload = jwt.decode(api_token, verify=False)
    if 'sub' in payload:
        subject = payload['sub']
        user_data_script = user_data_script \
            .replace("@PRE_PULL_DOCKERS@", ",".join(pre_pull_images)) \
            .replace("@API_USER@", subject)
        return user_data_script
    else:
        raise RuntimeError("Pre-pulled docker initialization failed: unable to parse JWT token for docker auth.")

def get_user_data_script(api_url, api_token, api_user, cloud_region, ins_type, ins_img, ins_platform, kube_ip,
                         kubeadm_token, kubeadm_cert_hash, kube_node_token, global_distribution_url,
                         swap_size, pre_pull_images):
    allowed_instance = get_allowed_instance_image(cloud_region, ins_type, ins_platform, ins_img)
    if allowed_instance and allowed_instance["init_script"]:
        init_script = open(allowed_instance["init_script"], 'r')
        user_data_script = init_script.read()
        certs_string = get_certs_string()
        well_known_string = get_well_known_hosts_string(cloud_region)
        fs_type = allowed_instance.get('fs_type', DEFAULT_FS_TYPE)
        if fs_type not in SUPPORTED_FS_TYPES:
            pipe_log_warn('Unsupported filesystem type is specified: %s. Falling back to default value %s.'
                          % (fs_type, DEFAULT_FS_TYPE))
            fs_type = DEFAULT_FS_TYPE
        init_script.close()
        user_data_script = replace_proxies(cloud_region, user_data_script)
        user_data_script = replace_swap(swap_size, user_data_script)
        user_data_script = replace_docker_images(pre_pull_images, user_data_script)
        user_data_script = user_data_script.replace('@DOCKER_CERTS@', certs_string) \
            .replace('@WELL_KNOWN_HOSTS@', well_known_string) \
            .replace('@KUBE_IP@', kube_ip) \
            .replace('@KUBE_TOKEN@', kubeadm_token) \
            .replace('@KUBE_CERT_HASH@', kubeadm_cert_hash) \
            .replace('@KUBE_NODE_TOKEN@', kube_node_token) \
            .replace('@API_URL@', api_url) \
            .replace('@API_TOKEN@', api_token) \
            .replace('@API_USER@', api_user) \
            .replace('@FS_TYPE@', fs_type) \
            .replace('@GLOBAL_DISTRIBUTION_URL@', global_distribution_url) \
            .replace('@KUBE_RESERVED_MEM@', os.getenv('KUBE_RESERVED_MEM', '')) \
            .replace('@SYSTEM_RESERVED_MEM@', os.getenv('SYSTEM_RESERVED_MEM', ''))
        embedded_scripts = {}
        if allowed_instance["embedded_scripts"]:
            for embedded_name, embedded_path in allowed_instance["embedded_scripts"].items():
                embedded_scripts[embedded_name] = open(embedded_path, 'r').read()
        if ins_platform == 'windows':
            return pack_powershell_script_contents(user_data_script, embedded_scripts)
        else:
            return pack_script_contents(user_data_script, embedded_scripts)
    else:
        raise RuntimeError('Unable to get init.sh path')


def get_res_grp_and_res_name_from_string(resource_id, resource_type):
    resource_params = resource_id.split("/")

    if len(resource_params) == 2:
        resource_group, resource = resource_params[0], resource_params[1]
    # according to full ID form: /subscriptions/<sub-id>/resourceGroups/<res-grp>/providers/Microsoft.Compute/images/<image>
    elif len(resource_params) == RESOURCE_ID_PARTS_NUMBER \
            and resource_params[RESOURCE_GROUP_KEY_INDEX] == 'resourceGroups' \
            and resource_params[RESOURCE_TYPE_INDEX] == resource_type:
        resource_group, resource = resource_params[RESOURCE_GROUP_NAME_INDEX], resource_params[RESOURCE_NAME_INDEX]
    else:
        raise RuntimeError(
            "Resource parameter doesn't match the Azure resource name convention: <resource_group>/<resource_name>"
            " or full resource id: /subscriptions/<sub-id>/resourceGroups/<res-grp>/providers/Microsoft.Compute/<type>/<name>."
            " Node Up process will be stopped.")

    return resource_group, resource


def get_subnet_name_from_id(subnet_id):
    if "/" not in subnet_id:
        return subnet_id
    subnet_params = subnet_id.split("/")
    # according to /subscriptions/<sub>/resourceGroups/<res_grp>/providers/Microsoft.Network/virtualNetworks/<vnet>/subnets/<subnet>
    if len(subnet_params) == SUBNET_ID_PARTS_NUMBER \
            and subnet_params[RESOURCE_GROUP_KEY_INDEX] == "resourceGroups" \
            and subnet_params[SUBNET_TYPE_INDEX] == "subnets":
        return subnet_params[SUBNET_NAME_INDEX]
    else:
        raise RuntimeError("Subnet doesn't match the form of the Azure ID "
                           "/subscriptions/<sub>/resourceGroups/<res_grp>/providers/Microsoft.Network/virtualNetworks/<vnet>/subnets/<subnet>: {}".format(subnet_id))


def map_labels_to_dict(additional_labels_list):
    additional_labels_dict = dict()
    for label in additional_labels_list:
        label_parts = label.split("=")
        if len(label_parts) == 1:
            additional_labels_dict[label_parts[0]] = None
        else:
            additional_labels_dict[label_parts[0]] = label_parts[1]
    return additional_labels_dict


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--ins_key", type=str, required=True)
    parser.add_argument("--run_id", type=str, required=True)
    parser.add_argument("--cluster_name", type=str, required=False)
    parser.add_argument("--cluster_role", type=str, required=False)
    parser.add_argument("--ins_type", type=str, default='Standard_B2s')
    parser.add_argument("--ins_hdd", type=int, default=30)
    parser.add_argument("--ins_img", type=str, default='pipeline-azure-group/pipeline-base-image')
    parser.add_argument("--ins_platform", type=str, default='linux')
    parser.add_argument("--num_rep", type=int, default=250)  # 250 x 3s = 12.5m
    parser.add_argument("--time_rep", type=int, default=3)
    parser.add_argument("--is_spot", type=bool, default=False)
    parser.add_argument("--bid_price", type=float, default=1.0)
    parser.add_argument("--kube_ip", type=str, required=True)
    parser.add_argument("--kubeadm_token", type=str, required=True)
    parser.add_argument("--kubeadm_cert_hash", type=str, required=True)
    parser.add_argument("--kube_node_token", type=str, required=True)
    parser.add_argument("--kms_encyr_key_id", type=str, required=False)
    parser.add_argument("--region_id", type=str, default=None)
    parser.add_argument("--label", type=str, default=[], required=False, action='append')
    parser.add_argument("--image", type=str, default=[], required=False, action='append')

    args, unknown = parser.parse_known_args()
    ins_key_path = args.ins_key
    run_id = args.run_id
    ins_type = args.ins_type
    ins_hdd = args.ins_hdd
    ins_img = args.ins_img
    ins_platform = args.ins_platform
    is_spot = args.is_spot
    num_rep = args.num_rep
    time_rep = args.time_rep
    cluster_name = args.cluster_name
    cluster_role = args.cluster_role
    kube_ip = args.kube_ip
    kubeadm_token = args.kubeadm_token
    kubeadm_cert_hash = args.kubeadm_cert_hash
    kube_node_token = args.kube_node_token
    region_id = args.region_id
    pre_pull_images = args.image
    additional_labels = map_labels_to_dict(args.label)
    pool_id = additional_labels.get(POOL_ID_KEY)
    global_distribution_url = os.getenv('GLOBAL_DISTRIBUTION_URL',
                                        default='https://cloud-pipeline-oss-builds.s3.us-east-1.amazonaws.com/')

    global zone
    zone = region_id

    if not kube_ip or not kubeadm_token:
        raise RuntimeError('Kubernetes configuration is required to create a new node')

    pipe_log_init(run_id)

    wait_time_sec = get_preference(NODE_WAIT_TIME_SEC)
    if wait_time_sec and wait_time_sec.isdigit():
        num_rep = int(wait_time_sec) / time_rep

    cloud_region = get_cloud_region(region_id)
    pipe_log('Started initialization of new calculation node in cloud region {}:\n'
             '- RunID: {}\n'
             '- Type: {}\n'
             '- Disk: {}\n'
             '- Image: {}\n'
             '- Platform: {}\n'
             '- Repeat attempts: {}\n'
             '- Repeat timeout: {}\n-'.format(cloud_region,
                                              run_id,
                                              ins_type,
                                              ins_hdd,
                                              ins_img,
                                              ins_platform,
                                              str(num_rep),
                                              str(time_rep)))

    try:
        api = pykube.HTTPClient(pykube.KubeConfig.from_service_account())
    except Exception as e:
        api = pykube.HTTPClient(pykube.KubeConfig.from_file("~/.kube/config"))
    api.session.verify = False

    resource_name = VM_NAME_PREFIX + uuid.uuid4().hex[0:UUID_LENGTH]

    try:
        if not ins_img or ins_img == 'null':
            # Redefine default instance image if cloud metadata has specific rules for instance type
            allowed_instance = get_allowed_instance_image(cloud_region, ins_type, ins_platform, ins_img)
            if allowed_instance and allowed_instance["instance_mask"]:
                pipe_log('Found matching rule {instance_mask}/{ami} for requested instance type {instance_type}'
                         '\nImage {ami} will be used'.format(instance_mask=allowed_instance["instance_mask"],
                                                             ami=allowed_instance["instance_mask_ami"],
                                                             instance_type=ins_type))
                ins_img = allowed_instance["instance_mask_ami"]
        else:
            pipe_log('Specified in configuration image {ami} will be used'.format(ami=ins_img))

        ins_id, ins_ip = verify_run_id(run_id)
        if not ins_id:
            api_url = os.environ["API"]
            api_token = os.environ["API_TOKEN"]
            api_user = os.environ["API_USER"]
            ins_id, ins_ip = run_instance(api_url, api_token, api_user, resource_name, ins_type, cloud_region, run_id,
                                          pool_id, ins_hdd, ins_img, ins_platform, ins_key_path, "pipeline", ins_type,
                                          is_spot, kube_ip, kubeadm_token, kubeadm_cert_hash, kube_node_token,
                                          global_distribution_url, pre_pull_images)

        nodename = verify_regnode(ins_id, num_rep, time_rep, api)
        label_node(nodename, run_id, api, cluster_name, cluster_role, cloud_region, additional_labels)
        pipe_log('Node created:\n'
                 '- {}\n'
                 '- {}'.format(ins_id, ins_ip))
        # External process relies on this output
        print(ins_id + "\t" + ins_ip + "\t" + nodename)
        pipe_log('{} task finished'.format(NODEUP_TASK), status=TaskStatus.SUCCESS)
    except Exception as e:
        delete_all_by_run_id(run_id)
        delete_scale_set_nodes_from_kube(kube_api=api, scale_set_name=resource_name)
        pipe_log('[ERROR] ' + str(e), status=TaskStatus.FAILURE)
        raise e


if __name__ == '__main__':
    main()
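
# Example invocation (a sketch with placeholder values; in practice the arguments are supplied by the
# Cloud Pipeline autoscaler):
#
#   export API=https://<cp-api-host>/pipeline/restapi/ API_TOKEN=<jwt> API_USER=<user>
#   export AZURE_AUTH_LOCATION=/path/to/azure_auth.json AZURE_RESOURCE_GROUP=<resource-group>
#   python nodeup.py --run_id 12345 --ins_key /path/to/id_rsa.pub \
#       --ins_type Standard_D2s_v3 --ins_hdd 100 --region_id <azure-region> \
#       --kube_ip <kube-api-ip>:6443 --kubeadm_token <token> \
#       --kubeadm_cert_hash sha256:<hash> --kube_node_token <token>
#
# On success the script prints "<instance_name>\t<private_ip>\t<kube_node_name>" to stdout.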