# docker/services/recomendation_service.py
import json
import os
from datetime import datetime
from typing import Union
import numpy as np
import pandas as pd
from commons.constants import STATUS_ERROR, STATUS_OK, OK_MESSAGE, \
ACTION_SHUTDOWN, ACTION_SCHEDULE, ACTION_SPLIT, ACTION_EMPTY, \
STATUS_POSTPONED
from commons.exception import ExecutorException, ProcessingPostponedException
from commons.log_helper import get_logger
from commons.profiler import profiler
from models.algorithm import Algorithm
from models.base_model import CloudEnum
from models.parent_attributes import ParentMeta
from models.shape_price import OSEnum
from services.environment_service import EnvironmentService
from services.meta_service import MetaService
from services.metrics_service import MetricsService
from services.recommendation_history_service import \
RecommendationHistoryService
from services.resize.resize_service import ResizeService
from services.resize.resize_trend import ResizeTrend
from services.saving.saving_service import SavingService
from services.schedule.schedule_service import ScheduleService
_LOG = get_logger('r8s-recommendation-service')
# Tags attached to every generated report item (see dump_reports)
RIGHTSIZER_SOURCE = 'RIGHTSIZER'
RIGHTSIZER_RESOURCE_TYPE = 'INSTANCE'
DEFAULT_SEVERITY = 'MEDIUM'
class RecommendationService:
    """
    Orchestrates the per-instance rightsizing pipeline: metric loading,
    load-period analysis, schedule/resize recommendations, savings
    calculation, history persistence and report dumping.
    """
    def __init__(self, metrics_service: MetricsService,
                 schedule_service: ScheduleService,
                 resize_service: ResizeService,
                 environment_service: EnvironmentService,
                 saving_service: SavingService,
                 meta_service: MetaService,
                 recommendation_history_service: RecommendationHistoryService):
        """Store injected collaborators; no work happens at construction."""
        self.recommendation_history_service = recommendation_history_service
        self.meta_service = meta_service
        self.saving_service = saving_service
        self.environment_service = environment_service
        self.resize_service = resize_service
        self.schedule_service = schedule_service
        self.metrics_service = metrics_service
    @profiler(execution_step=f'instance_recommendation_generation')
    def process_instance(self, metric_file_path, algorithm: Algorithm,
                         reports_dir, instance_meta_mapping=None,
                         parent_meta: Union[None, ParentMeta] = None):
        """
        Run the full recommendation pipeline for one instance metric file
        and dump the resulting report item under ``reports_dir``.

        Pipeline: parse identifiers from the file path, load the metric
        dataframe, split it into load periods, derive resize trend(s),
        generate a schedule, recommend shapes, compute stats/savings,
        persist recommendation history (best-effort) and write the report.

        :param metric_file_path: path shaped like
            <customer>/<cloud>/<tenant>/<region>/<timestamp>/<instance_id>.<ext>
        :param algorithm: Algorithm model whose recommendation_settings
            drive trend calculation, schedule and shape selection
        :param reports_dir: root directory for the jsonl report output
        :param instance_meta_mapping: optional mapping instance_id -> meta
        :param parent_meta: optional parent configuration passed to the
            resize service
        :return: the report item dict that was dumped
        """
        _LOG.debug(f'Parsing entity names from metrics file path '
                   f'\'{metric_file_path}\'')
        # Pre-initialize all results so the broad except-branch below can
        # still dump a partial report if any pipeline step fails.
        df = None
        schedule = None
        recommended_sizes = None
        resize_action = None
        savings = None
        advanced = None
        customer, cloud, tenant, region, _, instance_id = self._parse_folders(
            metric_file_path=metric_file_path
        )
        if cloud.upper() != CloudEnum.CLOUD_GOOGLE.value:
            # use Linux pricing as a default for all clouds except GCP,
            # where the pricing OS is left undefined
            instance_os = OSEnum.OS_LINUX.value
        else:
            instance_os = None
        instance_meta = {}
        if instance_meta_mapping:
            instance_meta = instance_meta_mapping.get(instance_id)
        _LOG.debug(f'Instance meta: {instance_meta}')
        _LOG.debug(f'Loading past recommendation with feedback')
        past_recommendations = self.recommendation_history_service. \
            get_recommendation_with_feedback(instance_id=instance_id)
        # Only recommendations that were actually applied (plus meta
        # adjustments below) influence metric loading and later filtering.
        applied_recommendations = self.recommendation_history_service. \
            filter_applied(recommendations=past_recommendations)
        try:
            _LOG.debug(f'Loading adjustments from meta')
            meta_adjustments = self.meta_service.to_adjustments(
                instance_meta=instance_meta
            )
            applied_recommendations.extend(meta_adjustments)
            _LOG.debug(f'Loading df')
            df = self.metrics_service.load_df(
                path=metric_file_path,
                algorithm=algorithm,
                applied_recommendations=applied_recommendations,
                instance_meta=instance_meta
            )
            _LOG.debug(f'Extracting instance type name')
            instance_type = self.metrics_service.get_instance_type(
                metric_file_path=metric_file_path,
                algorithm=algorithm
            )
            _LOG.debug(f'Dividing into periods with different load')
            shutdown_periods, low_periods, medium_periods, \
                high_periods, centroids = \
                self.metrics_service.divide_on_periods(
                    df=df,
                    algorithm=algorithm)
            _LOG.debug(f'Got {len(high_periods)} high-load, '
                       f'{len(low_periods)} low-load periods')
            non_straight_periods, total_length = self.get_non_straight_periods(
                df=df,
                grouped_periods=(low_periods, medium_periods, high_periods)
            )
            if non_straight_periods and total_length:
                # At least two load levels are significant: compute one
                # trend per load level.
                _LOG.debug(f'Calculating resize trends for several loads')
                trends = self.metrics_service. \
                    calculate_instance_trend_multiple(
                        algorithm=algorithm,
                        non_straight_periods=non_straight_periods,
                        total_length=total_length
                    )
            else:
                _LOG.debug(f'Generating resize trend')
                if any((low_periods, medium_periods, high_periods)):
                    df_ = pd.concat([*low_periods, *medium_periods,
                                     *high_periods])
                else:
                    # no load periods detected - fall back to the whole df
                    df_ = df
                trends = self.metrics_service.calculate_instance_trend(
                    df=df_,
                    algorithm=algorithm
                )
                trends = [trends]
            _LOG.debug(f'Resize trend for instance \'{instance_id}\' has been '
                       f'calculated. ')
            _LOG.debug(
                f'Got {len(shutdown_periods)} shutdown periods to process')
            _LOG.debug(f'Generating schedule for instance \'{instance_id}\'')
            if not low_periods and not medium_periods and not high_periods:
                # no activity at all: an empty schedule later leads to a
                # SHUTDOWN action in get_general_action
                schedule = []
            else:
                schedule = self.schedule_service.generate_schedule(
                    shutdown_periods=shutdown_periods,
                    recommendation_settings=algorithm.recommendation_settings,
                    instance_id=instance_id,
                    df=df,
                    instance_meta=instance_meta,
                    past_recommendations=applied_recommendations
                )
            _LOG.debug(f'Searching for resize action')
            resize_action = ResizeTrend.get_resize_action(trends=trends)
            _LOG.debug(f'Searching for better-fix instance types')
            compatibility_rule = algorithm.recommendation_settings. \
                shape_compatibility_rule
            past_resize_recommendations = self.recommendation_history_service. \
                filter_resize(recommendations=applied_recommendations)
            if len(trends) == 1:
                max_recommended_shapes = algorithm.recommendation_settings. \
                    max_recommended_shapes
                recommended_sizes = self.resize_service.recommend_size(
                    trend=trends[0],
                    instance_type=instance_type,
                    algorithm=algorithm,
                    cloud=cloud.upper(),
                    instance_meta=instance_meta,
                    resize_action=resize_action,
                    parent_meta=parent_meta,
                    max_results=max_recommended_shapes,
                    shape_compatibility_rule=compatibility_rule,
                    past_resize_recommendations=past_resize_recommendations
                )
            else:
                # multiple trends: pick the single best shape per trend
                recommended_sizes = []
                for trend in trends:
                    recommended_size = self.resize_service.recommend_size(
                        trend=trend,
                        instance_type=instance_type,
                        algorithm=algorithm,
                        cloud=cloud.upper(),
                        instance_meta=instance_meta,
                        resize_action=resize_action,
                        parent_meta=parent_meta,
                        max_results=1,
                        shape_compatibility_rule=compatibility_rule,
                        past_resize_recommendations=past_resize_recommendations
                    )
                    recommended_sizes.extend(recommended_size)
            recommended_sizes = self.resize_service.add_price(
                instances=recommended_sizes,
                customer=customer,
                region=region,
                os=instance_os)
            recommended_sizes = self._cleanup_recommended_shapes(
                recommended_sizes=recommended_sizes,
                current_instance_type=instance_type,
                allow_same_shape=resize_action == ACTION_SPLIT
            )
            recommended_sizes = self.resize_service.sort_shapes(
                shapes=recommended_sizes,
                sort_option=algorithm.recommendation_settings.shape_sorting
            )
            _LOG.debug(f'Got {len(recommended_sizes)} '
                       f'recommended instance types')
            _LOG.debug(f'Calculate instance stats')
            stats = self.calculate_instance_stats(df=df)
            advanced = self.calculate_advanced_stats(
                df=df,
                centroids=centroids,
                algorithm=algorithm
            )
            general_action = self.get_general_action(
                schedule=schedule,
                shapes=recommended_sizes,
                resize_action=resize_action,
                stats=stats,
                past_recommendations=past_recommendations
            )
            if not algorithm.recommendation_settings.ignore_savings:
                _LOG.debug(f'Calculating savings')
                savings = self.saving_service.calculate_savings(
                    general_actions=general_action,
                    current_shape=instance_type,
                    recommended_shapes=recommended_sizes,
                    schedule=schedule,
                    customer=customer,
                    region=region,
                    os=instance_os
                )
            # History persistence is best-effort: a failure here must not
            # discard the generated recommendation.
            try:
                _LOG.debug(f'Saving recommendation to history')
                history_items = self.recommendation_history_service.create(
                    instance_id=instance_id,
                    job_id=self.environment_service.get_batch_job_id(),
                    customer=customer,
                    tenant=tenant,
                    region=region,
                    schedule=schedule,
                    recommended_shapes=recommended_sizes,
                    current_instance_type=instance_type,
                    savings=savings,
                    actions=general_action,
                    instance_meta=instance_meta
                )
                if history_items:
                    _LOG.debug(
                        f'Saving \'{len(history_items)}\' history items')
                    self.recommendation_history_service.batch_save(
                        recommendations=history_items)
            except Exception as e:
                _LOG.error(f'Exception occurred while saving recommendation '
                           f'to history: {str(e)}')
        except Exception as e:
            # Any pipeline failure is converted into an error/postponed
            # report instead of aborting the whole batch job.
            _LOG.debug(f'Calculate instance stats with exception')
            stats = self.calculate_instance_stats(df=df, exception=e)
            general_action = self.get_general_action(
                schedule=schedule,
                shapes=recommended_sizes,
                resize_action=resize_action,
                stats=stats)
        _LOG.debug(f'Dumping instance results')
        item = self.dump_reports(
            reports_dir=reports_dir,
            instance_id=instance_id,
            schedule=schedule,
            recommended_sizes=recommended_sizes,
            stats=stats,
            customer=customer,
            cloud=cloud,
            tenant=tenant,
            region=region,
            meta=instance_meta,
            general_action=general_action,
            savings=savings,
            advanced=advanced
        )
        return item
