syndicate/connection/ec2_connection.py (539 lines of code) (raw):

"""
    Copyright 2018 EPAM Systems, Inc.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
"""
import inspect
import json
import time
from pathlib import Path
from typing import Generator, Optional, List, Iterable, Any

import botocore
from botocore.exceptions import ClientError
from boto3 import client

from syndicate.commons.log_helper import get_logger
from syndicate.connection.helper import apply_methods_decorator, retry
from syndicate.core.constants import EC2_LT_RESOURCE_TAGS

_LOG = get_logger(__name__)


def create_permissions(ranges):
    """Build an ``IpPermissions`` list that covers the given CIDR ranges.

    The single permission produced uses protocol ``'-1'`` with ports ``-1``.

    :param ranges: iterable of CIDR strings
    :return: a one-element list holding the permission dict
    """
    ip_ranges = [{'CidrIp': ip_range} for ip_range in ranges]
    return [{
        'IpProtocol': '-1',
        'FromPort': -1,
        'ToPort': -1,
        'IpRanges': ip_ranges
    }]


def preserve_default_permission(group_id, permissions):
    """Drop ``UserIdGroupPairs`` from permissions that reference the group.

    Every permission whose ``UserIdGroupPairs`` contains a pair with
    ``GroupId == group_id`` gets the whole ``UserIdGroupPairs`` key removed.
    The permission dicts are modified in place; nothing is returned.

    :param group_id: ID of the security group the permissions belong to
    :param permissions: list of permission dicts (mutated)
    """
    for permission in permissions:
        pairs = permission.get('UserIdGroupPairs')
        # Pop at most once per permission: popping inside a loop over the
        # pairs raised KeyError when more than one pair matched.
        if pairs and any(pair.get('GroupId') == group_id for pair in pairs):
            permission.pop('UserIdGroupPairs')


