docker/services/schedule/schedule_service.py (189 lines of code) (raw):
import statistics
from datetime import datetime, timedelta
from typing import List
import pandas
from commons.constants import WEEK_DAYS
from commons.log_helper import get_logger
from commons.profiler import profiler
from models.algorithm import RecommendationSettings
from models.recommendation_history import RecommendationHistory, \
RecommendationTypeEnum, FeedbackStatusEnum
from services.metrics_service import MetricsService
from services.schedule.active_schedule_period import ActiveSchedulePeriod
from services.schedule.frequency_map import FrequencyMap
from services.schedule.schedule_item import ScheduleItem
# Module-level logger, obtained from the project log helper under the
# name 'r8s-schedule-service'.
_LOG = get_logger('r8s-schedule-service')
# Seconds in one day (24 h * 60 min * 60 s); used to convert a telemetry
# time span expressed in seconds into a number of days.
SECONDS_IN_DAY = 24 * 60 * 60

# Minimum number of telemetry days for a schedule recommendation.
# NOTE(review): the threshold that is actually enforced is read from the
# recommendation settings - confirm whether this constant is still needed.
MINIMUM_DAYS_REQUIRED = 2 * 7

# Default `minimum_duration` (in minutes) handed to the per-day schedule
# generation.
MINIMUM_SCHEDULE_DURATION_MINUTES = 1 * 60

