syndicate/core/resources/cloud_watch_resource.py (154 lines of code) (raw):

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import time from botocore.exceptions import ClientError from syndicate.commons.log_helper import get_logger from syndicate.core.conf.validator import ALL_REGIONS from syndicate.core.helper import unpack_kwargs from syndicate.core.resources.base_resource import BaseResource from syndicate.core.resources.helper import (build_description_obj, create_args_for_multi_region, validate_params) _LOG = get_logger(__name__) ARN_KEY = 'Arn' def _create_ec2_rule(rule_name, rule_meta, cw_conn): cw_conn.create_ec2_rule(name=rule_name, instances=rule_meta.get('instance_ids'), instance_states=rule_meta.get( 'instance_states'), tags=rule_meta.get('tags')) def _create_schedule_rule(rule_name, rule_meta, cw_conn): cw_conn.create_schedule_rule(name=rule_name, expression=rule_meta['expression'], tags=rule_meta.get('tags')) def _create_api_call_rule(rule_name, rule_meta, cw_conn): cw_conn.create_api_call_rule(name=rule_name, aws_service=rule_meta.get('aws_service'), operations=rule_meta.get('operations'), custom_pattern=rule_meta.get( 'custom_pattern'), tags=rule_meta.get('tags')) RULE_TYPES = { 'schedule': _create_schedule_rule, 'ec2': _create_ec2_rule, 'api_call': _create_api_call_rule } def validate_cloud_watch_rule_params(name, meta): # validation depends on rule type required_parameters = ['rule_type'] rule_type = meta.get('rule_type') if rule_type: if rule_type == 'schedule': required_parameters.append('expression') validate_params(name, meta, required_parameters) def get_event_bus_arn(event_bus, region): target_arn = 'arn:aws:events:{0}:{1}:event-bus/default'.format( region, event_bus) return target_arn class CloudWatchResource(BaseResource): def __init__(self, cw_events_conn_builder, account_id) -> None: self.cw_events_conn = cw_events_conn_builder() self._cw_events_conn_builder = cw_events_conn_builder self.account_id = account_id def describe_rule(self, name, meta, region, response=None): if not response: response = self._cw_events_conn_builder(region).get_rule(name) arn = response[ARN_KEY] del response[ARN_KEY] return {arn: build_description_obj(response, name, meta)} def describe_rule_from_meta(self, name, meta): new_region_args = create_args_for_multi_region( [ {'name': name, 'meta': meta} ], ALL_REGIONS) responses = [] for arg in new_region_args: rule = self._cw_events_conn_builder(arg['region']).get_rule(name) if rule: responses.append(rule) description = {} for rule in responses: arn = rule[ARN_KEY] del rule[ARN_KEY] description.update({arn: build_description_obj(rule, name, meta)}) return description def create_cloud_watch_rule(self, args): """ Create an event rule from meta in region/regions. :type args: list """ new_region_args = create_args_for_multi_region(args, ALL_REGIONS) return self.create_pool(self._create_cloud_watch_rule_from_meta, new_region_args) @unpack_kwargs def _create_cloud_watch_rule_from_meta(self, name, meta, region): validate_cloud_watch_rule_params(name=name, meta=meta) rule_type = meta['rule_type'] event_buses = meta.get('event_bus_accounts') response = self._cw_events_conn_builder(region).get_rule(name) if response: _LOG.warn('%s rule exists in %s.', name, region) return self.describe_rule(name=name, meta=meta, region=region, response=response) try: func = RULE_TYPES[rule_type] func(name, meta, self._cw_events_conn_builder(region)) if event_buses: time.sleep(5) self._attach_tenant_rule_targets(name, region, event_buses) _LOG.info('Created an event rule %s in %s.', name, region) response = self._cw_events_conn_builder(region).get_rule(name) time.sleep(5) return self.describe_rule(name=name, meta=meta, region=region, response=response) except KeyError: raise AssertionError( 'Invalid rule type: {0} for resource {1}. ' 'Please, change rule type with existing: ' 'schedule|ec2|api_call.'.format(rule_type, name)) def _attach_tenant_rule_targets(self, rule_name, region, event_buses): for event_bus in event_buses: target_arn = get_event_bus_arn(event_bus=event_bus, region=region) existing_targets = self._cw_events_conn_builder( region).list_targets_by_rule( rule_name=rule_name) for target in existing_targets: if target[ARN_KEY] == target_arn: _LOG.debug('Target to event bus %s is already attached', target_arn) return self._cw_events_conn_builder(region).add_rule_target( rule_name=rule_name, target_arn=target_arn) def _handle_deactivation_for_cw_resources(self, cw_conn, region, rule_name): targets = cw_conn.list_targets_by_rule(rule_name) home_eb_arn = f'arn:aws:events:' \ f'{region}:{self.account_id}:event-bus/default' _LOG.debug('Home account event bus arn: %s', home_eb_arn) for target in targets: resource_arn = target[ARN_KEY] if resource_arn == home_eb_arn: cw_conn.remove_targets(rule_name, [target['Id']]) _LOG.debug('Target %s removed', resource_arn) targets = cw_conn.list_targets_by_rule(rule_name) if targets: _LOG.debug('Will not remove rule, targets attached') else: _LOG.debug('Going to remove rule %s', rule_name) cw_conn.remove_rule(rule_name) _LOG.debug('Rule %s removed', rule_name) def remove_cloud_watch_rules(self, args): return self.create_pool(self._remove_cloud_watch_rule, args) @unpack_kwargs def _remove_cloud_watch_rule(self, arn, config): region = arn.split(':')[3] resource_name = config['resource_name'] try: self._cw_events_conn_builder(region).remove_rule( resource_name, log_not_found_error=False) _LOG.info('Rule %s was removed', resource_name) return {arn: config} except ClientError as e: exception_type = e.response['Error']['Code'] if exception_type == 'ResourceNotFoundException': _LOG.warn('Rule %s is not found', resource_name) return {arn: config} else: raise e