@apply_methods_decorator(retry())
class EC2Connection(object):
    """ EC2 connection class."""

    def __init__(self, region=None, aws_access_key_id=None,
                 aws_secret_access_key=None, aws_session_token=None):
        """Create the underlying boto3 EC2 client.

        :param region: region name passed to the boto3 client
        :param aws_access_key_id: optional access key
        :param aws_secret_access_key: optional secret key
        :param aws_session_token: optional session token
        """
        self.client = client('ec2', region,
                             aws_access_key_id=aws_access_key_id,
                             aws_secret_access_key=aws_secret_access_key,
                             aws_session_token=aws_session_token)
        _LOG.debug('Opened new EC2 connection.')

    def describe_security_groups(self, name=None, sg_id=None, vpc_id=None):
        """Describe security groups filtered by name, ID and/or VPC.

        :param name: group name (str) or list of group names
        :param sg_id: group ID (str) or list of group IDs
        :param vpc_id: VPC ID to restrict the search to
        :return: the ``SecurityGroups`` list of the response
        """
        filters = []
        if name:
            if isinstance(name, list):
                filters.append({'Name': 'group-name', 'Values': name})
            elif isinstance(name, str):
                filters.append({'Name': 'group-name', 'Values': [name]})
            else:
                # Unsupported type: the filter is skipped, not rejected.
                _LOG.warning('Unacceptable name type: %s', type(name))
        if sg_id:
            if isinstance(sg_id, list):
                filters.append({'Name': 'group-id', 'Values': sg_id})
            elif isinstance(sg_id, str):
                filters.append({'Name': 'group-id', 'Values': [sg_id]})
            else:
                _LOG.warning(
                    f'Unacceptable security group id type: {type(sg_id)}')
        if vpc_id:
            filters.append({'Name': 'vpc-id', 'Values': [vpc_id]})
        return self.client.describe_security_groups(Filters=filters)[
            'SecurityGroups']

    def describe_regions(self, name=None):
        """Describe regions, optionally filtered by region name.

        :param name: region name (str) or list of region names
        :return: the ``Regions`` list of the response
        """
        filters = []
        if name:
            if isinstance(name, list):
                filters.append({'Name': 'region-name', 'Values': name})
            elif isinstance(name, str):
                filters.append({'Name': 'region-name', 'Values': [name]})
            else:
                _LOG.warning('Unacceptable name type: %s', type(name))
        return self.client.describe_regions(Filters=filters)['Regions']

    def get_default_vpc_id(self):
        """Return the ID of the first VPC flagged ``IsDefault``, else None."""
        for vpc in self.client.describe_vpcs()['Vpcs']:
            if vpc['IsDefault']:
                return vpc['VpcId']
        return None

    def create_sg(self, name, desc, vpc_id):
        """Create a security group and return its ``GroupId``."""
        return self.client.create_security_group(GroupName=name,
                                                 Description=desc,
                                                 VpcId=vpc_id)['GroupId']

    def authorize_ingress(self, group_id, group_name, ranges):
        """Authorize ingress from the given CIDR ranges.

        :param group_id: security group ID; resolved from ``group_name``
            via :meth:`get_sg_id` when falsy
        :param group_name: security group name
        :param ranges: iterable of CIDR strings
        """
        if not group_id:
            group_id = self.get_sg_id(group_name)
        permissions = create_permissions(ranges)
        self.client.authorize_security_group_ingress(
            GroupId=group_id,
            GroupName=group_name,
            IpPermissions=permissions)

    def revoke_ingress(self, group_id, group_name, ranges):
        """Revoke ingress for the given CIDR ranges."""
        permissions = create_permissions(ranges)
        self.revoke_ingress_permissions(group_id, group_name, permissions)

    def revoke_ingress_permissions(self, group_id, group_name, permissions):
        """Revoke the given ingress permissions from a security group.

        Self-referencing ``UserIdGroupPairs`` are stripped from
        ``permissions`` (in place) before the request. Nothing is sent when
        ``permissions`` is empty.

        :param group_id: security group ID; resolved from ``group_name``
            via :meth:`get_sg_id` when falsy
        :param group_name: security group name
        :param permissions: list of permission dicts (mutated)
        """
        if not group_id:
            group_id = self.get_sg_id(group_name)
        preserve_default_permission(group_id, permissions)
        if not permissions:
            return
        self.client.revoke_security_group_ingress(
            GroupId=group_id,
            GroupName=group_name,
            IpPermissions=permissions)

    def delete_sg(self, group_name):
        """Delete a security group by name."""
        self.client.delete_security_group(GroupName=group_name)

    def get_sg_id(self, group_name, vpc_id=None):
        """Return the ID of the named security group, or None if not found.

        :param group_name: security group name
        :param vpc_id: VPC to search in; the default VPC when falsy
        """
        if not vpc_id:
            vpc_id = self.get_default_vpc_id()
        # vpc_id must be passed by keyword: the second positional parameter
        # of describe_security_groups is sg_id, not vpc_id.
        groups = self.describe_security_groups(name=group_name,
                                               vpc_id=vpc_id)
        if groups:
            return groups[0]['GroupId']
        return None

    def get_key_pairs(
            self,
            dry_run: bool = False,
            key_names: list | None = None,
            filters: list | None = None,
    ) -> dict | list[dict]:
        """Call ``describe_key_pairs`` and return its response.

        :type dry_run: bool
        :type key_names: list
        :type filters: list
        :return: the response of ``describe_key_pairs``
        """
        params = dict(DryRun=dry_run)
        if key_names:
            params['KeyNames'] = key_names
        if filters:
            params['Filters'] = filters
        return self.client.describe_key_pairs(**params)

    def if_key_pair_exists(self, key_name) -> bool:
        """Return True when a key pair named ``key_name`` exists."""
        key_pairs = self.get_key_pairs().get('KeyPairs') or []
        return any(each['KeyName'] == key_name for each in key_pairs)

    def list_vpcs(self, dry_run=None, vpc_ids=None, filters=None):
        """Describe VPCs; returns the ``Vpcs`` list (None on empty response).

        Only truthy arguments are forwarded to the client.
        """
        params = dict()
        if dry_run:
            params['DryRun'] = dry_run
        if vpc_ids:
            params['VpcIds'] = vpc_ids
        if filters:
            params['Filters'] = filters
        response = self.client.describe_vpcs(**params)
        if response:
            return response['Vpcs']
        return None

    def list_subnets(self, dry_run=None, subnet_ids=None, filters=None):
        """Describe subnets; returns ``Subnets`` (None on empty response).

        Only truthy arguments are forwarded to the client.
        """
        params = dict()
        if dry_run:
            params['DryRun'] = dry_run
        if subnet_ids:
            params['SubnetIds'] = subnet_ids
        if filters:
            params['Filters'] = filters
        response = self.client.describe_subnets(**params)
        if response:
            return response['Subnets']
        return None

    def get_azs(self):
        """Return the names of the availability zones."""
        response = self.client.describe_availability_zones()
        return [az['ZoneName'] for az in response['AvailabilityZones']]

    def describe_image(self, image_id):
        """Return the ``Images`` list for a single image ID."""
        params = dict(ImageIds=[image_id])
        response = self.client.describe_images(**params)
        return response['Images']

    def describe_instances(self, filters, instance_ids=None):
        """Return all instances matching the filters, following pagination.

        :param filters: list of filter dicts; skipped when falsy
        :param instance_ids: optional list of instance IDs
        :return: flat list of instance dicts from every reservation
        """
        params = {}
        if filters:
            params['Filters'] = filters
        if instance_ids:
            params['InstanceIds'] = instance_ids

        result_list = []
        while True:
            response = self.client.describe_instances(**params)
            for reservation in response['Reservations']:
                # Take every instance of the reservation, not only the
                # first one, so multi-instance reservations are not lost.
                result_list.extend(reservation['Instances'])
            token = response.get('NextToken')
            if not token:
                break
            params['NextToken'] = token
        return result_list

    def terminate_instances(self, instance_ids):
        """Terminate the instances with the given IDs."""
        self.client.terminate_instances(
            InstanceIds=instance_ids
        )

    def launch_instance(self, image_id, instance_type,
                        security_groups_names=None,
                        security_group_ids=None,
                        iam_instance_profile=None, name=None, key_name=None,
                        user_data=None, tags=None, subnet_id=None,
                        availability_zone=None):
        """Launch exactly one instance and return its description.

        :param image_id: AMI ID
        :param instance_type: instance type name
        :param security_groups_names: optional list of group names
        :param security_group_ids: optional list of group IDs
        :param iam_instance_profile: optional dict with 'Arn' or 'Name'
        :param name: optional value for a ``Name`` tag
        :param key_name: optional key pair name
        :param user_data: optional user data
        :param tags: optional list of ``{'Key': ..., 'Value': ...}`` dicts
        :param subnet_id: optional subnet ID
        :param availability_zone: optional availability zone
        :return: the launched instance dict, or the string
            'No instances launched' when the response has no instances
        :raises AssertionError: if ``iam_instance_profile`` has neither
            'Arn' nor 'Name'
        """
        # Copy so the Name tag appended below never leaks into the
        # caller's list.
        tags = list(tags) if tags else []
        if iam_instance_profile:
            if not iam_instance_profile.get('Arn') \
                    and not iam_instance_profile.get('Name'):
                raise AssertionError('Provided instance profile {0} '
                                     'is not well-formed. '
                                     'Arn or Name nodes required.'
                                     .format(iam_instance_profile))
        if name:
            tags.append({
                'Key': 'Name',
                'Value': name
            })
        instance_parameters = {
            'ImageId': image_id,
            'InstanceType': instance_type,
            'MinCount': 1,
            'MaxCount': 1
        }
        if tags:
            instance_parameters['TagSpecifications'] = [{
                'ResourceType': 'instance',
                'Tags': tags
            }]
        if availability_zone:
            instance_parameters['Placement'] = {
                'AvailabilityZone': availability_zone
            }
        if key_name:
            instance_parameters['KeyName'] = key_name
        if user_data:
            instance_parameters['UserData'] = user_data
        if iam_instance_profile:
            instance_parameters['IamInstanceProfile'] = iam_instance_profile
        if subnet_id:
            instance_parameters['SubnetId'] = subnet_id
        if security_groups_names:
            instance_parameters['SecurityGroups'] = security_groups_names
        if security_group_ids:
            instance_parameters['SecurityGroupIds'] = security_group_ids

        response = self.client.run_instances(**instance_parameters)
        # always launch only one instance
        launched_instances = response['Instances']
        if len(launched_instances) < 1:
            return 'No instances launched'
        return launched_instances[0]

    def modify_instance_attribute(self, **kwargs):
        """Forward ``kwargs`` to ``modify_instance_attribute``.

        log_not_found_error parameter is needed for proper log handling in
        the retry decorator; it is removed before the request is made.

        :raises AssertionError: if ``InstanceId`` is missing or empty
        """
        kwargs.pop('log_not_found_error', None)
        # .get() so a missing key yields the intended AssertionError
        # instead of a KeyError.
        if not kwargs.get('InstanceId'):
            raise AssertionError('InstanceId must be specified')
        self.client.modify_instance_attribute(**kwargs)

    def deploy_security_groups(self, groups):
        """Create missing groups and reconcile existing ones.

        Works in the default VPC. Each group is a dict with keys ``'n'``
        (name), ``'d'`` (description) and ``'r'`` (CIDR ranges).
        """
        default_vpc_id = self.get_default_vpc_id()
        groups_in_default_vpc = self.describe_security_groups(
            vpc_id=default_vpc_id)
        group_names = [g['GroupName'] for g in groups_in_default_vpc]
        verify = []
        create = []
        for group in groups:
            if group['n'] in group_names:
                verify.append(group)
            else:
                create.append(group)
        if create:
            self._create_security_groups(default_vpc_id, create)
        if verify:
            self._verify_security_groups(verify, groups_in_default_vpc)

    def _delete_security_groups(self, groups):
        """Delete the named groups that exist in the default VPC.

        The group named ``'default'`` is not deleted; its ingress
        permissions are revoked instead.

        :param groups: list of security group names
        """
        # These helpers live on this wrapper, not on the boto3 client.
        vpc_id = self.get_default_vpc_id()
        security_groups = self.describe_security_groups(name=groups,
                                                        vpc_id=vpc_id)
        existent = {sg['GroupName']: sg for sg in security_groups}
        for sg_name in groups:
            if sg_name not in existent:
                continue
            if sg_name == 'default':
                security_group = existent[sg_name]
                self.revoke_ingress_permissions(
                    group_id=security_group['GroupId'],
                    group_name='default',
                    permissions=security_group['IpPermissions'])
            else:
                self.delete_sg(sg_name)

    def _create_security_groups(self, vpc_id, groups_to_create):
        """Create each group (except ``'default'``) and authorize ingress."""
        for group in groups_to_create:
            if group['n'] == 'default':
                continue
            group_id = self.create_sg(group['n'], group['d'], vpc_id)
            ranges = group['r']
            # add waiting to auth sg
            time.sleep(5)
            self.authorize_ingress(group_id, group['n'], ranges)

    def _verify_security_groups(self, groups, sgs):
        """Align the CIDR ranges of existing groups with the desired ones.

        :param groups: desired groups (dicts with ``'n'`` and ``'r'``)
        :param sgs: described security groups; must contain every group
            named in ``groups``
        """
        security_groups = {sg['GroupName']: sg for sg in sgs}
        for group in groups:
            name = group['n']
            sg = security_groups[name]
            should_be = group['r']
            actual = [ip_range['CidrIp']
                      for permission in sg['IpPermissions']
                      for ip_range in permission['IpRanges']]

            to_revoke = [cidr for cidr in actual if cidr not in should_be]
            to_authorize = [cidr for cidr in should_be if cidr not in actual]
            if to_revoke:
                self.revoke_ingress(sg['GroupId'], name, to_revoke)
            if to_authorize:
                # add waiting to auth sg
                time.sleep(5)
                self.authorize_ingress(sg['GroupId'], name, to_authorize)

    def associate_address(self, instance_id=None, public_ip=None,
                          allow_reassociation=False):
        """Call ``associate_address`` and return its response."""
        params = dict(AllowReassociation=allow_reassociation)
        if instance_id:
            params['InstanceId'] = instance_id
        if public_ip:
            params['PublicIp'] = public_ip
        return self.client.associate_address(**params)

    def resolve_resource_tags(  # noqa Reason: @apply_methods_decorator(retry())
            self,
            resource_tags: dict[str, dict[str, str]],
    ) -> list[dict | None]:
        """Convert ``{resource_type: {key: value}}`` to tag specifications.

        Any validation failure is logged and an empty list is returned, so
        the caller ends up applying no resource tags at all.

        :param resource_tags: mapping of resource type to its tags
        :return: list of ``{'ResourceType': ..., 'Tags': [...]}`` dicts
        """
        base_error_message = (
            "Failed to process 'resource_tags' for the EC2 launch template. "
            "This step will be skipped."
        )
        if not isinstance(resource_tags, dict):
            _LOG.error(
                f"{base_error_message} Reason: 'resource_tags' should be a "
                f"dictionary."
            )
            return []

        resource_tag_specs = []
        for resource_type, tags_dict in resource_tags.items():
            if not isinstance(tags_dict, dict):
                _LOG.error(
                    f"{base_error_message} The value for key '{resource_type}' "
                    f"is not a dictionary as expected"
                )
                return []

            if resource_type not in EC2_LT_RESOURCE_TAGS:
                _LOG.error(
                    f"{base_error_message} "
                    f"Encountered an invalid resource type '{resource_type}'. "
                    f"Valid types include: {EC2_LT_RESOURCE_TAGS}"
                )
                return []

            tag_list = [{'Key': k, 'Value': v} for k, v in tags_dict.items()]
            resource_tag_specs.append({
                'ResourceType': resource_type,
                'Tags': tag_list,
            })

        return resource_tag_specs

    def create_launch_template(
            self,
            name: str,
            lt_data: dict,
            version_description: str | None = None,
            tags: list[dict[str, Any]] | None = None,
            resource_tags: dict[str, dict[str, str]] | None = None,
    ) -> dict:
        """Create a launch template and return the client response.

        :param name: launch template name
        :param lt_data: launch template data; resolved ``resource_tags``
            are appended to its ``TagSpecifications`` in place
        :param version_description: optional description of the version
        :param tags: optional tags for the launch template itself
        :param resource_tags: optional ``{resource_type: {key: value}}``
        """
        params = {
            'LaunchTemplateName': name,
            'LaunchTemplateData': lt_data,
        }
        if version_description is not None:
            params['VersionDescription'] = version_description
        if tags:
            params['TagSpecifications'] = [{
                'ResourceType': 'launch-template',
                'Tags': tags,
            }]
        if resource_tags:
            resource_tag_specs = self.resolve_resource_tags(resource_tags)
            lt_data.setdefault('TagSpecifications', []).extend(
                resource_tag_specs)

        return self.client.create_launch_template(**params)

    def create_launch_template_version(
            self,
            lt_name: str | None = None,
            lt_id: str | None = None,
            source_version: str | None = None,
            lt_data: dict | None = None,
            version_description: str | None = None,
            resource_tags: dict[str, dict[str, str]] | None = None,
    ) -> dict | None:
        """Create a new launch template version.

        :param lt_name: launch template name (ignored when ``lt_id`` is set)
        :param lt_id: launch template ID
        :param source_version: optional version to base the new one on
        :param lt_data: launch template data; an empty dict when None.
            Resolved ``resource_tags`` are appended to its
            ``TagSpecifications`` in place
        :param version_description: optional description of the version
        :param resource_tags: optional ``{resource_type: {key: value}}``
        :return: the client response, or None when neither the name nor
            the ID was given
        """
        if not lt_name and not lt_id:
            _LOG.error(
                'A launch template version cannot be created without the name '
                'or ID of the launch template'
            )
            return None

        # Normalize first so the dict that receives the tags is the very
        # dict that is sent (previously None crashed and {} lost the tags).
        if lt_data is None:
            lt_data = {}

        params = {
            'LaunchTemplateId': lt_id or None,
            'LaunchTemplateName': lt_name if not lt_id else None,
            'SourceVersion': source_version or None,
            'VersionDescription': version_description or None,
            'LaunchTemplateData': lt_data,
        }
        if lt_name and lt_id:
            _LOG.warning(
                'Both the launch template name and ID are specified. The '
                'request will be made by ID'
            )
        if resource_tags:
            resource_tag_specs = self.resolve_resource_tags(resource_tags)
            lt_data.setdefault('TagSpecifications', []).extend(
                resource_tag_specs)

        return self.client.create_launch_template_version(
            **{k: v for k, v in params.items() if v is not None}
        )

    def describe_launch_templates(
            self,
            lt_name: str | list | None = None,
            lt_id=None,
    ) -> list:
        """Describe launch templates by name or ID, following pagination.

        When both are given the ID wins. A value that is neither a ``str``
        nor a ``list`` is logged and ignored, which leaves the request
        unfiltered. A "not found" client error is logged and whatever was
        collected so far is returned.

        :param lt_name: launch template name or list of names
        :param lt_id: launch template ID or list of IDs
        :return: list of launch template dicts
        """
        result_list = list()
        params = dict()

        if lt_id is not None:
            if lt_name is not None:
                _LOG.warning(
                    'Both the launch template name and ID are specified. '
                    'The request will be made by ID.')
            if isinstance(lt_id, list):
                params['LaunchTemplateIds'] = lt_id
            elif isinstance(lt_id, str):
                params['LaunchTemplateIds'] = [lt_id]
            else:
                _LOG.warning(
                    f'Unsupported launch template ID type {type(lt_id)}')
        elif lt_name is not None:
            if isinstance(lt_name, list):
                params['LaunchTemplateNames'] = lt_name
            elif isinstance(lt_name, str):
                params['LaunchTemplateNames'] = [lt_name]
            else:
                _LOG.warning(
                    f'Unsupported launch template name type {type(lt_name)}')

        try:
            while True:
                response = self.client.describe_launch_templates(**params)
                result_list.extend(response['LaunchTemplates'])
                token = response.get('NextToken')
                if not token:
                    break
                # Always forward the token, even for an unfiltered listing;
                # otherwise the first page is requested forever.
                params['NextToken'] = token
        except ClientError as e:
            error_text = str(e)
            if ('InvalidLaunchTemplateName.NotFoundException' in error_text
                    or 'InvalidLaunchTemplateId.NotFound' in error_text):
                if 'LaunchTemplateIds' in params:
                    dynamic_message = f"by ID '{lt_id}'"
                else:
                    dynamic_message = f"by name '{lt_name}'"
                _LOG.warning(f"Launch template not found {dynamic_message}")
            else:
                raise

        return result_list

    def delete_launch_template(
            self,
            lt_name=None,
            lt_id=None,
            log_not_found_error: bool = True,
    ) -> None:
        """Delete a launch template by ID or, failing that, by name.

        log_not_found_error parameter is needed for proper log handling in
        the retry decorator

        :raises AssertionError: if neither the name nor the ID is given
        """
        params = dict()
        if lt_name is not None and lt_id is not None:
            _LOG.warning(
                'Both the launch template name and ID are specified. The '
                'request will be made by ID.'
            )
            params['LaunchTemplateId'] = lt_id
        elif lt_name is not None:
            params['LaunchTemplateName'] = lt_name
        elif lt_id is not None:
            params['LaunchTemplateId'] = lt_id
        else:
            raise AssertionError(
                'Either the launch template name or ID must be provided for '
                'removing the launch template.')

        self.client.delete_launch_template(**params)

    def modify_launch_template(
            self,
            default_version,
            lt_name=None,
            lt_id=None,
    ) -> dict | None:
        """Set the default version of a launch template.

        :param default_version: version to make the default
        :param lt_name: launch template name (ignored when ``lt_id`` is set)
        :param lt_id: launch template ID
        :return: the client response, or None when neither the name nor
            the ID was given (an error is logged)
        """
        params = dict()
        params['DefaultVersion'] = default_version

        if lt_name is not None and lt_id is not None:
            _LOG.warning(
                'Both the launch template name and ID are specified. The '
                'request will be made by ID.'
            )
            params['LaunchTemplateId'] = lt_id
        elif lt_name is not None:
            params['LaunchTemplateName'] = lt_name
        elif lt_id is not None:
            params['LaunchTemplateId'] = lt_id
        else:
            _LOG.error('A launch template modification can not be done '
                       'without the name or ID of the launch template.')
            return None

        return self.client.modify_launch_template(**params)


