syndicate/core/resources/sqs_resource.py (115 lines of code) (raw):
"""
Copyright 2018 EPAM Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from botocore.exceptions import ClientError
from syndicate.commons.log_helper import get_logger
from syndicate.core.helper import unpack_kwargs
from syndicate.core.resources.base_resource import BaseResource
from syndicate.core.resources.helper import build_description_obj
_LOG = get_logger(__name__)
FIFO_SUFFIX = '.fifo'
class SqsResource(BaseResource):
def __init__(self, sqs_conn_builder, region, account_id) -> None:
self.sqs_conn_builder = sqs_conn_builder
self.region = region
self.account_id = account_id
def create_sqs_queue(self, args):
return self.create_pool(self._create_sqs_queue_from_meta, args)
def describe_queue(self, queue_url, name, meta, resource_name, region):
response = self.sqs_conn_builder(region).get_queue_attributes(
queue_url)
arn = self._build_queue_arn(resource_name=resource_name, region=region)
return {arn: build_description_obj(response, name, meta)}
def describe_queue_from_meta(self, name, meta):
region = meta.get('region', self.region)
is_fifo = meta.get('fifo_queue', False)
resource_name = self.build_resource_name(is_fifo, name)
queue_url = self.sqs_conn_builder(region).get_queue_url(
resource_name, self.account_id)
if not queue_url:
return {}
response = self.sqs_conn_builder(region).get_queue_attributes(
queue_url)
return {
self._build_queue_arn(resource_name,
region): build_description_obj(
response, name, meta)
}
def remove_queues(self, args):
result = self.create_pool(self._remove_queue, args)
# wait to remove all queues
if args:
time.sleep(60)
return result
@unpack_kwargs
def _remove_queue(self, arn, config):
region = arn.split(':')[3]
queue_name = config['resource_name']
resource_meta = config['resource_meta']
try:
is_fifo = resource_meta.get('fifo_queue', False)
resource_name = self.build_resource_name(is_fifo, queue_name)
queue_url = self.sqs_conn_builder(region).get_queue_url(
resource_name,
self.account_id)
if queue_url:
self.sqs_conn_builder(region).delete_queue(
queue_url, log_not_found_error=False)
_LOG.info('SQS queue %s was removed.', queue_name)
else:
_LOG.warn('SQS queue %s is not found', queue_name)
return {arn: config}
except ClientError as e:
exception_type = e.response['Error']['Code']
if exception_type == 'ResourceNotFoundException':
_LOG.warn('SQS queue %s is not found', queue_name)
return {arn: config}
else:
raise e
@unpack_kwargs
def _create_sqs_queue_from_meta(self, name, meta):
region = meta.get('region', self.region)
is_fifo = meta.get('fifo_queue', False)
resource_name = self.build_resource_name(is_fifo, name)
queue_url = self.sqs_conn_builder(region).get_queue_url(resource_name,
self.account_id)
if queue_url:
_LOG.warn('SQS queue %s exists.', name)
return self.describe_queue(queue_url, name, meta, resource_name,
region)
delay_sec = meta.get('delay_seconds')
max_mes_size = meta.get('maximum_message_size')
mes_ret_period = meta.get('message_retention_period')
policy = meta.get('policy')
recieve_mes_wait_sec = meta.get('receive_message_wait_time_seconds')
redrive_policy = meta.get('redrive_policy')
vis_timeout = meta.get('visibility_timeout')
kms_master_key_id = meta.get('kms_master_key_id')
kms_data_reuse_period = meta.get('kms_data_key_reuse_period_seconds')
content_deduplication = meta.get('content_based_deduplication')
tags = meta.get('tags')
params = dict(queue_name=resource_name,
delay_seconds=delay_sec,
maximum_message_size=max_mes_size,
message_retention_period=mes_ret_period,
policy=policy,
receive_message_wait_time_seconds=recieve_mes_wait_sec,
redrive_policy=redrive_policy,
visibility_timeout=vis_timeout,
kms_master_key_id=kms_master_key_id,
kms_data_key_reuse_period_seconds=kms_data_reuse_period,
fifo_queue=is_fifo,
content_based_deduplication=content_deduplication,
tags=tags)
queue_url = self.sqs_conn_builder(region).create_queue(**params)[
'QueueUrl']
_LOG.info('Created SQS queue %s.', name)
return self.describe_queue(queue_url, name, meta, resource_name,
region)
@staticmethod
def build_resource_name(is_fifo, name):
resource_name = name
if is_fifo and not name.endswith(FIFO_SUFFIX):
resource_name += FIFO_SUFFIX
return resource_name
def _build_queue_arn(self, resource_name, region):
arn = 'arn:aws:sqs:{0}:{1}:{2}'.format(region, self.account_id,
resource_name)
return arn