docker/services/recommendation_history_service.py (220 lines of code) (raw):

from typing import List import datetime from commons.constants import ACTION_EMPTY, ACTION_ERROR, ACTION_SCHEDULE, \ ACTION_SCALE_UP, ACTION_SCALE_DOWN, ACTION_CHANGE_SHAPE, ACTION_SPLIT, \ ACTION_SHUTDOWN from commons.log_helper import get_logger from models.recommendation_history import RecommendationHistory, \ FeedbackStatusEnum, RecommendationTypeEnum _LOG = get_logger('r8s-recommendation-history-service') RESIZE_ACTIONS = [ACTION_SCALE_UP, ACTION_SCALE_DOWN, ACTION_CHANGE_SHAPE, ACTION_SPLIT] class RecommendationHistoryService: def create(self, instance_id: str, job_id: str, customer: str, tenant: str, region: str, current_instance_type: str, savings: dict, schedule: list, recommended_shapes: list, actions: list, instance_meta: dict) -> List[RecommendationHistory]: if ACTION_EMPTY in actions or ACTION_ERROR in actions: _LOG.debug(f'Skipping saving result to history collection. ' f'Actions: \'{actions}\'') return [] result = [] current_month_price_usd = self._get_current_month_price( savings=savings ) if ACTION_SCHEDULE in actions: schedule_savings = self._filter_savings_usd( savings=savings, action=ACTION_SCHEDULE ) recommendation_item = self._create_or_update_recent( instance_id=instance_id, job_id=job_id, customer=customer, tenant=tenant, region=region, current_instance_type=current_instance_type, current_month_price_usd=current_month_price_usd, recommendation_type=ACTION_SCHEDULE, recommendation=schedule, savings=schedule_savings, instance_meta=instance_meta ) result.append(recommendation_item) resize_action = self._get_resize_action(actions=actions) if resize_action: resize_savings = self._filter_savings_usd( savings=savings, action=resize_action ) recommendation_item = self._create_or_update_recent( instance_id=instance_id, job_id=job_id, customer=customer, tenant=tenant, region=region, current_instance_type=current_instance_type, current_month_price_usd=current_month_price_usd, recommendation_type=resize_action, recommendation=recommended_shapes, savings=resize_savings, instance_meta=instance_meta ) result.append(recommendation_item) if ACTION_SHUTDOWN in actions: shutdown_savings = self._filter_savings_usd( savings=savings, action=ACTION_SHUTDOWN ) recommendation_item = self._create_or_update_recent( instance_id=instance_id, job_id=job_id, customer=customer, tenant=tenant, region=region, current_instance_type=current_instance_type, current_month_price_usd=current_month_price_usd, recommendation_type=ACTION_SHUTDOWN, recommendation=None, savings=shutdown_savings, instance_meta=instance_meta ) result.append(recommendation_item) return result def _create_or_update_recent(self, instance_id, job_id, customer, tenant, region, current_instance_type, current_month_price_usd, recommendation_type, recommendation, savings, instance_meta): recent_recommendations = self.get_recent_recommendation( instance_id=instance_id, recommendation_type=recommendation_type, without_feedback=True ) if recommendation is not None and not isinstance(recommendation, list): recommendation = [recommendation] if not recent_recommendations: _LOG.debug(f'No recent \'{recommendation_type}\' recommendation ' f'found for instance \'{instance_id}\'. Creating new ' f'record.') return RecommendationHistory( instance_id=instance_id, job_id=job_id, customer=customer, tenant=tenant, region=region, current_instance_type=current_instance_type, current_month_price_usd=current_month_price_usd, recommendation_type=recommendation_type, recommendation=recommendation, savings=savings, instance_meta=instance_meta ) recent_recommendations = list(recent_recommendations) if len(recent_recommendations) > 1: _LOG.error(f'More than one recent recommendation found. ' f'Deleting all except the most recent') for recommendation in recent_recommendations[1:]: self.delete(recommendation=recommendation) recent_recommendation: RecommendationHistory = recent_recommendations[0] _LOG.debug(f'Recent recommendation found, updating.') recent_recommendation.update( job_id=job_id, customer=customer, tenant=tenant, region=region, added_at=datetime.datetime.utcnow(), current_instance_type=current_instance_type, current_month_price_usd=current_month_price_usd, recommendation=recommendation, savings=savings, instance_meta=instance_meta ) return recent_recommendation def get_recent_recommendation(self, instance_id, recommendation_type, without_feedback=False): threshold_date = self._get_week_start_dt() query = { 'instance_id': instance_id, 'recommendation_type': recommendation_type, 'added_at__gt': threshold_date } if without_feedback: query['feedback_dt'] = None query['feedback_status'] = None return RecommendationHistory.objects(**query).order_by('-added_at') @staticmethod def get_recommendation_with_feedback(instance_id): return list(RecommendationHistory.objects( instance_id=instance_id, feedback_dt__ne=None, feedback_status__ne=None )) @staticmethod def filter_applied(recommendations: List[RecommendationHistory]): return [item for item in recommendations if item.feedback_status == FeedbackStatusEnum.APPLIED] @staticmethod def filter_resize(recommendations: List[RecommendationHistory]): allowed_recommendation_types = RecommendationTypeEnum.resize() return [item for item in recommendations if item.recommendation_type in allowed_recommendation_types] @staticmethod def is_shutdown_forbidden(recommendations: List[RecommendationHistory]): for recommendation in recommendations: if recommendation.recommendation_type != \ RecommendationTypeEnum.ACTION_SHUTDOWN: continue if recommendation.feedback_status not in \ (FeedbackStatusEnum.DONT_RECOMMEND, FeedbackStatusEnum.WRONG): continue return True return False @staticmethod def batch_save(recommendations: List[RecommendationHistory]): to_update = [] to_create = [] for recommendation in recommendations: if recommendation.get_json().get('_id'): to_update.append(recommendation) else: to_create.append(recommendation) if to_create: RecommendationHistory.objects.insert(to_create) for recommendation in to_update: recommendation.save() @staticmethod def save(recommendation: RecommendationHistory): recommendation.save() @staticmethod def delete(recommendation: RecommendationHistory): recommendation.delete() @staticmethod def _get_resize_action(actions): for action in actions: if action in RESIZE_ACTIONS: return action @staticmethod def _filter_savings_usd(savings: dict, action): if not savings: return saving_options = savings.get('saving_options', []) if not saving_options: return option_savings_usd = [option.get('saving_month_usd') for option in saving_options if option.get('action') == action] return [saving for saving in option_savings_usd if isinstance(saving, (int, float))] @staticmethod def _get_current_month_price(savings: dict): if not savings: return return savings.get('current_monthly_price_usd') @staticmethod def _get_week_start_dt(): now = datetime.datetime.now(datetime.timezone.utc) now = now.replace(hour=0, minute=0, second=0, microsecond=0) return now - datetime.timedelta(days=now.weekday())