syndicate/connection/cloud_watch_connection.py (413 lines of code) (raw):

""" Copyright 2018 EPAM Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import json import uuid from json import dumps from typing import Optional from boto3 import client from botocore.exceptions import ClientError from syndicate.commons.log_helper import get_logger from syndicate.connection.helper import apply_methods_decorator, retry from syndicate.core.constants import ( POSSIBLE_RETENTION_DAYS, DEFAULT_LOGS_EXPIRATION ) _LOG = get_logger(__name__) def get_lambda_log_group_name(lambda_name): return '/aws/lambda/' + lambda_name @apply_methods_decorator(retry()) class LogsConnection(object): """ CloudWatch Log connection class.""" def __init__(self, region=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None): self.client = client('logs', region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token) _LOG.debug('Opened new Cloudwatch logs connection.') def delete_log_group_name(self, name): """ Delete all existing logs. :type name: str """ self.client.delete_log_group(logGroupName=name) def create_subscription(self, log_group_name, filter_name, lambda_arn, filter_pattern=''): """ Subscribes provided lambda to the provided log group. :type log_group_name: str :type filter_name: str :type lambda_arn: str :type filter_pattern: str """ log_group_name = get_lambda_log_group_name(log_group_name) self.client.put_subscription_filter(logGroupName=log_group_name, filterName=filter_name, filterPattern=filter_pattern, destinationArn=lambda_arn) def create_log_group_with_retention_days(self, group_name: str, retention_in_days: int, tags: dict = None): """ Creates a log group for provided lambda function and sets the retention . :type group_name: str :type retention_in_days: int :type tags: dict """ if retention_in_days == 0: retention_in_days = POSSIBLE_RETENTION_DAYS[-1] if retention_in_days not in POSSIBLE_RETENTION_DAYS: _LOG.warning( f"Invalid value for 'logs_expiration': {retention_in_days}. " f"Possible values: {', '.join(map(str, POSSIBLE_RETENTION_DAYS))}" f" or 0 for max limit. Set default {DEFAULT_LOGS_EXPIRATION}" ) retention_in_days = DEFAULT_LOGS_EXPIRATION log_group_name = get_lambda_log_group_name(group_name) params = dict( logGroupName=log_group_name ) if tags: params['tags'] = tags self.client.create_log_group(**params) self.client.put_retention_policy( logGroupName=log_group_name, retentionInDays=retention_in_days ) def update_log_group_retention_days(self, group_name: str, retention_in_days: int): """ Updates the retention of a log group for provided lambda function. :type group_name: str :type retention_in_days: int """ if retention_in_days == 0: retention_in_days = POSSIBLE_RETENTION_DAYS[-1] if retention_in_days not in POSSIBLE_RETENTION_DAYS: _LOG.warning( f"Invalid value for 'logs_expiration': {retention_in_days}. " f"Possible values: {', '.join(map(str, POSSIBLE_RETENTION_DAYS))}" f" or 0 for max limit. Set default {DEFAULT_LOGS_EXPIRATION}" ) retention_in_days = DEFAULT_LOGS_EXPIRATION log_group_name = get_lambda_log_group_name(group_name) try: res = self.client.describe_log_groups( logGroupNamePrefix=log_group_name) except Exception as e: _LOG.warning(f"Error on describing log group: {log_group_name}. " f"Error: {str(e)}") return if not res.get('logGroups'): _LOG.warning(f"Log group does not exist: {log_group_name}.") return self.client.put_retention_policy( logGroupName=log_group_name, retentionInDays=retention_in_days ) _LOG.info( f"Successfully updated the cloudWatch log group: {log_group_name}") def get_log_group_arns(self): """ Returns ARNs for each log group that currently exists. """ response = self.get_all_log_groups() return [each['arn'] for each in response] def get_log_group_names(self): """ Get all log group names from CloudWatch Log.""" response = self.get_all_log_groups() return [each['logGroupName'] for each in response] def get_all_log_groups(self): groups = [] response = self.client.describe_log_groups() groups.extend(response.get('logGroups')) token = response.get('nextToken') while token: response = self.client.describe_log_groups(nextToken=token) groups.extend(response.get('logGroups')) token = response.get('nextToken') return groups def get_log_group_by_lambda_name(self, lambda_name: str): group = None response = self.client.describe_log_groups( logGroupNamePattern=lambda_name ) group_data = response.get("logGroups") if group_data: group = group_data[0] return group @apply_methods_decorator(retry()) class EventConnection(object): """ CloudWatch Event connection class.""" def __init__(self, region=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None): self.client = client('events', region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token) _LOG.debug('Opened new Cloudwatch events connection.') def create_schedule_rule(self, name, expression, tags=None, state='ENABLED'): """ Create CloudWatch schedule rule for resource invocation. :type name: str :type expression: str :param expression: e.g. rate(1 hour) :type state: str :param state: 'ENABLED'/'DISABLED' :param tags: List of resource tags key-value pairs """ params = dict( Name=name, ScheduleExpression=expression, State=state, Description=name, ) if tags: params['Tags'] = tags self.client.put_rule(**params) def create_ec2_rule(self, name, instances=None, instance_states=None, tags=None, state='ENABLED'): """ Create CloudWatch ec2 rule for resource invocation. :type name: str :type instances: list :type instance_states: list :type state: str :type tags: list of dict """ event_pattern = { "source": ["aws.ec2"], "detail-type": ["EC2 Instance State-change Notification"] } if instances: event_pattern["detail"] = {"instance-id": instances} if instance_states: if event_pattern.get("detail"): event_pattern.get("detail").update({"state": instance_states}) else: event_pattern["detail"] = {"state": instance_states} params = dict( Name=name, EventPattern=dumps(event_pattern), State=state, Description=name ) if tags: params['Tags'] = tags self.client.put_rule(**params) def create_api_call_rule(self, name, aws_service=None, operations=None, custom_pattern=None, tags=None, state='ENABLED'): """ To select ANY operation do not set 'operations' param. :type aws_service: :param aws_service: e.g. 'ec2' :type name: str :type operations: list :type custom_pattern: dict :param operations: :type state: str :type tags: list of dict """ if custom_pattern: event_pattern = custom_pattern elif aws_service: event_pattern = { "detail-type": [ "AWS API Call via CloudTrail" ], "detail": { "eventSource": [ "{0}.amazonaws.com".format(aws_service) ] } } if operations: event_pattern['detail']['eventName'] = operations else: raise AssertionError( f'aws_service or custom_pattern should be specified for rule ' f'with "api_call" type! Resource: {name}') params = dict( Name=name, EventPattern=dumps(event_pattern), State=state, Description=name ) if tags: params['Tags'] = tags self.client.put_rule(**params) def get_rule(self, rule_name): try: return self.client.describe_rule(Name=rule_name) except ClientError as e: if 'ResourceNotFoundException' in str(e): pass # valid exception else: raise e def get_rule_arn(self, name): rule = self.get_rule(name) if rule: return rule['Arn'] def add_rule_target(self, rule_name: str, target_arn: str, input_: Optional[dict] = None): """Add to CloudWatch rule targets for invocations :type rule_name: str :type target_arn: str :type input_: Optional[dict] """ target = {'Id': str(uuid.uuid1()), 'Arn': target_arn} if input_ and isinstance(input_, dict): target['Input'] = json.dumps(input_) self.client.put_targets(Rule=rule_name, Targets=[target, ]) def add_rule_sf_target(self, rule_name, target_arn, input, role_arn): """ Add to CloudWatch rule targets for invocations. :type rule_name: str :type target_arn: str """ self.client.put_targets(Rule=rule_name, Targets=[{ 'Id': str(uuid.uuid1()), 'Arn': target_arn, 'Input': json.dumps(input), 'RoleArn': role_arn }]) def list_targets(self, rule_name): """ Lists the targets assigned to the specified rule. :type rule_name: str """ return self.client.list_targets_by_rule(Rule=rule_name) def list_rules_by_target(self, target_arn): response = self.client.list_rule_names_by_target(TargetArn=target_arn) rule_names = response.get('RuleNames', []) return rule_names def list_rules(self): """ Get list of rules for region.""" rules = [] response = self.client.list_rules() rules.extend(response.get('Rules')) token = response.get('NextToken') while token: response = self.client.list_rules(NextToken=token) rules.extend(response.get('Rules')) token = response.get('NextToken') return rules def clear_rules(self): """ Clear all rules that exist in region.""" rules = self.list_rules() if rules: for rule in rules: self.remove_rule(rule['Name']) def remove_rule(self, rule_name, log_not_found_error=True): """ Remove single rule by name with targets. :type rule_name: str :type log_not_found_error: boolean, parameter is needed for proper log handling in the retry decorator """ response = self.client.list_targets_by_rule(Rule=rule_name) if response['Targets']: targets = response['Targets'] target_ids = [target['Id'] for target in targets] self.client.remove_targets(Rule=rule_name, Ids=target_ids) self.client.delete_rule(Name=rule_name) def list_targets_by_rule(self, rule_name): targets = [] response = self.client.list_targets_by_rule(Rule=rule_name) targets.extend(response.get('Targets')) token = response.get('NextToken') while token: response = self.client.list_targets_by_rule(Rule=rule_name, NextToken=token) targets.extend(response.get('Targets')) token = response.get('NextToken') return targets def remove_targets(self, rule_name, target_ids): self.client.remove_targets(Rule=rule_name, Ids=target_ids) def describe_event_bus(self): return self.client.describe_event_bus() def add_event_bus_permissions(self, account_id, action='events:PutEvents'): """ Permits the specified AWS account to put events to current account's default event bus. http://boto3.readthedocs.io/en/latest/reference/services/events.html#CloudWatchEvents.Client.put_permission :type account_id: str :param action: Currently, this must be 'events:PutEvents' :type action: str """ event_bus = self.describe_event_bus() statement_id = _find_statement_id_in_event_bus_policy(account_id, event_bus) if not statement_id: self.client.put_permission(Action=action, Principal=account_id, StatementId=str(uuid.uuid1())) def remove_event_bus_permissions(self, account_id): """Revokes the permission of another AWS account to be able to put events to current account's default event bus. :type account_id: str """ event_bus = self.describe_event_bus() statement_id = _find_statement_id_in_event_bus_policy(account_id, event_bus) if statement_id: self.client.remove_permission(StatementId=statement_id) def _find_statement_id_in_event_bus_policy(account_id, event_bus): if event_bus and event_bus.get('Policy'): policy = json.loads(event_bus.get('Policy')) for statement in policy.get('Statement'): principal = statement['Principal'] if isinstance(principal, str): if account_id == principal: return statement['Sid'] else: if account_id in principal['AWS']: return statement['Sid'] @apply_methods_decorator(retry()) class MetricConnection(object): """ CloudWatch Log connection class.""" def __init__(self, region=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None): self.client = client('cloudwatch', region, aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, aws_session_token=aws_session_token) _LOG.debug('Opened new Cloudwatch metric connection.') def put_metric_data(self, name_space, metric_name, value, dimensions=None, timestamp=None, statistic_values=None, unit=None): """ :param value: metric value :type name_space: str :type metric_name: str :type dimensions: list of dicts :param dimensions: [ { 'Name': 'string', 'Value': 'string' } ] :type timestamp: datetime :type statistic_values: dict :param statistic_values: { 'SampleCount': 123.0, 'Sum': 123.0, 'Minimum': 123.0, 'Maximum': 123.0 } :type unit: str :param unit: 'Seconds'|'Microseconds'|'Milliseconds'|'Bytes'| 'Kilobytes'|'Megabytes'|'Gigabytes'|'Terabytes'|'Bits'|'Kilobits'| 'Megabits'|'Gigabits'|'Terabits'|'Percent'|'Count'|'Bytes/Second'| 'Kilobytes/Second'|'Megabytes/Second'|'Gigabytes/Second'| 'Terabytes/Second'|'Bits/Second'|'Kilobits/Second'|'Megabits/Second'| 'Gigabits/Second'|'Terabits/Second'|'Count/Second'|'None' """ metric_data = { 'MetricName': metric_name, 'Value': value } if dimensions: metric_data['Dimensions'] = dimensions if statistic_values: metric_data['StatisticValues'] = statistic_values if unit: metric_data['Unit'] = unit if timestamp: metric_data['Timestamp'] = timestamp self.client.put_metric_data(Namespace=name_space, MetricData=[metric_data]) def put_metric_alarm(self, alarm_name, metric_name, namespace, period, evaluation_periods, threshold, comparison_operator, statistic, actions_enabled=None, ok_actions=None, alarm_actions=None, insufficient_data_actions=None, extended_statistic=None, dimensions=None, unit=None, description=None, datapoints=None, evaluate_low_sample_count_percentile=None, tags=None): """ :type alarm_name: str :type metric_name: str :type namespace: str :type period: int :type evaluation_periods: int :type threshold: float :type comparison_operator: str :param comparison_operator: 'GreaterThanOrEqualToThreshold'| 'GreaterThanThreshold'|'LessThanThreshold'|'LessThanOrEqualToThreshold' :type actions_enabled: bool :type ok_actions: list of strings :type alarm_actions: list of strings :type insufficient_data_actions: list of strings :type statistic: str :param statistic: 'SampleCount'|'Average'|'Sum'|'Minimum'|'Maximum' :type extended_statistic: str :type dimensions: list of dicts :param dimensions: [{ 'Name': 'string', 'Value': 'string' },] :type unit: str :param unit: 'Seconds'|'Microseconds'|'Milliseconds'|'Bytes'| 'Kilobytes'|'Megabytes'|'Gigabytes'|'Terabytes'|'Bits'|'Kilobits'| 'Megabits'|'Gigabits'|'Terabits'|'Percent'|'Count'|'Bytes/Second'| 'Kilobytes/Second'|'Megabytes/Second'|'Gigabytes/Second'| 'Terabytes/Second'|'Bits/Second'|'Kilobits/Second'|'Megabits/Second'| 'Gigabits/Second'|'Terabits/Second'|'Count/Second'|'None' :type description: str :param description: the description for the alarm :type datapoints: int :param datapoints: number of datapoints that must be breaching to trigger the alarm :type evaluate_low_sample_count_percentile: str :param evaluate_low_sample_count_percentile: 'evaluate'|'ignore' :type tags: list of dicts: List of resource tags key-value pairs """ params = dict(AlarmName=alarm_name, MetricName=metric_name, Namespace=namespace, Period=period, Threshold=threshold, EvaluationPeriods=evaluation_periods, ComparisonOperator=comparison_operator, Statistic=statistic) if actions_enabled: params['ActionsEnabled'] = actions_enabled if ok_actions: params['OKActions'] = ok_actions if alarm_actions: params['AlarmActions'] = alarm_actions if insufficient_data_actions: params['InsufficientDataActions'] = insufficient_data_actions if extended_statistic: params['ExtendedStatistic'] = extended_statistic if unit: params['Unit'] = unit if dimensions: params['Dimensions'] = dimensions if description: params['AlarmDescription'] = description if evaluate_low_sample_count_percentile: params['EvaluateLowSampleCountPercentile'] = \ evaluate_low_sample_count_percentile if datapoints: params['DatapointsToAlarm'] = datapoints if tags: params['Tags'] = tags self.client.put_metric_alarm(**params) def remove_alarms(self, alarm_names, log_not_found_error=True): """ :type alarm_names: str or list :type log_not_found_error: boolean, parameter is needed for proper log handling in the retry decorator """ if isinstance(alarm_names, str): alarm_names = [alarm_names] self.client.delete_alarms(AlarmNames=alarm_names) def alarm_list(self, alarm_names): """ :type alarm_names: str or list """ if isinstance(alarm_names, str): alarm_names = [alarm_names] alarms = [] response = self.client.describe_alarms(AlarmNames=alarm_names) token = response.get('NextToken') alarms.extend(response.get('MetricAlarms')) while token: response = self.client.describe_alarms(AlarmNames=alarm_names, NextToken=token) token = response.get('NextToken') alarms.extend(response.get('MetricAlarms')) return alarms def all_alarms(self): alarms = [] response = self.client.describe_alarms() token = response.get('NextToken') alarms.extend(response.get('MetricAlarms')) while token: response = self.client.describe_alarms(NextToken=token) token = response.get('NextToken') alarms.extend(response.get('MetricAlarms')) return alarms def is_alarm_exists(self, alarm_names): """ :type alarm_names: str or list """ if isinstance(alarm_names, str): alarm_names = [alarm_names] alarms = self.alarm_list(alarm_names) existing_names = [each['AlarmName'] for each in alarms] for alarm_name in alarm_names: if alarm_name not in existing_names: return False return True def list_metrics(self, name=None, namespace=None, dimensions=None): params = dict() if name: params['MetricName'] = name if namespace: params['Namespace'] = namespace if dimensions: params['Dimensions'] = dimensions metrics = [] response = self.client.list_metrics(**params) metrics.extend(response.get('Metrics', [])) token = response.get('NextToken') while token: params['NextToken'] = token response = self.client.list_metrics(**params) metrics.extend(response.get('Metrics', [])) token = response.get('NextToken') return metrics def describe_alarms(self, alarm_names=None, alarm_name_prefix=None, state_value=None, action_prefix=None): params = dict() if alarm_names: params['AlarmNames'] = alarm_names if alarm_name_prefix: params['AlarmNamePrefix'] = alarm_name_prefix if state_value: params['StateValue'] = state_value if action_prefix: params['ActionPrefix'] = action_prefix alarms = [] response = self.client.describe_alarms(**params) alarms.extend(response.get('MetricAlarms', [])) token = response.get('NextToken') while token: params['NextToken'] = token response = self.client.describe_alarms(**params) alarms.extend(response.get('MetricAlarms', [])) token = response.get('NextToken') return alarms