syndicate/core/resources/ec2_resource.py (347 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
import os
from time import sleep
from typing import Any
from syndicate.commons.log_helper import get_logger
from syndicate.connection.ec2_connection import InstanceTypes
from syndicate.core import ClientError
from syndicate.core.helper import unpack_kwargs, \
dict_keys_to_capitalized_camel_case
from syndicate.core.resources.base_resource import BaseResource
from syndicate.core.resources.helper import build_description_obj, chunks
_LOG = get_logger(__name__)
class Ec2Resource(BaseResource):
def __init__(self, ec2_conn, iam_conn, region, account_id) -> None:
self.ec2_conn = ec2_conn
self.iam_conn = iam_conn
self.region = region
self.account_id = account_id
def describe_ec2(self, name, meta, response=None):
if not response:
response = self.ec2_conn.describe_instances()
arn = 'arn:aws:ec2:{0}:{1}:instance/{2}'.format(self.region,
self.account_id,
response['InstanceId'])
describe_response = self.ec2_conn.describe_instances(
[
{
'Name': 'instance-id',
'Values': [response['InstanceId']]
}
]
)
response['NetworkInterfaces'] = describe_response
return {
arn: build_description_obj(response, name, meta)
}
def create_ec2(self, args):
return self.create_pool(self._create_ec2_from_meta, args)
@unpack_kwargs
def _create_ec2_from_meta(self, name, meta):
from syndicate.core import CONF_PATH
# checking required parameters
image_id = meta['image_id']
image_data = self.ec2_conn.describe_image(image_id=image_id)
if not image_data:
raise AssertionError(f'Image id {image_id} is invalid')
instance_type = meta.get('instance_type')
if not instance_type:
raise AssertionError('Instance type must be specified')
if instance_type not in InstanceTypes.from_botocore():
raise AssertionError(f'Not available instance type: {instance_type}')
key_name = meta.get('key_name')
if not self.ec2_conn.if_key_pair_exists(key_name):
raise AssertionError(f'There is no key pair with name: {key_name}')
availability_zone = meta.get('availability_zone')
subnet = meta.get('subnet_id')
if availability_zone:
subnet_filter = {
'Name': 'availabilityZone',
'Values': [availability_zone]
}
subnet_list = self.ec2_conn.list_subnets(filters=[subnet_filter])
if subnet and subnet not in \
[subnet_ids['SubnetId'] for subnet_ids in subnet_list]:
raise AssertionError(
f'There is no available Subnets with name {subnet} in '
f'Availability Zone {availability_zone}.'
)
if availability_zone not in self.ec2_conn.get_azs():
raise AssertionError(
f'There is no Availability Zone with name: {availability_zone}'
)
security_groups_names = meta.get('security_group_names')
if security_groups_names:
sg_meta = self.ec2_conn.describe_security_groups(
security_groups_names)
described_sec_groups_names = [security_group['GroupName']
for security_group in sg_meta]
for security_group_name in security_groups_names:
if security_group_name not in described_sec_groups_names:
raise AssertionError(
f'Security group {security_group_name} does not exist'
)
# checking optional parameters
user_data_file_name = meta.get('userdata_file')
user_data_content = None
if user_data_file_name:
user_data_location = os.path.join(CONF_PATH, user_data_file_name)
if not os.path.isfile(user_data_location):
_LOG.warning(
f'There is no user data {user_data_file_name} found by path'
f' {CONF_PATH}'
)
else:
with open(user_data_location, 'r') as userdata_file:
user_data_content = userdata_file.read()
# describing instance profile by iam role name
iam_role_name = meta.get('iam_role')
iam_instance_profile_object = None
if iam_role_name:
instance_profiles = self.iam_conn.get_instance_profiles_for_role(
role_name=iam_role_name)
if instance_profiles:
iam_profile_meta = instance_profiles[0]
iam_instance_profile_arn = iam_profile_meta['Arn']
iam_instance_profile_object = {'Arn': iam_instance_profile_arn}
# launching instance
response = self.ec2_conn.launch_instance(
name=name,
image_id=image_id,
instance_type=instance_type,
key_name=key_name,
tags=meta.get('tags'),
security_groups_names=meta.get('security_group_names'),
security_group_ids=meta.get('security_group_ids'),
user_data=user_data_content,
iam_instance_profile=iam_instance_profile_object,
subnet_id=meta.get('subnet_id'),
availability_zone=availability_zone
)
if meta.get('disableApiTermination'):
disable_api_termination = meta.get('disableApiTermination')
_LOG.debug(
f'Found disableApiTermination property: '
f'{disable_api_termination}'
)
if str(disable_api_termination).lower() == 'true':
self.ec2_conn.modify_instance_attribute(
InstanceId=response['InstanceId'],
DisableApiTermination={'Value': True},
)
_LOG.info(
f'Created EC2 instance {name}. Waiting for instance network '
f'interfaces configuring.'
)
sleep(30) # time for vm to become running
return self.describe_ec2(name, meta, response)
def remove_ec2_instances(self, args):
return self._remove_ec2_instances(args)
def _remove_ec2_instances(self, args):
results = {}
exceptions = []
for res_chunk in chunks(args, 1000):
existing_instances_list = []
for resource in res_chunk:
arn = resource['arn']
config = resource['config']
instance_id = config['description']['InstanceId']
try:
self.ec2_conn.modify_instance_attribute(
InstanceId=instance_id,
DisableApiTermination={'Value': False},
log_not_found_error=False
)
existing_instances_list.append(instance_id)
except ClientError as e:
if 'InvalidInstanceID.NotFound' in str(e):
_LOG.warn('Instance %s does not exist', instance_id)
results.update({arn: config})
elif 'IncorrectInstanceState' in str(e):
_LOG.warn('Instance %s '
'already terminated', instance_id)
results.update({arn: config})
else:
exceptions.append(f'Caused by resource {arn}. {e}')
if existing_instances_list:
try:
self.ec2_conn.terminate_instances(
instance_ids=existing_instances_list)
_LOG.info('EC2 instances %s were removed.',
str(existing_instances_list))
results.update({
item['arn']: item['config'] for item in res_chunk
})
except Exception as e:
exceptions.append(str(e))
filters = [
{
'Name': 'instance-state-name',
'Values': ['pending', 'running',
'stopping', 'stopped']
}
]
try:
described_instances = self.ec2_conn.describe_instances(
filters=filters,
instance_ids=existing_instances_list
)
described_instances_ids = [x['InstanceId'] for x in
described_instances]
for instance in res_chunk:
instance_id = \
instance['config']['description']['InstanceId']
if instance_id not in described_instances_ids:
results.update({
instance['arn']: instance['config']
})
except Exception as e:
exceptions.append(str(e))
return (results, exceptions) if exceptions else results
def describe_launch_template(
self,
name: str,
meta: dict,
response = None,
) -> dict:
if not response:
response = self.ec2_conn.describe_launch_templates(lt_name=name)
else:
response = [response['LaunchTemplate']]
if response:
lt_id = response[0]['LaunchTemplateId']
return {
lt_id: build_description_obj(response, name, meta)
}
return {}
def create_launch_template(self, args):
return self.create_pool(self._create_launch_template_from_meta, args)
@unpack_kwargs
def _create_launch_template_from_meta(
self,
name: str,
meta: dict,
) -> dict:
resource_tags = meta.get('launch_template_data').get('resource_tags')
lt_data = self._prepare_launch_template_data(meta)
response = self.ec2_conn.create_launch_template(
name=name,
lt_data=dict_keys_to_capitalized_camel_case(lt_data),
version_description=meta.get('version_description'),
tags=meta.get('tags'),
resource_tags=resource_tags,
)
return self.describe_launch_template(
name=name, meta=meta, response=response,
)
def remove_launch_templates(self, args):
return self.create_pool(self._remove_launch_template, args)
@unpack_kwargs
def _remove_launch_template(self, arn, config):
try:
self.ec2_conn.delete_launch_template(lt_id=arn,
log_not_found_error=False)
_LOG.info(f"Launch template with ID '{arn}' removed "
f"successfully")
return {arn: config}
except ClientError as e:
if 'InvalidLaunchTemplateId.NotFound' in str(e):
_LOG.warn(f"Launch template with ID '{arn}' not found")
return {arn: config}
else:
raise e
def update_launch_template(self, args: list[dict[str, Any]]):
return self.create_pool(self._update_launch_template_from_meta, args)
@unpack_kwargs
def _update_launch_template_from_meta(
self,
name: str,
meta: dict[str, Any],
context: dict | None,
):
resource_tags = meta.get('launch_template_data').get('resource_tags')
lt_data: dict = self._prepare_launch_template_data(meta)
lt_description = self.ec2_conn.describe_launch_templates(lt_name=name)
if not lt_description:
raise AssertionError(
f"Launch template with name '{name}' not found")
lt_latest_version = lt_description[0]['LatestVersionNumber']
self.ec2_conn.create_launch_template_version(
lt_name=name,
source_version=str(lt_latest_version),
lt_data=dict_keys_to_capitalized_camel_case(lt_data),
version_description=meta.get('version_description'),
resource_tags=resource_tags,
)
response = self.ec2_conn.modify_launch_template(
lt_name=name,
default_version=str(lt_latest_version + 1)
)
return self.describe_launch_template(
name=name, meta=meta, response=response,
)
def _prepare_launch_template_data(
self,
meta: dict,
) -> dict:
from syndicate.core import CONFIG
lt_data = meta['launch_template_data']
lt_imds = lt_data.pop('imds_support', None)
lt_data.pop('resource_tags', None)
image_id = lt_data.get('image_id')
if image_id:
image_data = self.ec2_conn.describe_image(image_id=image_id)
if not image_data:
raise AssertionError(f'Image id {image_id} is invalid')
if lt_imds:
metadata_options = {'http_tokens': 'required'} \
if lt_imds == 'v2.0' else {'http_tokens': 'optional'}
lt_data['metadata_options'] = metadata_options
key_name = lt_data.get('key_name')
if key_name and not self.ec2_conn.if_key_pair_exists(key_name):
raise AssertionError(f'There is no key pair with name: {key_name}')
security_groups_names = lt_data.get('security_groups')
if security_groups_names:
sg_meta = self.ec2_conn.describe_security_groups(
security_groups_names)
described_sec_groups_names = [security_group['GroupName']
for security_group in sg_meta]
for security_group_name in security_groups_names:
if security_group_name not in described_sec_groups_names:
raise AssertionError(
f'Security group {security_group_name} does not exist')
security_group_ids = lt_data.get('security_group_ids')
if security_group_ids:
sg_meta = self.ec2_conn.describe_security_groups(
sg_id=security_group_ids)
described_sec_group_ids = [security_group['GroupId']
for security_group in sg_meta]
for security_group_id in security_group_ids:
if security_group_id not in described_sec_group_ids:
raise AssertionError(f'Security group with ID '
f'{security_group_id} does not exist')
iam_role_name = lt_data.pop('iam_role', None)
if iam_role_name:
instance_profiles = self.iam_conn.get_instance_profiles_for_role(
role_name=iam_role_name)
if instance_profiles:
iam_profile_meta = instance_profiles[0]
lt_data['iam_instance_profile'] = {
'arn': iam_profile_meta['Arn'],
'name': iam_profile_meta['InstanceProfileName']
}
user_data_file_path = lt_data.pop('userdata_file', None)
if user_data_file_path:
user_data_content = None
if not os.path.isabs(user_data_file_path):
user_data_file_path = os.path.join(CONFIG.project_path,
user_data_file_path)
if not os.path.isfile(user_data_file_path):
_LOG.warning(
f'There is no user data found by path {user_data_file_path}'
)
else:
with open(user_data_file_path, 'r') as userdata_file:
user_data_content = userdata_file.read()
if user_data_content:
user_data_b = \
base64.b64encode(user_data_content.encode("ascii"))
user_data = user_data_b.decode('ascii')
lt_data['user_data'] = user_data
return lt_data