docker/services/metrics_service.py

import datetime
import glob
import json
import os
from typing import List

import numpy as np
import pandas
import pandas as pd
import pytz
from pytz import UnknownTimeZoneError

from commons import dateparse
from commons.constants import JOB_STEP_PROCESS_METRICS, META_FILE_NAME, \
    JOB_STEP_INITIALIZE_ALGORITHM, JOB_STEP_VALIDATE_METRICS, \
    COLUMN_CPU_LOAD, CSV_EXTENSION
from commons.exception import ExecutorException
from commons.log_helper import get_logger
from commons.profiler import profiler
from models.algorithm import Algorithm
from models.recommendation_history import RecommendationHistory
from services.clustering_service import ClusteringService
from services.resize.resize_trend import ResizeTrend

_LOG = get_logger('r8s-metrics-service')

TIMESTAMP_FREQUENCY = '5Min'
DAY_RECORDS = 144

META_KEY_RESOURCE_ID = 'resourceId'
META_KEY_CREATE_DATE_TIMESTAMP = 'createDateTimestamp'

MINIMUM_DAYS_TO_CUT_INCOMPLETE_EDGE_DAYS = 14


class MetricsService:
    def __init__(self, clustering_service: ClusteringService):
        self.clustering_service = clustering_service

    def calculate_instance_trend(self, df, algorithm: Algorithm) \
            -> ResizeTrend:
        metric_attrs = set(list(algorithm.metric_attributes))

        resize_trend = ResizeTrend()
        for metric in metric_attrs:
            metric_column = self.get_column(metric_name=metric, df=df)
            resize_trend.add_metric_trend(
                metric_name=metric,
                column=metric_column
            )
        return resize_trend

    def calculate_instance_trend_multiple(
            self, algorithm: Algorithm, non_straight_periods,
            total_length) -> List[ResizeTrend]:
        result = []
        for period_list in non_straight_periods:
            concat_dt = pd.concat(period_list)
            period_trend = self.calculate_instance_trend(
                df=concat_dt, algorithm=algorithm)
            period_trend.probability = round(
                len(concat_dt) / total_length, 2)
            result.append(period_trend)

        if not result:
            return result

        metric_attrs = set(list(algorithm.metric_attributes))
        # removes duplicated trends with same metric resize directions
        result = ResizeTrend.remove_duplicates(trends=result,
                                               metric_attrs=metric_attrs)
        return result

    @staticmethod
    def get_threshold_value(column):
        return column.quantile(.9)

    @staticmethod
    def get_column(metric_name, df):
        try:
            column = df[metric_name]
            return column
        except Exception as e:
            _LOG.error(f'Invalid metric format. Exception: {e}')
            raise ExecutorException(
                step_name=JOB_STEP_PROCESS_METRICS,
                reason=f'Invalid metric format. Exception: {e}'
            )
    @staticmethod
    def get_last_period(df, days=7):
        range_max = df.index.max()
        range_min = range_max - datetime.timedelta(days=days)

        # take slice with final N days of data
        sliced_df = df[(df.index >= range_min) & (df.index <= range_max)]
        return sliced_df

    @staticmethod
    def fill_missing_timestamps(df, diff=TIMESTAMP_FREQUENCY):
        instance_id = df['instance_id'][0]
        instance_type = df['instance_type'][0]

        df = df[~df.index.duplicated(keep='last')]
        df = df.resample(diff).ffill()
        df = df.assign(instance_id=instance_id)
        df = df.assign(instance_type=instance_type)
        df.fillna(0, inplace=True)
        return df

    def validate_metric_file(self, algorithm: Algorithm, metric_file_path):
        try:
            df = self.read_metrics(metric_file_path=metric_file_path,
                                   algorithm=algorithm,
                                   parse_index=False)
        except Exception as e:
            _LOG.warning(f'Metric file can not be read: Exception: {e}')
            raise ExecutorException(
                step_name=JOB_STEP_VALIDATE_METRICS,
                reason=f'Metric file can not be read: Exception: {e}'
            )
        column_names = list(df.columns)

        required_columns_set = set(list(algorithm.required_data_attributes))
        file_columns_set = set(column_names)

        missing_columns = list(required_columns_set - file_columns_set)
        excess_columns = list(file_columns_set - required_columns_set)

        if missing_columns or excess_columns:
            _LOG.error(f'File \'{metric_file_path}\' does not match the '
                       f'required set of columns')
            error_message = []
            if missing_columns:
                _LOG.error(f'Missing columns: \'{missing_columns}\'')
                error_message.append(f'Missing columns: \'{missing_columns}\'')
            if excess_columns:
                _LOG.error(f'Excess columns: \'{excess_columns}\'')
            if error_message:
                raise ExecutorException(
                    step_name=JOB_STEP_VALIDATE_METRICS,
                    reason=';'.join(error_message)
                )

        absent_metric_attrs = []
        metric_attrs = list(algorithm.metric_attributes)
        for metric_attr in metric_attrs:
            if (df[metric_attr] == -1).all():
                absent_metric_attrs.append(metric_attr)

        if set(absent_metric_attrs) == set(metric_attrs):
            _LOG.warning(f'Metric file must contain data for at '
                         f'least one metric: {", ".join(metric_attrs)}')
            raise ExecutorException(
                step_name=JOB_STEP_INITIALIZE_ALGORITHM,
                reason=f'Metric file must contain data for at '
                       f'least one metric: {", ".join(metric_attrs)}'
            )
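    # Illustrative note (inferred from this file, not an official spec):
    # the metric CSVs consumed here are expected to contain every column
    # listed in algorithm.required_data_attributes, including the timestamp
    # attribute plus 'instance_id' and 'instance_type'; a metric column that
    # holds only -1 values is treated as "metric not collected".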
    def load_df(self, path, algorithm: Algorithm,
                applied_recommendations: List[RecommendationHistory] = None,
                instance_meta: dict = None):
        all_attrs = set(list(algorithm.required_data_attributes))
        metric_attrs = set(list(algorithm.metric_attributes))
        non_metric = all_attrs - metric_attrs
        non_metric.remove(algorithm.timestamp_attribute)
        try:
            df = self.read_metrics(metric_file_path=path, algorithm=algorithm)
            df = self.trim_from_appliance_date(
                df=df, applied_recommendations=applied_recommendations)
            recommendation_settings = algorithm.recommendation_settings
            timezone_name = recommendation_settings.target_timezone_name
            if timezone_name:
                _LOG.debug(f'Converting to timezone \'{timezone_name}\'')
                try:
                    df = df.tz_convert(timezone_name)
                except UnknownTimeZoneError:
                    _LOG.error(f'Unknown timezone \'{timezone_name}\'')
            df = self.fill_missing_timestamps(df=df)
            df.sort_index(ascending=True, inplace=True)
            for attr in non_metric:
                df.drop(attr, inplace=True, axis=1)
            df = self.discard_start(
                df=df,
                algorithm=algorithm,
                instance_meta=instance_meta
            )
            df_duration_days = (df.index.max() - df.index.min()).days
            if np.isnan(df_duration_days) or \
                    df_duration_days < \
                    algorithm.recommendation_settings.min_allowed_days:
                _LOG.error(f'Insufficient data. Analysed period must be '
                           f'larger than a full day with 5-min frequency '
                           f'of records.')
                raise ExecutorException(
                    step_name=JOB_STEP_INITIALIZE_ALGORITHM,
                    reason=f'Insufficient data. Analysed period must be '
                           f'larger than a full day with 5-min frequency '
                           f'of records.'
                )
            df = self.get_last_period(
                df, days=recommendation_settings.max_days)
            df = self.group_by_time(
                df=df,
                step_minutes=recommendation_settings.record_step_minutes)
            return df
        except ExecutorException as e:
            raise e
        except Exception as e:
            _LOG.error(f'Error occurred while reading metrics file: {str(e)}')
            raise ExecutorException(
                step_name=JOB_STEP_PROCESS_METRICS,
                reason=f'Unable to read metrics file'
            )

    @staticmethod
    def discard_start(df: pd.DataFrame, algorithm: Algorithm,
                      instance_meta=None):
        if instance_meta and 'creationDateTimestamp' in instance_meta:
            try:
                creation_date_timestamp = instance_meta[
                    'creationDateTimestamp']
                _LOG.debug(f'Discarding metrics before timestamp: '
                           f'{creation_date_timestamp}')
                creation_dt = datetime.datetime.utcfromtimestamp(
                    creation_date_timestamp // 1000)
                creation_dt = creation_dt.astimezone(
                    pytz.timezone(df.index.max().tz.zone))
                return df[(df.index >= creation_dt)]
            except Exception as e:
                _LOG.debug(f'Failed to discard metrics before timestamp. '
                           f'Exception: {e}')
        if algorithm.recommendation_settings.discard_initial_zeros:
            _LOG.debug(
                f'Going to discard metrics without load from the start.')
            metric_attrs = algorithm.metric_attributes
            try:
                for index, row in df.iterrows():
                    if not all(row[attr] in (0, -1) for attr in metric_attrs):
                        _LOG.debug(f'Metrics before {index} will be discarded')
                        return df[df.index >= index]
            except Exception as e:
                _LOG.debug(f'Failed to discard leading metrics with zeros. '
                           f'Exception: {e}')
        return df

    def merge_metric_files(self, metrics_folder_path, algorithm: Algorithm):
        metric_files = [y for x in os.walk(metrics_folder_path)
                        for y in glob.glob(os.path.join(x[0], '*.csv'))]

        instance_id_date_mapping = {}

        for file in metric_files:
            path_items = file.split(os.sep)[::-1]
            instance_id, date = path_items[0:2]
            instance_id = instance_id.replace(CSV_EXTENSION, '')
            if instance_id in instance_id_date_mapping:
                instance_id_date_mapping[instance_id].append(file)
            else:
                instance_id_date_mapping[instance_id] = [file]

        resulted_files = []
        for instance_id, files in instance_id_date_mapping.items():
            if len(files) == 1:
                resulted_files.append(files[0])
                continue
            most_recent = max(files)
            files = sorted(files)

            csv_to_combine = [self.read_metrics(f, algorithm=algorithm,
                                                parse_index=False)
                              for f in files]
            combined_csv = pd.concat(csv_to_combine)
            combined_csv = combined_csv.sort_values(
                algorithm.timestamp_attribute)
            combined_csv.to_csv(most_recent, index=False)
            resulted_files.append(most_recent)

        for file in metric_files:
            if file not in resulted_files:
                os.remove(file)
        return resulted_files
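    # Illustrative note on the layout assumed by merge_metric_files above
    # (inferred from its path handling): metric files are discovered as
    # <metrics_folder>/.../<date>/<instance_id>.csv, per-instance files from
    # several dates are concatenated into the lexicographically latest file,
    # and the remaining per-date files are removed.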
    @profiler(execution_step=f'instance_clustering')
    def divide_on_periods(self, df, algorithm: Algorithm):
        df = self.divide_by_days(
            df,
            skip_incomplete_corner_days=True,
            step_minutes=algorithm.recommendation_settings.record_step_minutes)

        shutdown_periods = []
        low_util_periods = []
        good_util_periods = []
        over_util_periods = []
        centroids = []

        for index, df_day in enumerate(df):
            # print(f'Processing day: {index}/{len(cpu_df)}')
            shutdown, low, medium, high, day_centroids = self.process_day(
                df=df_day, algorithm=algorithm)
            shutdown_periods.extend(shutdown)
            low_util_periods.extend(low)
            good_util_periods.extend(medium)
            over_util_periods.extend(high)
            centroids.extend(day_centroids)
        return shutdown_periods, low_util_periods, \
            good_util_periods, over_util_periods, centroids

    @staticmethod
    def group_by_time(df, step_minutes: int = None, freq='10Min'):
        if step_minutes:
            freq = f'{step_minutes}Min'
        return df.groupby(pd.Grouper(freq=freq)).mean()

    @staticmethod
    def divide_by_days(df, skip_incomplete_corner_days: bool,
                       step_minutes: int):
        df_list = [group[1] for group in df.groupby(df.index.date)]

        if not df_list:
            return df_list

        if len(df_list) > MINIMUM_DAYS_TO_CUT_INCOMPLETE_EDGE_DAYS \
                and skip_incomplete_corner_days:
            last_day_df = df_list[-1]
            if len(last_day_df) < 24 * 60 // step_minutes:
                df_list = df_list[:-1]

            first_day_df = df_list[0]
            if len(first_day_df) < 24 * 60 // step_minutes:
                df_list = df_list[1:]
        return df_list

    def process_day(self, df: pandas.DataFrame, algorithm: Algorithm):
        shutdown = []
        low_util = []
        good_util = []
        over_util = []

        df_, centroids = self.clustering_service.cluster(
            df=df, algorithm=algorithm)
        _LOG.debug(f'Clusters centroids: {centroids}')

        thresholds = algorithm.recommendation_settings.thresholds
        for index, centroid in enumerate(centroids):
            if not centroid:
                continue
            cluster: pd.DataFrame = df.loc[df['cluster'] == index]
            cluster.drop('cluster', axis=1, inplace=True)

            if centroid[0] < thresholds[0]:
                shutdown.append(cluster)
            elif thresholds[0] <= centroid[0] < thresholds[1]:
                low_util.append(cluster)
            elif thresholds[1] <= centroid[0] < thresholds[2]:
                good_util.append(cluster)
            else:
                over_util.append(cluster)

        shutdown = pd.concat(shutdown) if shutdown else None
        low_util = pd.concat(low_util) if low_util else None
        good_util = pd.concat(good_util) if good_util else None
        over_util = pd.concat(over_util) if over_util else None

        freq = f'{algorithm.recommendation_settings.record_step_minutes}Min'
        result = [self.get_time_ranges(cluster, freq=freq)
                  for cluster in (shutdown, low_util, good_util, over_util)]
        result.append(centroids)
        return result
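    # Worked example of the bucketing done in process_day (the threshold
    # values below are made up for illustration; the real ones come from
    # algorithm.recommendation_settings.thresholds): with thresholds
    # [10, 30, 70], a day cluster whose centroid CPU load is 5 falls into
    # 'shutdown', 20 into low utilization, 50 into good utilization and
    # 90 into over-utilization.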
    @staticmethod
    def get_non_empty_attrs(df: pd.DataFrame, attrs):
        non_empty = []
        for attr in attrs:
            is_empty = all(value == -1 for value in list(df[attr]))
            if not is_empty:
                non_empty.append(attr)
        return non_empty

    @staticmethod
    def filter_by_ranges(row, periods):
        row_time = row.name.time()
        for period in periods:
            if period[0] <= row_time <= period[1]:
                return row

    def get_time_ranges(self, df, freq='10Min'):
        if not isinstance(df, pd.DataFrame) or len(df) == 0:
            return []
        df = df.asfreq(freq)
        periods = []
        period_start = None
        period_end = None

        for row in df.itertuples():
            cpu_load = getattr(row, COLUMN_CPU_LOAD)
            if period_start and period_end and np.isnan(cpu_load):
                periods.append((period_start, period_end))
                period_start = None
                period_end = None
            elif not period_start and not np.isnan(cpu_load):
                period_start = row.Index.time()
            elif period_start and not np.isnan(cpu_load):
                period_end = row.Index.time()

        if period_start and period_end:
            periods.append((period_start, period_end))

        return self.build_df_from_periods(
            df=df,
            periods=periods
        )

    def build_df_from_periods(self, df, periods: list):
        dfs = []
        for period in periods:
            df_ = df.copy().apply(self.filter_by_ranges, axis=1,
                                  periods=[period], result_type='broadcast')
            if not isinstance(df_, pd.DataFrame):
                df_ = df_.to_frame()
            df_ = df_.dropna(thresh=1)
            if not df.empty:
                dfs.append(df_)
        return dfs

    @staticmethod
    def filter_short_periods(periods, min_length_sec=1800):
        periods = [df for df in periods
                   if (df.index.max() - df.index.min()).total_seconds()
                   >= min_length_sec]
        periods.sort(key=lambda df: df.index.min())
        return periods

    def get_instance_type(self, metric_file_path, algorithm: Algorithm,
                          instance_type_attr='instance_type'):
        try:
            df = self.read_metrics(metric_file_path=metric_file_path,
                                   algorithm=algorithm,
                                   parse_index=False)
            return df[instance_type_attr][0]
        except Exception as e:
            _LOG.error(f'Failed to extract instance type from metric file. '
                       f'Error: {e}')
            raise ExecutorException(
                step_name=JOB_STEP_PROCESS_METRICS,
                reason=f'Failed to extract instance type from metric file. '
                       f'Error: {e}'
            )

    @staticmethod
    def read_metrics(metric_file_path, algorithm: Algorithm = None,
                     parse_index=True):
        try:
            if not parse_index:
                return pd.read_csv(metric_file_path,
                                   **algorithm.read_configuration)
            return pd.read_csv(
                metric_file_path,
                parse_dates=True,
                date_parser=dateparse,
                index_col=algorithm.timestamp_attribute,
                **algorithm.read_configuration)
        except Exception as e:
            _LOG.error(f'Error occurred while reading metrics file: {str(e)}')
            raise ExecutorException(
                step_name=JOB_STEP_PROCESS_METRICS,
                reason=f'Unable to read metrics file'
            )

    def read_meta(self, metrics_folder):
        instance_meta_mapping = {}
        files = list(glob.iglob(metrics_folder + f'**/**/{META_FILE_NAME}',
                                recursive=True))
        _LOG.debug(f'Found \'{len(files)}\' meta files, extracting meta')
        for file in files:
            instance_meta_mapping = self._read_meta_file(
                meta_file_path=file,
                instance_meta_mapping=instance_meta_mapping)
        _LOG.debug(f'Loaded meta: {instance_meta_mapping}')
        return instance_meta_mapping

    @staticmethod
    def trim_from_appliance_date(
            df: pandas.DataFrame,
            applied_recommendations: List[RecommendationHistory]):
        if not applied_recommendations:
            return df
        last_applied_date = max(item.feedback_dt
                                for item in applied_recommendations)
        _LOG.debug(f'Trimming metrics from date '
                   f'\'{last_applied_date.isoformat()}\'')
        if df.index.tz:
            last_applied_date = last_applied_date.replace(tzinfo=pytz.UTC)
        return df[df.index > last_applied_date]

    @staticmethod
    def _read_meta_file(meta_file_path: str, instance_meta_mapping: dict):
        _LOG.debug(f'Processing meta file \'{meta_file_path}\'')
        try:
            with open(meta_file_path, 'r') as f:
                file_meta = json.load(f)
        except json.decoder.JSONDecodeError:
            _LOG.error(f'Unable to read meta file \'{meta_file_path}\', '
                       f'skipping')
            return instance_meta_mapping

        if not isinstance(file_meta, list):
            _LOG.error(f'Invalid meta format: must be a valid list')
            return instance_meta_mapping

        for resource_meta in file_meta:
            resource_id = resource_meta.get(META_KEY_RESOURCE_ID)
            if not resource_id:
                continue
            if resource_id not in instance_meta_mapping:
                instance_meta_mapping[resource_id] = resource_meta
                continue
            timestamp = resource_meta.get(META_KEY_CREATE_DATE_TIMESTAMP)
            instance_timestamp = instance_meta_mapping[
                resource_id].get(META_KEY_CREATE_DATE_TIMESTAMP)

            # overwrite instance meta with latest meta found
            if (timestamp and instance_timestamp
                    and timestamp > instance_timestamp) \
                    or not instance_timestamp:
                instance_meta_mapping[resource_id].update(resource_meta)
        return instance_meta_mapping
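
# Illustrative usage sketch (not part of the service itself): a job runner is
# assumed to wire the service roughly as below; `clustering_service`,
# `algorithm`, `metrics_folder` and `instance_id` are placeholder names.
#
#   service = MetricsService(clustering_service=clustering_service)
#   instance_meta_mapping = service.read_meta(metrics_folder)
#   metric_files = service.merge_metric_files(metrics_folder, algorithm)
#   for metric_file in metric_files:
#       service.validate_metric_file(algorithm, metric_file)
#       df = service.load_df(
#           metric_file, algorithm,
#           instance_meta=instance_meta_mapping.get(instance_id))
#       shutdown, low, good, over, centroids = service.divide_on_periods(
#           df=df, algorithm=algorithm)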