def dump_error_report(self, reports_dir, metric_file_path,
exception):
customer, cloud, tenant, region, _, instance_id = self._parse_folders(
metric_file_path=metric_file_path
)
stats = self.calculate_instance_stats(exception=exception)
return self.dump_reports(
reports_dir=reports_dir,
instance_id=instance_id,
schedule=[],
recommended_sizes=[],
stats=stats,
customer=customer,
cloud=cloud,
tenant=tenant,
region=region,
meta={},
general_action=STATUS_ERROR
)
def dump_reports(self, reports_dir, customer, cloud, tenant, region, stats,
instance_id=None, schedule=None, recommended_sizes=None,
meta=None, general_action=None,
savings=None, advanced=None):
if general_action and not isinstance(general_action, list):
general_action = [general_action]
item = {
'resource_id': instance_id,
'resource_type': RIGHTSIZER_RESOURCE_TYPE,
'source': RIGHTSIZER_SOURCE,
'severity': DEFAULT_SEVERITY,
'recommendation': {
'schedule': schedule,
'recommended_shapes': recommended_sizes,
'savings': savings,
'advanced': advanced,
},
'stats': stats,
'meta': meta,
'general_actions': general_action
}
self.prettify_recommendation(recommendation_item=item)
dir_path = os.path.join(reports_dir, customer, cloud, tenant)
os.makedirs(dir_path, exist_ok=True)
file_path = os.path.join(dir_path, f'{region}.jsonl')
with open(file_path, 'a') as f:
f.write(json.dumps(item))
f.write('\n')
return item
@staticmethod
def get_non_straight_periods(df, grouped_periods):
total_length = len(df)
valid_period_groups = []
total_valid_period_len = 0
for periods in grouped_periods:
grouped_len = sum([len(period) for period in
periods]) # metrics are in 5min freq, periods are in 10min
if grouped_len >= total_length * 0.05:
# eq 10%, period metrics are 10min freq, df freq is 5min
valid_period_groups.append(periods)
total_valid_period_len += grouped_len
if not valid_period_groups or len(valid_period_groups) == 1:
return None, None
return valid_period_groups, total_valid_period_len
@staticmethod
def calculate_instance_stats(df=None, exception=None):
from_date = None
to_date = None
if df is not None:
from_date = min(df.T).to_pydatetime().isoformat()
to_date = max(df.T).to_pydatetime().isoformat()
if isinstance(exception, ExecutorException):
status = STATUS_ERROR
message = exception.reason
elif isinstance(exception, ProcessingPostponedException):
status = STATUS_POSTPONED
message = str(exception)
elif exception:
status = STATUS_ERROR
message = f'Unexpected error occurred: {str(exception)}'
else:
status = STATUS_OK
message = OK_MESSAGE
return {
'from_date': from_date,
'to_date': to_date,
'status': status,
'message': message
}
def calculate_advanced_stats(self, df, algorithm, centroids):
_LOG.debug(f'Calculating advanced stats')
result = {}
metric_fields = self._get_metric_fields(df=df, algorithm=algorithm)
for metric_field in metric_fields:
_LOG.debug(f'Calculating advanced stats for {metric_field}')
metric_stats = self._get_metric_advanced_stats(
df=df,
metric_name=metric_field
)
if metric_stats:
_LOG.debug(f'{metric_field} advanced stats: {metric_stats}')
result[metric_field] = metric_stats
_LOG.debug(f'Calculating clustering stats')
cluster_stats = self._get_clusters_stats(centroids=centroids)
_LOG.debug(f'Clustering stats: {cluster_stats}')
result['clusters'] = cluster_stats
return result
@staticmethod
def _parse_folders(metric_file_path):
"""
Extracts customer, tenant, region, timestamp and instance id from
metric file path
"""
file_name = os.path.basename(metric_file_path)
folders = metric_file_path.split(os.sep)
instance_id = folders[-1][0:file_name.rindex('.')]
timestamp = folders[-2]
region = folders[-3]
tenant = folders[-4]
cloud = folders[-5]
customer = folders[-6]
return customer, cloud, tenant, region, timestamp, instance_id
def get_general_action(self, schedule, shapes, stats, resize_action,
past_recommendations: list = None):
actions = []
status = stats.get('status', '')
if status == STATUS_POSTPONED:
return [ACTION_EMPTY]
if status != STATUS_OK:
return [STATUS_ERROR]
shutdown_forbidden = False
if past_recommendations:
shutdown_forbidden = self.recommendation_history_service. \
is_shutdown_forbidden(
recommendations=past_recommendations
)
if not schedule and not shutdown_forbidden:
return [ACTION_SHUTDOWN]
if schedule and not self._is_schedule_always_run(schedule=schedule):
actions.append(ACTION_SCHEDULE)
if shapes:
shape = shapes[0]
if 'probability' in shape:
actions.append(ACTION_SPLIT)
else:
actions.append(resize_action)
if not actions:
return [ACTION_EMPTY]
return actions
@staticmethod
def _is_schedule_always_run(schedule, complete_week=True):
if len(schedule) != 1:
return False
schedule = schedule[0]
start = schedule.get('start', '')
stop = schedule.get('stop', '')
weekdays = schedule.get('weekdays', [])
if complete_week:
stop_suits = stop.startswith('23:') or stop == '00:00'
return start == '00:00' and stop_suits and len(weekdays) == 7
return start == '00:00' and stop.startswith('23:')
def prettify_recommendation(self, recommendation_item):
recommendation = recommendation_item.get('recommendation')
schedule = recommendation.get('schedule')
shapes = recommendation.get('recommended_shapes')
if shapes is None:
recommendation['recommended_shapes'] = []
if schedule is None:
recommendation['schedule'] = []
if schedule:
schedule.sort(key=lambda k: self._get_schedule_weight(k))
if len(schedule) > 5:
schedule = schedule[0:5]
recommendation['schedule'] = schedule
if not recommendation['advanced']:
recommendation.pop('advanced', None)
recommendation_item['recommendation'] = recommendation
@staticmethod
def _get_schedule_weight(schedule_item):
start = schedule_item.get('start')
stop = schedule_item.get('stop')
start_dt = datetime.strptime(start, '%H:%M')
stop_dt = datetime.strptime(stop, '%H:%M')
delta = (stop_dt - start_dt).total_seconds()
return delta * len(schedule_item.get('weekdays'))
@staticmethod
def _cleanup_recommended_shapes(recommended_sizes,
current_instance_type,
allow_same_shape=False):
result = []
for size in recommended_sizes:
instance_type = size.get('name')
if size in result:
continue
if not allow_same_shape and instance_type == current_instance_type:
continue
result.append(size)
return result
@staticmethod
def _get_metric_fields(df: pd.DataFrame, algorithm: Algorithm):
valid_columns = []
for column in list(algorithm.metric_attributes):
if any([value not in (0, -1) for value in df['cpu_load']]):
valid_columns.append(column)
return valid_columns
@staticmethod
def _get_metric_advanced_stats(df: pd.DataFrame, metric_name):
series = df[metric_name]
deciles = list(np.quantile(series, np.arange(0.1, 1, 0.1))),
deciles = [round(float(decile), 2) for decile in deciles[0]]
return {
"min": round(float(np.max(series)), 2),
"max": round(float(np.max(series)), 2),
"mean": round(float(np.mean(series)), 2),
"deciles": deciles,
"variance": round(float(np.var(series)), 2),
"standard_deviation": round(float(np.std(series)), 2)
}
@staticmethod
def _get_clusters_stats(centroids: list):
cluster_count_per_day = [len(day_centroids)
for day_centroids in centroids]
if not centroids:
return {}
quartiles = list(np.quantile(cluster_count_per_day,
np.arange(0.25, 1, 0.25))),
quartiles = [round(float(quartile), 2) for quartile in quartiles[0]]
return {
"avg_per_day": round(float(np.average(cluster_count_per_day)), 2),
"max_per_day": round(float(np.max(cluster_count_per_day)), 2),
"min_per_day": round(float(np.min(cluster_count_per_day)), 2),
"quartiles": quartiles
}