src/lambdas/r8s_report_generator/handler.py (488 lines of code) (raw):
import itertools
import json
import math
from datetime import datetime, timedelta
from typing import List, Union
from uuid import uuid4
from modular_sdk.commons import ModularException
from modular_sdk.services.customer_service import CustomerService
from modular_sdk.services.impl.maestro_rabbit_transport_service \
import MaestroRabbitMQTransport
from modular_sdk.services.tenant_service import TenantService
from commons import build_response, RESPONSE_BAD_REQUEST_CODE, \
RESPONSE_INTERNAL_SERVER_ERROR, RESPONSE_OK_CODE
from commons.constants import CUSTOMER_ATTR, TENANT_ATTR, \
RECOMMENDATION_SETTINGS_ATTR, TARGET_TIMEZONE_NAME_ATTR, TENANTS_ATTR
from commons.log_helper import get_logger
from models.recommendation_history import RecommendationHistory, \
RecommendationTypeEnum
from services import SERVICE_PROVIDER
from services.abstract_lambda import AbstractLambda
from services.algorithm_service import AlgorithmService
from services.environment_service import EnvironmentService
from services.job_service import JobService
from services.recommendation_history_service import \
RecommendationHistoryService
from services.rightsizer_parent_service import RightSizerParentService
_LOG = get_logger('r8s-report-generator')
MAX_PRIORITY_RESOURCES = 10
MAX_RESOURCES_PER_TYPE = 10
COMMAND_NAME = 'SEND_MAIL'
TENANT_REPORT_TYPE = 'RIGHTSIZER_TENANT_REPORT'
UTC_TIMEZONE_NAME = 'Etc/UTC'
class ReportGenerator(AbstractLambda):
def __init__(self, job_service: JobService,
tenant_service: TenantService,
customer_service: CustomerService,
recommendation_service: RecommendationHistoryService,
maestro_rabbitmq_service: MaestroRabbitMQTransport,
parent_service: RightSizerParentService,
algorithm_service: AlgorithmService,
environment_service: EnvironmentService):
self.job_service = job_service
self.tenant_service = tenant_service
self.customer_service = customer_service
self.recommendation_service = recommendation_service
self.maestro_rabbitmq_service = maestro_rabbitmq_service
self.parent_service = parent_service
self.algorithm_service = algorithm_service
self.environment_service = environment_service
def validate_request(self, event):
pass
def handle_request(self, event, context):
customer_name = event.get(CUSTOMER_ATTR)
tenants = event.get(TENANTS_ATTR)
customer = self.customer_service.get(name=customer_name)
if not customer:
_LOG.error(f'Customer \'{customer_name}\' does not exist.')
return build_response(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Customer \'{customer_name}\' does not exist.'
)
if not tenants:
_LOG.debug(f'\'{TENANTS_ATTR}\' attribute must be specified.')
return build_response(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'\'{TENANTS_ATTR}\' attribute must be specified.'
)
response = {}
for tenant_name in tenants:
try:
tenant = self.tenant_service.get(tenant_name=tenant_name)
if not tenant:
_LOG.error(f'Tenant \'{tenant_name}\' does not exist.')
return build_response(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'Tenant \'{tenant_name}\' does not exist.'
)
processing_days = self.environment_service. \
mail_report_process_days()
priority_saving_threshold = self.environment_service. \
mail_report_high_priority_threshold()
_LOG.debug(f'Generating report for \'{customer_name}\' tenant '
f'\'{tenant_name}\'. Processing days: '
f'{processing_days}, high priority saving '
f'threshold: {priority_saving_threshold}')
report = self.generate_report(
customer=customer, tenant=tenant,
processing_days=processing_days,
priority_saving_threshold= \
priority_saving_threshold)
_LOG.debug(f'Preparing request for sending to maestro')
formatted_report = self.prepare_request(report=report)
_LOG.debug(f'Formatted report: {formatted_report}')
_LOG.debug(f'Sending request to Maestro')
response_code, response_message = \
self._send_notification_to_m3(json_model=formatted_report)
_LOG.debug(f'Response: {response_message}')
response[tenant_name] = response_message
except Exception as e:
message = f'Exception occurred while sending report for ' \
f'tenant: \'{tenant_name}\': {e}'
_LOG.error(message)
response[tenant_name] = message
return build_response(
code=RESPONSE_OK_CODE,
content=response
)
def generate_report(self, customer, tenant, processing_days,
priority_saving_threshold):
processing_from_date, processing_to_date = \
self.get_processing_date_range(processing_days)
recommendations = self.recommendation_service.list(
customer=customer.name,
tenant=tenant.name,
from_dt=processing_from_date
)
if not recommendations:
_LOG.error(f'No recommendations found '
f'for tenant \'{tenant.name}\'')
return build_response(
code=RESPONSE_BAD_REQUEST_CODE,
content=f'No recommendations found '
f'for tenant \'{tenant.name}\''
)
_LOG.debug(f'Filtering recommendation to include only one '
f'recommendations from the last job with the resource')
recommendations = self.filter_latest_job_resource(
recommendations=recommendations
)
_LOG.debug(f'Formatting recommendations')
formatted = []
for recommendation in recommendations:
formatted_recommendation = self.format_recommendation(
recommendation=recommendation)
formatted.append(formatted_recommendation)
priority_resources = self.get_priority(
formatted_recommendations=formatted,
saving_threshold=priority_saving_threshold)
_LOG.debug(f'Calculating total summary')
total_summary = self._resources_summary(resources=formatted)
_LOG.debug(f'Total summary: {total_summary}')
if len(priority_resources) > MAX_PRIORITY_RESOURCES:
priority_resources = priority_resources[0:MAX_PRIORITY_RESOURCES]
_LOG.debug(f'Dividing resources by recommendation type')
by_type = self.divide_by_recommendation_type(
formatted,
max_per_type=MAX_RESOURCES_PER_TYPE)
_LOG.debug(f'Calculating displayed items summary')
displayed_resources = list(itertools.chain.from_iterable(
by_type.values()))
report_summary = self._resources_summary(
resources=list(displayed_resources))
_LOG.debug(f'Displayed resources summary: {report_summary}')
job_id = self.get_most_frequent_job_id(recommendations=recommendations)
report_item = {
'summary': {
"total": total_summary,
"displayed": report_summary
},
'high_priority': priority_resources,
'detailed': by_type,
"from": self.to_milliseconds(processing_from_date),
"to": self.to_milliseconds(processing_to_date),
CUSTOMER_ATTR: customer.name,
TENANT_ATTR: tenant.name,
"timezone": self.resolve_timezone(job_id=job_id)
}
_LOG.debug(f'Report: {report_item}')
return report_item
def _send_notification_to_m3(self, json_model: Union[list, dict]):
try:
code, status, response = self.maestro_rabbitmq_service.send_sync(
command_name=COMMAND_NAME,
parameters=json_model,
is_flat_request=False, async_request=False,
secure_parameters=None, compressed=True)
_LOG.debug(f'Response code: {code}, response message: {response}')
return code, response
except ModularException as e:
_LOG.error(f'Modular error: {e}')
return build_response(
code=RESPONSE_INTERNAL_SERVER_ERROR,
content='An error occurred while sending the report.'
'Please contact the support team.'
)
@staticmethod
def to_milliseconds(dt: datetime):
return int(dt.timestamp()) * 1000
@staticmethod
def get_most_frequent_job_id(recommendations: List[RecommendationHistory]):
job_ids = [recommendation.job_id for recommendation
in recommendations if recommendation.job_id]
return max(set(job_ids), key=job_ids.count)
@staticmethod
def prepare_request(report: dict):
return [
{
'viewType': 'm3',
'model': {
"uuid": str(uuid4()),
"notificationType": TENANT_REPORT_TYPE,
"notificationAsJson": json.dumps(
{**report,
'report_type': TENANT_REPORT_TYPE},
separators=(",", ":")),
"notificationProcessorTypes": ["MAIL"]
}
}
]
@staticmethod
def get_processing_date_range(processing_days):
stop = datetime.now()
start = stop - timedelta(days=processing_days)
return start, stop
@staticmethod
def filter_latest_job_resource(
recommendations: List[RecommendationHistory]):
recommendations = sorted(list(recommendations),
key=lambda i: i['added_at'],
reverse=True)
resource_mapping = {}
result = []
resource_job_id_mapping = {}
for recommendation in recommendations:
instance_id = recommendation.instance_id
recommendation_type = recommendation.recommendation_type.value
if not recommendation.current_month_price_usd \
or not recommendation.savings \
or not recommendation.current_instance_type:
continue
if instance_id not in resource_mapping:
resource_job_id_mapping[instance_id] = recommendation.job_id
resource_mapping[instance_id] = {}
if recommendation_type not in resource_mapping[instance_id] \
and recommendation.job_id == resource_job_id_mapping.get(
instance_id):
resource_mapping[instance_id][recommendation_type] = \
recommendation
result.append(recommendation)
return result
def get_priority(self, formatted_recommendations: list,
saving_threshold: int):
resource_mapping = {}
for recommendation in formatted_recommendations:
resource_id = recommendation.get('resource_id')
recommendation_type = recommendation.get('recommendation_type')
if resource_id not in resource_mapping:
resource_mapping[resource_id] = {}
if recommendation_type not in resource_mapping[resource_id]:
resource_mapping[resource_id][
recommendation_type] = recommendation
for recommendation in resource_mapping.values():
recommendation['estimated_savings'] = \
self._get_resource_saving(recommendation)
if saving_threshold and saving_threshold > 0:
_LOG.debug(f'Filtering priority resources with saving gt '
f'{saving_threshold}')
resource_mapping = {k: v for k, v in resource_mapping.items()
if max(v.get('estimated_savings')) >
saving_threshold}
_LOG.debug(f'Sorting priority resources by savings')
priority_resources = sorted(resource_mapping.values(),
key=lambda m: max(
m.get('estimated_savings')),
reverse=True)
result = []
for resource in priority_resources:
recommendation_keys = [key for key in resource.keys() if
key.isupper()]
recommendation = resource[recommendation_keys[0]]
current_price = recommendation.get('current_price')
item = {
'resource_id': recommendation.get('resource_id'),
'current_price': current_price,
'current_instance_type': recommendation.get(
'current_instance_type'),
"region": recommendation.get('region'),
'estimated_saving': resource.get('estimated_savings'),
'recommendations': {}
}
for key in recommendation_keys:
recommendation = resource[key]
item['recommendations'][key] = {
"recommendation": recommendation.get('recommendation'),
"estimated_saving": recommendation.get('estimated_saving')
}
result.append(item)
return result
def _get_resource_saving(self, resource: dict):
saving_percent_options = []
min_saving_percent = self._get_min_saving_percent(resource=resource)
saving_percent_options.append(min_saving_percent)
max_saving_percent = self._get_max_saving_percent(resource=resource)
saving_percent_options.append(max_saving_percent)
saving_percent_options = sorted(list(set(saving_percent_options)))
recommendation_key = [key for key in resource.keys() if key.isupper()][
0]
month_price = resource[recommendation_key]['current_price']
return [round(month_price * percent / 100, 2)
for percent in saving_percent_options]
def _get_min_saving_percent(self, resource):
min_percents = []
for recommendation_type, recommendation in resource.items():
saving_percent = recommendation.get('saving_percent')
if isinstance(saving_percent, int):
min_percents.append(saving_percent)
else:
min_percents.append(min(saving_percent))
while len(min_percents) > 1 and max(min_percents) > 0:
min_percents.remove(max(min_percents))
return self.get_saving_percent(option_saving_percents=min_percents)
def _get_max_saving_percent(self, resource):
max_percents = []
for recommendation_type, recommendation in resource.items():
saving_percent = recommendation.get('saving_percent')
if isinstance(saving_percent, int):
max_percents.append(saving_percent)
else:
max_percents.append(max(saving_percent))
while len(max_percents) > 1 and min(max_percents) < 0:
max_percents.remove(min(max_percents))
return self.get_saving_percent(option_saving_percents=max_percents)
@staticmethod
def get_saving_percent(option_saving_percents):
if len(option_saving_percents) == 1:
return option_saving_percents[0]
option_saving_percents = [1 - round(item / 100, 2)
for item in option_saving_percents]
return (1 - math.prod(option_saving_percents)) * 100
@staticmethod
def divide_by_recommendation_type(recommendations: list, max_per_type):
result = {}
for recommendation in recommendations:
recommendation_type = recommendation.get('recommendation_type')
if recommendation_type not in result:
result[recommendation_type] = []
result[recommendation_type].append(recommendation)
for recommendation_type in result.keys():
result[recommendation_type] = sorted(
result[recommendation_type],
key=lambda r: max(r.get('estimated_saving')),
reverse=True
)
if len(result[recommendation_type]) > max_per_type:
type_result = result[recommendation_type][0:max_per_type]
result[recommendation_type] = type_result
return result
def format_recommendation(self, recommendation: RecommendationHistory):
recommendation_type = recommendation.recommendation_type
if recommendation_type == RecommendationTypeEnum.ACTION_SHUTDOWN:
return self._format_shutdown_recommendation(
recommendation=recommendation)
elif recommendation_type == RecommendationTypeEnum.ACTION_SCHEDULE:
return self._format_schedule_recommendation(
recommendation=recommendation)
elif recommendation_type in RecommendationTypeEnum.resize():
return self._format_resize_recommendation(
recommendation=recommendation)
@staticmethod
def _format_resize_recommendation(recommendation):
recommended_instance_types = [item.get('name') for
item in recommendation.recommendation]
estimated_savings = sorted(recommendation.savings)
if len(estimated_savings) > 2:
estimated_savings = [estimated_savings[0], estimated_savings[-1]]
saving_percents = [
round(saving_item / recommendation.current_month_price_usd,
2) * 100
for saving_item in estimated_savings]
result = {
"resource_id": recommendation.instance_id,
"recommendation": recommended_instance_types,
"recommendation_type": recommendation.recommendation_type.value,
"description": "",
"current_price": recommendation.current_month_price_usd,
"current_instance_type": recommendation.current_instance_type,
"region": recommendation.region,
"estimated_saving": estimated_savings,
"saving_percent": sorted(saving_percents)
}
if recommendation.recommendation_type == \
RecommendationTypeEnum.ACTION_SPLIT:
shape_koefs = [item.get('probability')
for item in recommendation.recommendation
if item.get('probability')]
result['probability'] = shape_koefs
return result
@staticmethod
def _format_schedule_recommendation(recommendation):
saving_percents = [
round(saving_item / recommendation.current_month_price_usd,
2) * 100
for saving_item in recommendation.savings]
return {
"resource_id": recommendation.instance_id,
"recommendation": list(recommendation.recommendation),
"recommendation_type": recommendation.recommendation_type.value,
"description": "",
"current_price": recommendation.current_month_price_usd,
"current_instance_type": recommendation.current_instance_type,
"region": recommendation.region,
"estimated_saving": sorted(recommendation.savings),
"saving_percent": sorted(saving_percents)
}
@staticmethod
def _format_shutdown_recommendation(recommendation):
return {
"resource_id": recommendation.instance_id,
"recommendation": [],
"recommendation_type": recommendation.recommendation_type.value,
"description": "",
"current_price": recommendation.current_month_price_usd,
"current_instance_type": recommendation.current_instance_type,
"region": recommendation.region,
"estimated_saving": sorted(recommendation.savings),
"saving_percent": [100]
}
@staticmethod
def _resources_summary(resources: list):
used_resource_ids = []
result_saving = [0, 0]
total_cost = 0
for resource in resources:
resource_id = resource.get('resource_id')
if resource_id in used_resource_ids:
continue
resource_price = resource.get('current_price')
total_cost += resource_price
resource_saving = resource.get('estimated_saving')
if len(resource_saving) == 1:
if resource_saving[0] < 0:
result_saving[0] += resource_saving[0]
else:
result_saving[1] += resource_saving[0]
elif len(resource_saving) == 2:
if resource_saving[0] < 0:
result_saving[0] += resource_saving[0]
if resource_saving[1] > 0:
result_saving[1] += resource_saving[1]
used_resource_ids.append(resource_id)
result_saving = [round(item, 2) for item in result_saving]
return {
"resource_count": len(used_resource_ids),
"current_estimated_cost": round(total_cost, 2),
"estimated_saving": result_saving
}
def resolve_timezone(self, job_id):
job = self.job_service.get_by_id(job_id)
if not job:
_LOG.error(f'No job with id \'{job_id}\' found')
return UTC_TIMEZONE_NAME
parent_id = job.parent_id
if not parent_id:
_LOG.error(f'Job \'{job_id}\' does not have parent_id specified.')
return UTC_TIMEZONE_NAME
parent = self.parent_service.get_parent_by_id(parent_id=parent_id)
if not parent:
_LOG.error(f'Parent with id \'{parent_id}\' does not exist.')
return UTC_TIMEZONE_NAME
parent_meta = self.parent_service.get_parent_meta(parent=parent)
if not parent_meta:
_LOG.error(f'Parent \'{parent_id}\' meta is empty.')
return UTC_TIMEZONE_NAME
algorithm_name = parent_meta.algorithm
if not algorithm_name:
_LOG.error(f'Algorithm not specified in parent \'{parent_id}\'.')
return UTC_TIMEZONE_NAME
algorithm = self.algorithm_service.get_by_name(name=algorithm_name)
if not algorithm:
_LOG.error(f'Algorithm \'{algorithm_name}\' does not exist.')
return UTC_TIMEZONE_NAME
recommendation_settings = algorithm.get_json().get(
RECOMMENDATION_SETTINGS_ATTR, {})
target_timezone_name = recommendation_settings.get(
TARGET_TIMEZONE_NAME_ATTR, UTC_TIMEZONE_NAME)
_LOG.debug(f'Resolved timezone: {target_timezone_name}')
return target_timezone_name
HANDLER = ReportGenerator(
job_service=SERVICE_PROVIDER.job_service(),
customer_service=SERVICE_PROVIDER.customer_service(),
tenant_service=SERVICE_PROVIDER.tenant_service(),
recommendation_service=SERVICE_PROVIDER.recommendation_history_service(),
maestro_rabbitmq_service=SERVICE_PROVIDER.maestro_rabbitmq_service(),
parent_service=SERVICE_PROVIDER.rightsizer_parent_service(),
algorithm_service=SERVICE_PROVIDER.algorithm_service(),
environment_service=SERVICE_PROVIDER.environment_service())
def lambda_handler(event, context):
return HANDLER.lambda_handler(event=event, context=context)