class InstanceTypes:
    """Helpers that enumerate EC2 instance type names."""

    @staticmethod
    def from_api(
            region_name: Optional[str] = None,
            current_generation: Optional[bool] = None,
            arch: Optional[List] = None,
    ) -> Generator[str, None, None]:
        """Yield instance type names from ``describe_instance_types``.

        :param region_name: region for the EC2 client
        :param current_generation: when a bool, filter on
            ``current-generation``
        :param arch: when a list, filter on supported architectures; only
            'arm64', 'i386' and 'x86_64' are accepted
        :raises AssertionError: if ``arch`` holds an unsupported value
        """
        filters = []
        if isinstance(current_generation, bool):
            filters.append({
                'Name': 'current-generation',
                'Values': [str(current_generation).lower()]
            })
        if isinstance(arch, list):
            # Explicit raise instead of `assert`, so the check survives
            # `python -O`; the exception type is unchanged.
            if not set(arch).issubset({'arm64', 'i386', 'x86_64'}):
                raise AssertionError(
                    f'Unsupported architecture in {arch}')
            filters.append({
                'Name': 'processor-info.supported-architecture',
                'Values': arch
            })
        params = {}
        if filters:
            params['Filters'] = filters
        ec2 = client('ec2', region_name=region_name)
        while True:
            res = ec2.describe_instance_types(**params)
            yield from (item['InstanceType'] for item in res['InstanceTypes'])
            _next = res.get('NextToken')
            if not _next:
                break
            params['NextToken'] = _next

    @staticmethod
    def from_botocore() -> Generator[str, None, None]:
        """Yield instance type names from botocore's bundled EC2 model.

        Reads ``data/ec2/2016-11-15/service-2.json`` inside the installed
        botocore package.
        """
        path = Path(inspect.getfile(botocore)).parent
        with open(Path(path, 'data', 'ec2', '2016-11-15', 'service-2.json'),
                  encoding="utf8") as fp:
            data = json.load(fp)
        yield from data['shapes']['InstanceType']['enum']

    @staticmethod
    def instance_type_group(instance_type: str) -> str:
        """Return the part of ``instance_type`` before the first dot."""
        return instance_type.split('.')[0]

    @staticmethod
    def with_groups(it: Iterable[str]) -> Generator[str, None, None]:
        """
        Before yielding an instance type, yields its group. A group is
        yielded only once.
        :param it: Iterable[str]
        :return: Generator[str, None, None]
        """
        emitted = set()
        for instance_type in it:
            group = InstanceTypes.instance_type_group(instance_type)
            if group not in emitted:
                yield group
                emitted.add(group)
            yield instance_type