syndicate/core/resources/sns_resource.py (252 lines of code) (raw):

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ from botocore.exceptions import ClientError from syndicate.commons.log_helper import get_logger from syndicate.core.conf.validator import ALL_REGIONS from syndicate.core.helper import unpack_kwargs from syndicate.core.resources.base_resource import BaseResource from syndicate.core.resources.helper import (build_description_obj, check_region_available, create_args_for_multi_region, validate_params) SNS_CLOUDWATCH_TRIGGER_REQUIRED_PARAMS = ['target_rule'] _LOG = get_logger(__name__) class SnsResource(BaseResource): def __init__(self, conn_provider, region) -> None: self.connection_provider = conn_provider self.region = region self.create_trigger = { 'cloudwatch_rule_trigger': self._create_cloud_watch_trigger_from_meta, 'eventbridge_rule_trigger': self._create_cloud_watch_trigger_from_meta } def describe_sns(self, name, meta, region, arn=None): if not arn: arn = self.connection_provider.sns(region).get_topic_arn(name) response = self.connection_provider.sns(region).get_topic_attributes( arn) return { arn: build_description_obj(response, name, meta) } def describe_sns_from_meta(self, name, meta): new_region_args = create_args_for_multi_region( [ { 'name': name, 'meta': meta } ], ALL_REGIONS) responses = [] for arg in new_region_args: region = arg['region'] topic_arn = self.connection_provider.sns(region).get_topic_arn( name) if not topic_arn: continue response = self.connection_provider.sns( region).get_topic_attributes( topic_arn) if response: responses.append({'arn': topic_arn, 'response': response}) description = {} for topic in responses: description.update({ topic['arn']: build_description_obj( topic['response'], name, meta) }) return description def describe_sns_application(self, name, meta, region, arn=None): if not arn: arn = self.connection_provider.sns( region).get_platform_application(name) response = self.connection_provider.sns( region).get_platform_application_attributes(arn) return { arn: build_description_obj(response, name, meta) } def describe_sns_application_from_meta(self, name, meta): new_region_args = create_args_for_multi_region( [ { 'name': name, 'meta': meta } ], ALL_REGIONS) responses = [] for arg in new_region_args: region = arg['region'] app_arn = self.connection_provider.sns( region).get_platform_application( name) if not app_arn: continue response = self.connection_provider.sns( region).get_platform_application_attributes( app_arn) if response: responses.append({'arn': app_arn, 'response': response}) description = {} for topic in responses: description.update({ topic['arn']: build_description_obj( topic['response'], name, meta) }) return description def create_sns_topic(self, args): """ Create sns topic from meta in region/regions. :type args: list """ new_region_args = create_args_for_multi_region(args, ALL_REGIONS) return self.create_pool(self._create_sns_topic_from_meta, new_region_args) def create_sns_application(self, args): """ Create sns application from meta in region/regions. :type args: list """ new_region_args = create_args_for_multi_region(args, ALL_REGIONS) return self.create_pool(self._create_platform_application_from_meta, new_region_args) @unpack_kwargs def _create_sns_topic_from_meta(self, name, meta, region): arn = self.connection_provider.sns(region).get_topic_arn(name) if arn: _LOG.warn( '{0} sns topic exists in region {1}.'.format(name, region)) return self.describe_sns(name=name, meta=meta, region=region, arn=arn) arn = self.connection_provider.sns(region).create_topic( name, meta.get('tags')) event_sources = meta.get('event_sources') if event_sources: for trigger_meta in event_sources: trigger_type = trigger_meta['resource_type'] func = self.create_trigger[trigger_type] func(name, trigger_meta, region) _LOG.info('SNS topic %s in region %s created.', name, region) return self.describe_sns(name=name, meta=meta, region=region, arn=arn) def _subscribe_lambda_to_sns_topic(self, lambda_arn, topic_name, region): topic_arn = self.connection_provider.sns(region).subscribe(lambda_arn, topic_name, 'lambda') try: self.connection_provider.lambda_conn().add_invocation_permission( lambda_arn, 'sns.amazonaws.com', source_arn=topic_arn) except ClientError: _LOG.warn('The final access policy size for lambda {} is reached. ' 'The limit is 20480 bytes. ' 'Invocation permission was not added.'.format( lambda_arn)) def create_sns_subscription_for_lambda(self, lambda_arn, topic_name, region): """ Create subscription for lambda on SNS topic in specified region/regions. :type lambda_arn: str :type topic_name: str :type region: str """ if region: if isinstance(region, str): if region == 'all': for each in ALL_REGIONS: self._subscribe_lambda_to_sns_topic(lambda_arn, topic_name, each) else: if check_region_available(region, ALL_REGIONS): self._subscribe_lambda_to_sns_topic(lambda_arn, topic_name, region) elif isinstance(region, list): for each in region: if check_region_available(each, ALL_REGIONS): self._subscribe_lambda_to_sns_topic(lambda_arn, topic_name, each) else: raise AssertionError('Invalid value for SNS region: %s.', region) else: self._subscribe_lambda_to_sns_topic(lambda_arn, topic_name, self.region) def _create_cloud_watch_trigger_from_meta(self, topic_name, trigger_meta, region): required_parameters = SNS_CLOUDWATCH_TRIGGER_REQUIRED_PARAMS validate_params(topic_name, trigger_meta, required_parameters) rule_name = trigger_meta['target_rule'] topic_arn = self.connection_provider.sns(region).get_topic_arn( topic_name) self.connection_provider.cw_events(region).add_rule_target( rule_name, topic_arn) self.connection_provider.sns(region).allow_service_invoke( topic_arn, 'events.amazonaws.com') _LOG.info('SNS topic %s subscribed to cloudwatch rule %s', topic_name, rule_name) def remove_sns_topics(self, args): return self.create_pool(self._remove_sns_topic, args) @unpack_kwargs def _remove_sns_topic(self, arn, config): region = arn.split(':')[3] topic_name = config['resource_name'] # TODO delete remove_sns_topic_subscriptions when AWS will start # deleting subscriptions with related SNS topic deletion self._remove_sns_topic_subscriptions(arn) try: self.connection_provider.sns(region).remove_topic_by_arn( arn, log_not_found_error=False) _LOG.info('SNS topic %s was removed.', topic_name) return {arn: config} except ClientError as e: exception_type = e.response['Error']['Code'] if exception_type == 'ResourceNotFoundException': _LOG.warn('SNS topic %s is not found', topic_name) return {arn: config} else: raise e def _remove_sns_topic_subscriptions(self, topic_arn): region = topic_arn.split(':')[3] subscriptions = (self.connection_provider.sns(region). list_subscriptions_by_topic(topic_arn)) for subscription in subscriptions: subscription_arn = subscription['SubscriptionArn'] self.connection_provider.sns(region).unsubscribe( subscription_arn) def unsubscribe_arn(self, subscription_arn): self.connection_provider.sns().unsubscribe( subscription_arn=subscription_arn) def list_subscriptions(self): return self.connection_provider.sns().list_subscriptions() @unpack_kwargs def _create_platform_application_from_meta(self, name, meta, region): required_parameters = ['platform', 'attributes'] validate_params(name, meta, required_parameters) arn = self.connection_provider.sns(region).get_platform_application( name) if arn: _LOG.warn( '{0} SNS platform application exists in region {1}.'.format( name, region)) return self.describe_sns_application(name, meta, region, arn) platform = meta['platform'] atrbts = meta['attributes'] try: arn = self.connection_provider.sns( region).create_platform_application( name=name, platform=platform, attributes=atrbts) except ClientError as e: exception_type = e.response['Error']['Code'] if exception_type == 'InvalidParameterException': _LOG.warn('SNS application %s is already exist.', name) else: raise e _LOG.info('SNS platform application %s in region %s has been created.', name, region) return self.describe_sns_application(name, meta, region, arn) def remove_sns_application(self, args): return self.create_pool(self._remove_sns_application, args) @unpack_kwargs def _remove_sns_application(self, arn, config): region = arn.split(':')[3] application_name = config['resource_name'] try: self.connection_provider.sns(region).remove_application_by_arn( arn, log_not_found_error=False) _LOG.info('SNS application %s was removed.', application_name) return {arn: config} except ClientError as e: exception_type = e.response['Error']['Code'] if exception_type == 'ResourceNotFoundException': _LOG.warn('SNS application %s is not found', application_name) return {arn: config} else: raise e