# Passed as `max_diff_second` when checking whether two schedule items are
# similar enough to be grouped (60 min * 60 s).
MAX_GROUPING_DIFFERENCE_SECONDS = 60 * 60
class ScheduleService:
    """Generate weekly schedule recommendations from shutdown periods.

    The pipeline implemented by :meth:`generate_schedule` is:
    shutdown periods -> ``ActiveSchedulePeriod`` objects -> ``FrequencyMap``
    -> daily ``ScheduleItem`` objects -> items grouped across weekdays.
    """

    def __init__(self, metrics_service: MetricsService):
        """Keep a reference to the metrics service.

        :param metrics_service: stored on ``self.metrics_service``; none of
            the methods defined in this class body read it.
        """
        self.metrics_service = metrics_service

    @profiler(execution_step='instance_schedule_generation')
    def generate_schedule(self, recommendation_settings: RecommendationSettings,
                          shutdown_periods, instance_id, df, instance_meta=None,
                          past_recommendations: List[RecommendationHistory] = None):
        """Build the schedule recommendation for a single instance.

        :param recommendation_settings: settings object; its
            ``min_allowed_days_schedule`` and ``record_step_minutes``
            attributes are read here.
        :param shutdown_periods: iterable of per-period data frames, each
            converted into an active schedule period.
        :param instance_id: identifier used in log messages and stored on
            every active schedule period.
        :param df: instance telemetry; only its index (presumably a
            datetime index - it is subtracted and ``total_seconds()`` is
            called on the result) and emptiness are inspected here.
        :param instance_meta: currently unused (see the todo below).
        :param past_recommendations: earlier recommendations used to detect
            that the user disabled schedule recommendations; ``None`` means
            there are none.
        :return: list of schedule dicts - either the always-run schedule or
            the ``as_dict()`` form of every grouped schedule item.
        """
        # todo process instance meta
        _LOG.debug('Checking if schedule is disabled by user adjustments')
        if self._is_schedule_disabled(recommendations=past_recommendations):
            _LOG.debug('Schedule action is disabled for instance.')
            return self._get_always_run_schedule()

        _LOG.debug(f'Generating schedule for instance \'{instance_id}\'')
        if df.empty:
            # Without any telemetry there is no time span to measure, so
            # fall back to the same result as "not enough days".
            _LOG.warning('No telemetry available for schedule '
                         'recommendation')
            return self._get_always_run_schedule()

        min_allowed_days = recommendation_settings.min_allowed_days_schedule
        covered_days = self._get_covered_days(df=df)
        if covered_days < min_allowed_days:
            # Report the threshold that is actually enforced rather than a
            # module constant that may differ from the settings value.
            _LOG.warning(f'Minimum {min_allowed_days} days of '
                         f'telemetry required for schedule recommendation')
            return self._get_always_run_schedule()

        _LOG.debug('Extracting active periods')
        active_schedule_periods = self._get_active_schedule_periods(
            shutdown_periods=shutdown_periods,
            instance_id=instance_id
        )

        _LOG.debug('Generating frequency map')
        frequency_map = self._generate_frequency_map(
            active_schedule_periods=active_schedule_periods,
            record_step_minutes=recommendation_settings.record_step_minutes,
            action='shutdown'
        )

        _LOG.debug('Generating daily schedules')
        day_schedules = self._generate_daily_schedule(
            df=df,
            frequency_map=frequency_map,
        )

        _LOG.debug('Grouping schedules by day')
        schedule_result = self._group_by_days(day_schedules=day_schedules)
        schedule_result = [item.as_dict() for item in schedule_result]
        _LOG.debug(f'Schedule result: \'{schedule_result}\'')
        return schedule_result

    @staticmethod
    def _is_schedule_disabled(recommendations: List[RecommendationHistory]):
        """Tell whether the user opted out of schedule recommendations.

        :param recommendations: past recommendations, or ``None`` when the
            caller has none (the default of ``generate_schedule``).
        :return: ``True`` if any recommendation is of the schedule type and
            carries the "don't recommend" feedback status, else ``False``.
        """
        if recommendations is None:
            # `generate_schedule` defaults this argument to None; iterating
            # it would raise TypeError.
            return False
        schedule_action = RecommendationTypeEnum.ACTION_SCHEDULE
        dont_recommend_status = FeedbackStatusEnum.DONT_RECOMMEND
        return any(
            recommendation.recommendation_type == schedule_action
            and recommendation.feedback_status == dont_recommend_status
            for recommendation in recommendations
        )

    @staticmethod
    def _get_covered_days(df: pandas.DataFrame) -> int:
        """Return the span between the smallest and largest index value
        of ``df``, expressed in days and rounded to the nearest integer.
        """
        covered_seconds = (df.index.max() - df.index.min()).total_seconds()
        return round(covered_seconds / SECONDS_IN_DAY)

    def _get_active_schedule_periods(
            self, shutdown_periods: list,
            instance_id: str) -> List[ActiveSchedulePeriod]:
        """Convert every shutdown period into an ``ActiveSchedulePeriod``.

        :param shutdown_periods: list of per-period data frames.
        :param instance_id: identifier stored on each resulting period.
        :return: the truthy results, in input order.
        """
        active_schedule_periods = []
        total = len(shutdown_periods)
        # Count from 1 so the progress log reads "1/N" .. "N/N".
        for number, period in enumerate(shutdown_periods, start=1):
            _LOG.debug(f'Processing period: {number}/{total} '
                       f'for instance \'{instance_id}\'')
            # todo calculate schedule probability
            ml_result = [0.5]
            active_schedule_period = self._get_active_schedule_period(
                instance_id=instance_id,
                df=period,
                ml_result=ml_result,
                model_action="shutdown"
            )
            if active_schedule_period:
                active_schedule_periods.append(active_schedule_period)
        return active_schedule_periods

    def _generate_frequency_map(
            self, active_schedule_periods: List[ActiveSchedulePeriod],
            action: str, record_step_minutes: int) -> FrequencyMap:
        """Count, per weekday and time point, how often a period covers it.

        :param active_schedule_periods: periods to accumulate; those with a
            falsy ``action`` are skipped.
        :param action: action name handed to the ``FrequencyMap``.
        :param record_step_minutes: distance between time points of a day.
        :return: the populated frequency map.
        :raises ValueError: if a period's ``time_from``/``time_to`` is not
            one of the generated day time points (raised by ``list.index``).
        """
        day_time_points = self._get_day_time_points(
            record_step_minutes=record_step_minutes)
        frequency_map = FrequencyMap(action=action,
                                     time_points=day_time_points,
                                     step_minutes=record_step_minutes)
        last_index = len(day_time_points) - 1
        for period in active_schedule_periods:
            if not period.action:
                continue
            start_index = day_time_points.index(period.time_from)
            end_index = day_time_points.index(period.time_to)
            if end_index == 0:
                # An end of '00:00' is treated as the end of the day: the
                # period runs up to the last time point.
                end_index = last_index
            weekday = period.weekday
            for time_point in day_time_points[start_index: end_index + 1]:
                frequency_map_item = frequency_map[weekday][time_point]
                if not frequency_map_item:
                    continue
                frequency_map_item.count += 1
                frequency_map_item.probabilities.append(period.probability)
        return frequency_map

    @staticmethod
    def _get_active_schedule_period(instance_id: str, df: pandas.DataFrame,
                                    ml_result: list,
                                    model_action: str) -> ActiveSchedulePeriod:
        """Describe one period by weekday, start/end time and probability.

        :param instance_id: identifier stored on the result.
        :param df: data of a single period; only the minimum and maximum of
            its index are used.
        :param ml_result: probabilities whose mean (rounded to two
            decimals) becomes the period probability.
        :param model_action: action stored on the result.
        :return: the constructed ``ActiveSchedulePeriod``.
        """
        period_start = df.index.min()
        time_from = period_start.strftime('%H:%M')
        time_to = df.index.max().strftime('%H:%M')
        weekday = period_start.strftime('%A')
        probability = round(float(statistics.mean(ml_result)), 2)
        if time_to == '23:50':
            # '23:50' is the last time point of a day for a 10-minute
            # record step; it is reported as midnight.
            # NOTE(review): other record steps are not normalised here -
            # confirm whether that is intended.
            time_to = '00:00'
        return ActiveSchedulePeriod(
            instance_id=instance_id,
            weekday=weekday,
            time_from=time_from,
            time_to=time_to,
            action=model_action,
            probability=probability
        )

    @staticmethod
    def _generate_daily_schedule(
            df: pandas.DataFrame, frequency_map: FrequencyMap,
            minimum_duration=MINIMUM_SCHEDULE_DURATION_MINUTES) \
            -> List[ScheduleItem]:
        """Collect the schedule items of every weekday.

        :param df: instance telemetry; only the span of its index is used,
            to tell the frequency map how many days were processed.
        :param frequency_map: per-weekday frequency data.
        :param minimum_duration: forwarded as ``minimum_duration_minutes``
            to each day's ``get_day_schedule``.
        :return: schedule items of all weekdays, in ``WEEK_DAYS`` order.
        """
        # Same span computation as in `generate_schedule`; reading the
        # index directly avoids transposing the whole frame.
        processed_days = ScheduleService._get_covered_days(df=df)
        daily_schedules = []
        for day in WEEK_DAYS:
            daily_schedules.extend(frequency_map[day].get_day_schedule(
                processed_days=processed_days,
                minimum_duration_minutes=minimum_duration))
        return daily_schedules

    @staticmethod
    def _group_by_days(day_schedules: List[ScheduleItem]) -> List[ScheduleItem]:
        """Merge similar daily schedule items into multi-weekday items.

        Items are visited in order of ``start``. For each item not yet
        merged, all items that ``is_similar`` accepts (within
        ``MAX_GROUPING_DIFFERENCE_SECONDS``) are combined: their weekdays
        are united and ordered as in ``WEEK_DAYS``, their probabilities are
        averaged (rounded to two decimals) and the start/stop come from
        ``ScheduleItem.get_common_start_stop``.

        :param day_schedules: per-day schedule items.
        :return: grouped items, longest ``duration_minutes`` first.
        """
        day_schedules = sorted(day_schedules, key=lambda d: d.start)
        grouped = []
        processed_periods = []
        processed = []
        for day_schedule in day_schedules:
            if day_schedule in processed:
                continue
            period_key = (day_schedule.start, day_schedule.stop)
            if period_key in processed_periods:
                continue

            same = [i for i in day_schedules if day_schedule.is_similar(
                other=i, max_diff_second=MAX_GROUPING_DIFFERENCE_SECONDS)]
            grouped_weekdays = []
            grouped_prob = []
            for i in same:
                grouped_weekdays.extend(i.weekdays)
                grouped_prob.append(i.probability)

            start, stop = ScheduleItem.get_common_start_stop(
                day_schedules=same)
            # De-duplicate weekdays and restore calendar order.
            grouped_weekdays = sorted(set(grouped_weekdays),
                                      key=WEEK_DAYS.index)
            schedule_item = ScheduleItem(
                start=start,
                stop=stop,
                weekdays=grouped_weekdays,
                probability=round(sum(grouped_prob) / len(grouped_prob), 2)
            )
            processed.extend(same)
            grouped.append(schedule_item)
            processed_periods.append(period_key)
        return sorted(grouped, key=lambda x: x.duration_minutes, reverse=True)

    @staticmethod
    def _get_day_time_points(record_step_minutes: int) -> List[str]:
        """List the ``'HH:MM'`` time points of a day, starting at '00:00'.

        :param record_step_minutes: distance between consecutive points,
            in minutes.
        :return: ``24 * 60 // record_step_minutes`` zero-padded strings.
        :raises ZeroDivisionError: if ``record_step_minutes`` is 0.
        """
        points_count = 24 * 60 // record_step_minutes
        points = []
        for point_number in range(points_count):
            # Plain minute arithmetic keeps the result independent of the
            # current date and clock.
            hours, minutes = divmod(point_number * record_step_minutes, 60)
            points.append(f'{hours:02d}:{minutes:02d}')
        return points

    @staticmethod
    def _get_always_run_schedule():
        """Return the fallback schedule: '00:00' to '00:00' on every
        weekday listed in ``WEEK_DAYS``.
        """
        return [{
            'start': '00:00',
            'stop': '00:00',
            # Copy so callers that modify the result cannot alter the
            # shared module-level constant.
            'weekdays': list(WEEK_DAYS)
        }]