# syndicate/core/build/profiler_processor.py
import os
from datetime import datetime, timedelta
from math import ceil

from syndicate.commons.log_helper import get_logger
from syndicate.core import ResourceProvider
from syndicate.core.build.bundle_processor import load_deploy_output
from syndicate.core.constants import DATE_FORMAT_ISO_8601

MIN_STATISTIC_VALUE = 'Minimum'
MAX_STATISTIC_VALUE = 'Maximum'
AVG_STATISTIC_VALUE = 'Average'
SUM_STATISTIC_VALUE = 'Sum'

DURATION_METRIC = 'Duration'
INVOCATION_METRIC = 'Invocations'
ERROR_METRIC = 'Errors'
CONCURRENT_EXECUTIONS_METRIC = 'ConcurrentExecutions'
SUCCESS_RATE_METRIC = f'Success rate{os.linesep}(%)'
UTC_TIMESTAMP = f'Metrics time stamp{os.linesep}(UTC)'

METRIC_NAMES = ['Invocations', 'Errors', 'Throttles', 'Duration',
                'DestinationDeliveryFailures', 'DeadLetterErrors',
                'IteratorAge', 'ConcurrentExecutions']

_LOG = get_logger(__name__)


def _get_cw_client():
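    """Return the low-level CloudWatch client from the resource provider."""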
return ResourceProvider.instance.cw_alarm().client.client


def get_lambdas_name(bundle_name, deploy_name):
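    """Collect the names of all lambdas listed in the deploy output.

    Returns an empty list when the deploy output contains no lambda
    resources.
    """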
output = load_deploy_output(bundle_name, deploy_name)
    lambda_output = {
        key: value for key, value in output.items()
        if value['resource_meta'].get('resource_type') == 'lambda'}
    if not lambda_output:
        _LOG.warning('No lambdas found to describe metrics for')
        return []
    return [definition['resource_name']
            for definition in lambda_output.values()]


def add_success_rate_column(prettify_metrics_dict):
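    """Derive a success-rate column from the invocation and error columns.

    The rate is computed per data point as
    (invocations - errors) / invocations * 100.
    """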
invocations = []
errors = []
success_rate = []
for metric_type, metric_data in prettify_metrics_dict.items():
if metric_type.startswith(INVOCATION_METRIC):
invocations = metric_data
elif metric_type.startswith(ERROR_METRIC):
errors = metric_data
    for idx, data in enumerate(invocations):
        # missing error data points default to zero
        error_count = errors[idx] if idx < len(errors) else 0
        # a period with zero invocations is reported as a 100% success
        # rate to avoid ZeroDivisionError (an assumption of this module,
        # not a CloudWatch convention)
        percent = (round(((data - error_count) / data) * 100, 2)
                   if data else 100.0)
        success_rate.append(percent)
if success_rate:
prettify_metrics_dict[SUCCESS_RATE_METRIC] = success_rate
return prettify_metrics_dict


def handle_data_points(prettify_metrics_dict, data_points, label):
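    """Append each data point's value and timestamp to the pretty dict.

    Column names combine the metric label with its unit ('Milliseconds'
    is shortened to 'Ms'); whole numbers are stored as ints, the rest
    are rounded to two decimal places.
    """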
for data in data_points:
unit = f"{os.linesep}({data['Unit']})".replace('Milliseconds',
'Ms')
label_unit = label + unit
time_stamp = str(data['Timestamp']).split('+')[0]
metric_data = None
        statistics = [AVG_STATISTIC_VALUE, MIN_STATISTIC_VALUE,
                      MAX_STATISTIC_VALUE, SUM_STATISTIC_VALUE]
        for statistic in statistics:
            if statistic in data:
                metric_data = data[statistic]
                break
        if metric_data is None:
            continue
        # display whole numbers without a trailing '.0'
        if int(metric_data) == metric_data:
            metric_data = int(metric_data)
        else:
            metric_data = round(metric_data, 2)
        prettify_metrics_dict.setdefault(label_unit, []).append(metric_data)
if time_stamp not in prettify_metrics_dict[UTC_TIMESTAMP]:
prettify_metrics_dict[UTC_TIMESTAMP].append(time_stamp)


def process_metrics(prettify_metrics_dict, metrics):
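    """Flatten raw CloudWatch responses into a table-friendly dict.

    Data points are sorted newest first, and a success-rate column is
    appended once all metrics are processed.
    """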
for metric_type in metrics:
label = metric_type['Label']
data_points = metric_type['Datapoints']
        data_points.sort(key=lambda point: point['Timestamp'], reverse=True)
        if UTC_TIMESTAMP not in prettify_metrics_dict and data_points:
            prettify_metrics_dict[UTC_TIMESTAMP] = []
handle_data_points(prettify_metrics_dict, data_points, label)
add_success_rate_column(prettify_metrics_dict)
return prettify_metrics_dict


def period_calculation(time_range):
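    """Pick a metric period (in seconds) that covers the requested range.

    CloudWatch returns at most 1440 data points per call, so the period
    is the range divided by 1440, rounded up to the nearest value
    CloudWatch accepts: 1, 5, 10 or 30 seconds, or a multiple of 60.
    """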
single_call_data_points = 1440
divider = 60
time_range_in_sec = time_range.total_seconds()
period = time_range_in_sec / single_call_data_points
if period < divider:
allowed_min_values = [1, 5, 10, 30]
for num in allowed_min_values:
if num >= period:
period = num
break
if period not in allowed_min_values:
period = divider
else:
        # round up to the nearest multiple of 60 seconds
        period = divider * ceil(period / divider)
return int(period)


def validate_time_range(from_date, to_date):
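    """Resolve the metrics time range, defaulting to the last hour.

    Explicit dates are parsed with DATE_FORMAT_ISO_8601 and converted
    to UTC; an AssertionError is raised if the range is empty or
    negative.
    """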
    if not (from_date and to_date):
        to_date = datetime.utcnow()
        from_date = to_date - timedelta(hours=1)
    else:
        # parsed dates are treated as local time and converted to UTC
        from_date = datetime.strptime(from_date, DATE_FORMAT_ISO_8601)
        from_date = datetime.utcfromtimestamp(datetime.timestamp(from_date))
        to_date = datetime.strptime(to_date, DATE_FORMAT_ISO_8601)
        to_date = datetime.utcfromtimestamp(datetime.timestamp(to_date))
time_range = to_date - from_date
if time_range <= timedelta(seconds=0):
        raise AssertionError('The parameter from_date must be earlier than '
                             'the parameter to_date.')
return from_date, to_date, time_range


def get_metric(lambda_name, metric, statistics, from_date, to_date, period):
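    """Fetch statistics for one AWS/Lambda metric of a single function."""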
cw_client = _get_cw_client()
response = cw_client.get_metric_statistics(
Namespace='AWS/Lambda',
Dimensions=[
{
'Name': 'FunctionName',
'Value': lambda_name
}
],
MetricName=metric,
StartTime=from_date,
EndTime=to_date,
Period=period,
Statistics=statistics
)
return response


def save_metric_to_dict(metric_value_dict, lambda_name, response):
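    """Group metric responses under the owning lambda's name."""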
    metric_value_dict.setdefault(lambda_name, []).append(response)
    return metric_value_dict


def process_duration_metric(lambda_name, metric, from_date, to_date, period,
metric_value_dict):
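    """Collect Min./Avg./Max. duration statistics for a lambda."""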
statistics_abbreviation = {MIN_STATISTIC_VALUE: 'Min.',
AVG_STATISTIC_VALUE: 'Avg.',
MAX_STATISTIC_VALUE: 'Max.'}
    for statistic, abbreviation in statistics_abbreviation.items():
        response = get_metric(lambda_name, metric, [statistic],
                              from_date, to_date, period)
        # prefix the label so the Min./Avg./Max. columns stay distinct
        response['Label'] = f"{abbreviation} {response['Label']}"
        metric_value_dict = save_metric_to_dict(metric_value_dict,
                                                lambda_name, response)
return metric_value_dict


def process_invocation_metric(lambda_name, metric, from_date, to_date, period,
metric_value_dict):
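    """Collect the total (Sum) invocation count for a lambda."""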
sum_statistic = [SUM_STATISTIC_VALUE]
response = get_metric(lambda_name, metric, sum_statistic,
from_date, to_date, period)
metric_value_dict = save_metric_to_dict(metric_value_dict,
lambda_name, response)
return metric_value_dict


def process_cncr_execution_metric(lambda_name, metric, from_date, to_date,
period, metric_value_dict):
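    """Collect the maximum number of concurrent executions of a lambda."""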
max_statistics = [MAX_STATISTIC_VALUE]
response = get_metric(lambda_name, metric, max_statistics,
from_date, to_date, period)
response['Label'] = 'Concurrent Executions'
metric_value_dict = save_metric_to_dict(metric_value_dict,
lambda_name, response)
return metric_value_dict


METRIC_TYPE_HANDLERS = {
DURATION_METRIC: process_duration_metric,
INVOCATION_METRIC: process_invocation_metric,
CONCURRENT_EXECUTIONS_METRIC: process_cncr_execution_metric
}


def process_another_metrics_types(lambda_name, metric, from_date, to_date,
period, metric_value_dict):
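    """Collect the Maximum statistic for metrics without a dedicated
    handler (Errors, Throttles, IteratorAge, etc.).
    """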
max_statistics = [MAX_STATISTIC_VALUE]
response = get_metric(lambda_name, metric, max_statistics,
from_date, to_date, period)
metric_value_dict = save_metric_to_dict(metric_value_dict,
lambda_name, response)
return metric_value_dict


def get_metric_statistics(bundle_name, deploy_name, from_date, to_date):
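    """Gather CloudWatch metrics for every lambda of the given deploy.

    Returns a mapping of lambda name to the list of raw metric
    responses; the handlers mutate metric_value_dict in place.
    """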
from_date, to_date, time_range = validate_time_range(from_date, to_date)
period = period_calculation(time_range)
lambda_names = get_lambdas_name(bundle_name, deploy_name)
metric_value_dict = {}
for lambda_name in lambda_names:
for metric in METRIC_NAMES:
metric_handler = METRIC_TYPE_HANDLERS.get(metric)
if metric_handler:
metric_handler(lambda_name, metric, from_date, to_date, period,
metric_value_dict)
else:
process_another_metrics_types(lambda_name, metric, from_date,
to_date, period,
metric_value_dict)
return metric_value_dict
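

# Minimal usage sketch (assumptions: an initialised syndicate project with
# AWS credentials configured; the bundle and deploy names are hypothetical):
#
#     metrics = get_metric_statistics(bundle_name='demo_bundle',
#                                     deploy_name='demo_deploy',
#                                     from_date=None, to_date=None)
#
# With both dates omitted, metrics for the last hour are collected.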