docker/services/mocked_data_service.py

import pandas as pd
from cron_converter import Cron

from commons.constants import ALLOWED_ACTIONS, ACTION_SCALE_DOWN, \
    ACTION_SCALE_UP, ACTION_EMPTY, ACTION_SHUTDOWN, ACTION_SPLIT, \
    ACTION_SCHEDULE
from commons.log_helper import get_logger
from models.base_model import CloudEnum
from services.os_service import OSService
from tests_executor.constants import POINTS_IN_DAY
from tests_executor.utils import generate_timestamp_series, constant_to_series, \
    generate_constant_metric_series, generate_split_series, \
    generate_scheduled_metric_series

_LOG = get_logger('r8s_mocked_data_service')

TAG_TEST_CASE = 'r8s_test_case'
TAG_PERIOD_DAYS = 'r8s_period_days'
TAG_CPU = 'r8s_cpu_load'
TAG_MEMORY = 'r8s_memory_load'
TAG_AVG_DISK_IOPS = 'r8s_avg_disk_iops'
TAG_MAX_DISK_IOPS = 'r8s_max_disk_iops'
TAG_NET_OUTPUT_LOAD = 'r8s_net_output_load'
TAG_STD = 'r8s_std'
TAG_CRON_START = 'r8s_cron_start'
TAG_CRON_STOP = 'r8s_cron_stop'
TAG_PROBABILITY = 'r8s_probability'

ALLOWED_TAGS = [TAG_TEST_CASE, TAG_PERIOD_DAYS, TAG_CPU, TAG_MEMORY,
                TAG_AVG_DISK_IOPS, TAG_MAX_DISK_IOPS, TAG_NET_OUTPUT_LOAD,
                TAG_STD, TAG_CRON_START, TAG_CRON_STOP, TAG_PROBABILITY]
NUMBER_TAGS = [TAG_PERIOD_DAYS, TAG_CPU, TAG_MEMORY, TAG_AVG_DISK_IOPS,
               TAG_MAX_DISK_IOPS, TAG_NET_OUTPUT_LOAD, TAG_STD,
               TAG_PROBABILITY]

DEFAULT_CONFIG = {
    ACTION_SCALE_DOWN: {
        TAG_PERIOD_DAYS: 14,
        TAG_CPU: 20,
        TAG_MEMORY: 20,
        TAG_AVG_DISK_IOPS: -1,
        TAG_MAX_DISK_IOPS: -1,
        TAG_NET_OUTPUT_LOAD: -1,
        TAG_STD: 2,
    },
    ACTION_SCALE_UP: {
        TAG_PERIOD_DAYS: 14,
        TAG_CPU: 80,
        TAG_MEMORY: 80,
        TAG_AVG_DISK_IOPS: -1,
        TAG_MAX_DISK_IOPS: -1,
        TAG_NET_OUTPUT_LOAD: -1,
        TAG_STD: 2,
    },
    ACTION_EMPTY: {
        TAG_PERIOD_DAYS: 14,
        TAG_CPU: 45,
        TAG_MEMORY: 55,
        TAG_AVG_DISK_IOPS: -1,
        TAG_MAX_DISK_IOPS: -1,
        TAG_NET_OUTPUT_LOAD: -1,
        TAG_STD: 2,
    },
    ACTION_SHUTDOWN: {
        TAG_PERIOD_DAYS: 14,
        TAG_CPU: 5,
        TAG_MEMORY: 3,
        TAG_AVG_DISK_IOPS: -1,
        TAG_MAX_DISK_IOPS: -1,
        TAG_NET_OUTPUT_LOAD: -1,
        TAG_STD: 1,
    },
    ACTION_SPLIT: {
        TAG_PERIOD_DAYS: 14,
        TAG_CPU: [25, 80],
        TAG_MEMORY: [25, 80],
        TAG_AVG_DISK_IOPS: -1,
        TAG_MAX_DISK_IOPS: -1,
        TAG_NET_OUTPUT_LOAD: -1,
        TAG_PROBABILITY: [50, 50],
        TAG_STD: 1,
    },
    ACTION_SCHEDULE: {
        TAG_PERIOD_DAYS: 14,
        TAG_CPU: 50,
        TAG_MEMORY: 50,
        TAG_AVG_DISK_IOPS: -1,
        TAG_MAX_DISK_IOPS: -1,
        TAG_NET_OUTPUT_LOAD: -1,
        TAG_STD: 1,
        TAG_CRON_START: '0 9 * * 0-4',
        TAG_CRON_STOP: '0 18 * * 0-4',
    },
}


class MockedDataService:
    """
    Replaces actual instance metrics with mocked data generated from
    instance meta tags.

    Supported tags:
    - r8s_test_case: SCALE_UP/SCALE_DOWN/SHUTDOWN/SCHEDULE/SPLIT - the
      expected resulting action
    - r8s_period_days: length of the mocked data period, in days
    - r8s_cpu_load: average cpu load on the instance
    - r8s_memory_load: average memory load on the instance
    - r8s_std: standard deviation, used to vary the load
    - r8s_cron_start (SCHEDULE only): cron that marks the start of the
      generated instance load
    - r8s_cron_stop (SCHEDULE only): cron that marks the end of the
      generated instance load
    - r8s_probability (SPLIT only): splits the load by specific percentages
    """

    def __init__(self, os_service: OSService):
        self.os_service = os_service
        self.action_generator_mapping = {
            ACTION_EMPTY: self.generate_constant_load_metrics,
            ACTION_SHUTDOWN: self.generate_constant_load_metrics,
            ACTION_SCALE_UP: self.generate_constant_load_metrics,
            ACTION_SCALE_DOWN: self.generate_constant_load_metrics,
            ACTION_SCHEDULE: self.generate_schedule_load_metrics,
            ACTION_SPLIT: self.generate_split_load_metrics,
        }
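    # Example (hypothetical) tag set that drives the SCHEDULE flow, using
    # the same values as DEFAULT_CONFIG:
    #   r8s_test_case: SCHEDULE
    #   r8s_period_days: 14
    #   r8s_cron_start: 0 9 * * 0-4
    #   r8s_cron_stop: 0 18 * * 0-4
    # Any tag that is omitted falls back to DEFAULT_CONFIG for the test case.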
    def process(self, instance_meta_mapping, metric_file_paths):
        instance_tags_mapping = self.parse_tags(
            instance_meta_mapping=instance_meta_mapping)
        meta_mapping = {k: v for k, v in instance_tags_mapping.items()
                        if TAG_TEST_CASE in v}
        file_to_meta_mapping = {}
        for file in metric_file_paths:
            instance_id = self.os_service.path_to_instance_id(file_path=file)
            if instance_id in meta_mapping:
                file_to_meta_mapping[file] = meta_mapping[instance_id]
        if not file_to_meta_mapping:
            _LOG.warning(f'No instances with tag \'{TAG_TEST_CASE}\' found.')
            return
        for file_path, instance_meta in file_to_meta_mapping.items():
            _LOG.debug(f'Going to replace metrics by path \'{file_path}\' '
                       f'with mocked metrics by tags: {instance_meta}')
            self.process_instance(instance_meta=instance_meta,
                                  metric_file_path=file_path)

    def process_instance(self, instance_meta, metric_file_path):
        _LOG.debug('Filtering instance meta')
        instance_meta = {k: v for k, v in instance_meta.items()
                        if k in ALLOWED_TAGS}
        _LOG.debug('Converting tag values to numbers')
        instance_meta = self.values_to_number(instance_meta=instance_meta)

        test_case = instance_meta.get(TAG_TEST_CASE).upper()
        if test_case not in ALLOWED_ACTIONS:
            _LOG.error(f'Invalid test case specified: \'{test_case}\'. '
                       f'Allowed test cases: {ALLOWED_ACTIONS}')
            return
        test_config = DEFAULT_CONFIG.get(test_case).copy()
        for key, value in instance_meta.items():
            if key not in test_config:
                continue
            default_value = test_config.get(key)
            if isinstance(default_value, list) \
                    and not isinstance(value, list):
                _LOG.warning(f'Expected \'list\' value for key \'{key}\', '
                             f'skipping')
                continue
            if isinstance(default_value, int) \
                    and not isinstance(value, (int, float)):
                _LOG.warning(f'Expected \'int\' value for key \'{key}\', '
                             f'skipping')
                continue
            test_config[key] = value
        generator = self.action_generator_mapping.get(test_case)
        generator(test_config, metric_file_path)

    @staticmethod
    def values_to_number(instance_meta):
        for k, v in instance_meta.items():
            if v and k in NUMBER_TAGS:
                try:
                    if '/' in v:
                        # '25/80'-like values become a list of ints
                        instance_meta[k] = [int(i) for i in v.split('/')]
                    else:
                        instance_meta[k] = int(v)
                except (TypeError, ValueError):
                    _LOG.error(f'Invalid value specified for \'{k}\' tag, '
                               f'skipping')
        return instance_meta

    def generate_constant_load_metrics(self, config, metric_file_path):
        period_days = config.get(TAG_PERIOD_DAYS)
        length = POINTS_IN_DAY * period_days

        instance_id_series, instance_type_series, timestamp_series, \
            shape_series, shape_size_koef_series = \
            self.generate_common_columns(
                metric_file_path=metric_file_path,
                length=length
            )

        deviation = config.get(TAG_STD)
        cpu_avg = config.get(TAG_CPU)
        memory_avg = config.get(TAG_MEMORY)
        avg_iops_avg = config.get(TAG_AVG_DISK_IOPS)
        max_iops_avg = config.get(TAG_MAX_DISK_IOPS)
        net_output_avg = config.get(TAG_NET_OUTPUT_LOAD)

        cpu_load_series = generate_constant_metric_series(
            distribution='normal',
            loc=cpu_avg,
            scale=deviation,
            size=length
        )
        memory_load_series = generate_constant_metric_series(
            distribution='normal',
            loc=memory_avg,
            scale=deviation,
            size=length
        )
        net_output_load_series = generate_constant_metric_series(
            distribution='normal',
            loc=net_output_avg,
            scale=deviation,
            size=length
        )
        avg_iops_series = generate_constant_metric_series(
            distribution='normal',
            loc=avg_iops_avg,
            scale=deviation,
            size=length
        )
        max_iops_series = generate_constant_metric_series(
            distribution='normal',
            loc=max_iops_avg,
            scale=deviation,
            size=length
        )
        df_data = {
            'instance_id': instance_id_series,
            'instance_type': instance_type_series,
            'shape': shape_series,
            'shape_size_koef': shape_size_koef_series,
            'timestamp': timestamp_series,
            'cpu_load': cpu_load_series,
            'memory_load': memory_load_series,
            'net_output_load': net_output_load_series,
            'avg_disk_iops': avg_iops_series,
            'max_disk_iops': max_iops_series,
        }
        df = pd.DataFrame(df_data)
        df.to_csv(metric_file_path, sep=',', index=False)
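    # For reference: with the default SCHEDULE crons ('0 9 * * 0-4' and
    # '0 18 * * 0-4'), _get_work_days_hours() below resolves to work hours
    # 9..17 on week days 0-4; points outside that window are generated
    # around the idle loc of 3.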
    def generate_schedule_load_metrics(self, config, metric_file_path):
        period_days = config.get(TAG_PERIOD_DAYS)
        length = POINTS_IN_DAY * period_days

        instance_id_series, instance_type_series, timestamp_series, \
            shape_series, shape_size_koef_series = \
            self.generate_common_columns(
                metric_file_path=metric_file_path,
                length=length
            )

        deviation = config.get(TAG_STD)
        cpu_avg = config.get(TAG_CPU)
        memory_avg = config.get(TAG_MEMORY)
        avg_iops_avg = config.get(TAG_AVG_DISK_IOPS)
        max_iops_avg = config.get(TAG_MAX_DISK_IOPS)
        net_output_avg = config.get(TAG_NET_OUTPUT_LOAD)
        cron_start = config.get(TAG_CRON_START)
        cron_stop = config.get(TAG_CRON_STOP)

        cron_start_list = self._cron_to_list(cron_start)
        cron_stop_list = self._cron_to_list(cron_stop)
        if not cron_start_list or not cron_stop_list:
            _LOG.error('Some of the specified cron strings are not valid')
            return
        work_days, work_hours = self._get_work_days_hours(
            cron_start_list=cron_start_list,
            cron_stop_list=cron_stop_list)

        cpu_load_series = generate_scheduled_metric_series(
            distribution='normal',
            timestamp_series=timestamp_series,
            work_days=work_days,
            work_hours=work_hours,
            work_kwargs=dict(loc=cpu_avg, scale=deviation),
            idle_kwargs=dict(loc=3, scale=deviation)
        )
        memory_load_series = generate_scheduled_metric_series(
            distribution='normal',
            timestamp_series=timestamp_series,
            work_days=work_days,
            work_hours=work_hours,
            work_kwargs=dict(loc=memory_avg, scale=deviation),
            idle_kwargs=dict(loc=3, scale=deviation)
        )
        net_output_load_series = generate_constant_metric_series(
            distribution='normal',
            loc=net_output_avg,
            scale=deviation,
            size=length
        )
        avg_iops_series = generate_constant_metric_series(
            distribution='normal',
            loc=avg_iops_avg,
            scale=deviation,
            size=length
        )
        max_iops_series = generate_constant_metric_series(
            distribution='normal',
            loc=max_iops_avg,
            scale=deviation,
            size=length
        )
        df_data = {
            'instance_id': instance_id_series,
            'instance_type': instance_type_series,
            'shape': shape_series,
            'shape_size_koef': shape_size_koef_series,
            'timestamp': timestamp_series,
            'cpu_load': cpu_load_series,
            'memory_load': memory_load_series,
            'net_output_load': net_output_load_series,
            'avg_disk_iops': avg_iops_series,
            'max_disk_iops': max_iops_series,
        }
        df = pd.DataFrame(df_data)
        df.to_csv(metric_file_path, sep=',', index=False)
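    # Example (hypothetical) SPLIT tag values: 'r8s_cpu_load' = '25/80' with
    # 'r8s_probability' = '50/50'. values_to_number() turns these into
    # [25, 80] and [50, 50], i.e. two load levels, each drawn with equal
    # probability.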
    def generate_split_load_metrics(self, config, metric_file_path):
        period_days = config.get(TAG_PERIOD_DAYS)
        length = POINTS_IN_DAY * period_days

        instance_id_series, instance_type_series, timestamp_series, \
            shape_series, shape_size_koef_series = \
            self.generate_common_columns(
                metric_file_path=metric_file_path,
                length=length
            )

        deviation = config.get(TAG_STD)
        cpu_avg = config.get(TAG_CPU)
        memory_avg = config.get(TAG_MEMORY)
        avg_iops_avg = config.get(TAG_AVG_DISK_IOPS)
        max_iops_avg = config.get(TAG_MAX_DISK_IOPS)
        net_output_avg = config.get(TAG_NET_OUTPUT_LOAD)
        probability = config.get(TAG_PROBABILITY)

        cpu_load_series = generate_split_series(
            distribution='normal',
            avg_loads=cpu_avg,
            probabilities=probability,
            scale=deviation,
            size=length
        )
        memory_load_series = generate_split_series(
            distribution='normal',
            avg_loads=memory_avg,
            probabilities=probability,
            scale=deviation,
            size=length
        )
        net_output_load_series = generate_constant_metric_series(
            distribution='normal',
            loc=net_output_avg,
            scale=deviation,
            size=length
        )
        avg_iops_series = generate_constant_metric_series(
            distribution='normal',
            loc=avg_iops_avg,
            scale=deviation,
            size=length
        )
        max_iops_series = generate_constant_metric_series(
            distribution='normal',
            loc=max_iops_avg,
            scale=deviation,
            size=length
        )
        df_data = {
            'instance_id': instance_id_series,
            'instance_type': instance_type_series,
            'shape': shape_series,
            'shape_size_koef': shape_size_koef_series,
            'timestamp': timestamp_series,
            'cpu_load': cpu_load_series,
            'memory_load': memory_load_series,
            'net_output_load': net_output_load_series,
            'avg_disk_iops': avg_iops_series,
            'max_disk_iops': max_iops_series,
        }
        df = pd.DataFrame(df_data)
        df.to_csv(metric_file_path, sep=',', index=False)

    def generate_common_columns(self, metric_file_path, length):
        timestamp_series = generate_timestamp_series(
            length=length
        )
        shape_cloud_mapping = {
            CloudEnum.CLOUD_AWS.value: 'c5.4xlarge',
            CloudEnum.CLOUD_AZURE.value: 'Standard_D8_v5',
            CloudEnum.CLOUD_GOOGLE.value: 'n2-standard-8',
        }
        instance_id = self.os_service.path_to_instance_id(
            file_path=metric_file_path)
        cloud = self.os_service.path_to_cloud(file_path=metric_file_path)

        instance_id_series = constant_to_series(
            value=instance_id,
            length=length
        )
        instance_type_series = constant_to_series(
            value=shape_cloud_mapping.get(cloud.upper()),
            length=length
        )
        shape_series = constant_to_series('stub', length)
        shape_size_koef_series = constant_to_series(0.5, length)
        return instance_id_series, instance_type_series, timestamp_series, \
            shape_series, shape_size_koef_series

    @staticmethod
    def _cron_to_list(cron_str):
        try:
            cron = Cron(cron_str)
            return cron.to_list()
        except ValueError:
            _LOG.error(f'Invalid cron string specified: \'{cron_str}\'')

    @staticmethod
    def _get_work_days_hours(cron_start_list, cron_stop_list):
        # cron_converter lists are [minutes, hours, days, months, weekdays]
        start_hours = cron_start_list[1]
        stop_hours = cron_stop_list[1]
        start_week_days = cron_start_list[-1]
        stop_week_days = cron_stop_list[-1]

        work_hours = [list(range(start, stop))
                      for start, stop in zip(start_hours, stop_hours)]
        work_hours = list(set(
            item for sublist in work_hours for item in sublist))
        work_days = list(set(start_week_days + stop_week_days))
        return work_days, work_hours

    @staticmethod
    def parse_tags(instance_meta_mapping: dict):
        result = {}
        for instance_id, instance_meta in instance_meta_mapping.items():
            _LOG.debug(f'Parsing instance tags from meta: \'{instance_meta}\'')
            tags_list = instance_meta.get('tags')
            if not tags_list or not isinstance(tags_list, list):
                result[instance_id] = {}
                continue
            instance_tags = {}
            for item in tags_list:
                if not isinstance(item, dict):
                    continue
                key = item.get('key')
                value = item.get('value')
                if not isinstance(key, str) or not isinstance(value, str):
                    _LOG.warning(f'Both tag key and value must be strings: '
                                 f'{key}:{value}')
                    continue
                instance_tags[key] = value
            result[instance_id] = instance_tags
        return result
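
# Minimal usage sketch (hypothetical instance id and file path; assumes the
# surrounding test executor wires in a real OSService, and that 'SCALE_DOWN'
# matches one of the ALLOWED_ACTIONS strings):
#
#     service = MockedDataService(os_service=OSService())
#     service.process(
#         instance_meta_mapping={
#             'i-0123456789abcdef0': {'tags': [
#                 {'key': TAG_TEST_CASE, 'value': 'SCALE_DOWN'},
#                 {'key': TAG_PERIOD_DAYS, 'value': '7'},
#             ]},
#         },
#         metric_file_paths=['/data/metrics/i-0123456789abcdef0.csv'],
#     )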