# docker/services/resize/resize_service.py
from math import inf
from typing import List
from commons.constants import JOB_STEP_GENERATE_REPORTS, ACTION_SPLIT, \
CLOUD_ATTR
from commons.exception import ExecutorException
from commons.log_helper import get_logger
from models.algorithm import ShapeSorting
from models.base_model import CloudEnum
from models.parent_attributes import ParentMeta
from models.recommendation_history import RecommendationHistory, \
FeedbackStatusEnum
from models.shape import Shape
from services.customer_preferences_service import CustomerPreferencesService
from services.resize.shape_compatibility_filter import ShapeCompatibilityFilter
from services.shape_price_service import ShapePriceService
from services.shape_service import ShapeService
# Module-level logger for the resize service.
_LOG = get_logger('r8s-resize-service')
# Number of most recent entries from a past recommendation that are used
# when adjusting candidate shapes based on user feedback.
SHAPES_COUNT_TO_ADJUST = 3
class ResizeService:
def __init__(self,
customer_preferences_service: CustomerPreferencesService,
shape_service: ShapeService,
shape_price_service: ShapePriceService):
self.customer_preferences_service = customer_preferences_service
self.shape_service = shape_service
self.shape_price_service = shape_price_service
def recommend_size(self, trend, instance_type, resize_action,
                   cloud, algorithm, instance_meta=None, allow_recursion=True,
                   max_results=5, parent_meta=None,
                   shape_compatibility_rule=None,
                   past_resize_recommendations: List[
                       RecommendationHistory] = None):
    """Recommend replacement shapes for an instance based on its metrics.

    :param trend: metric trend object providing load series and
        `get_metric_ranges` / `requires_resize` helpers
    :param instance_type: name of the instance's current shape
    :param resize_action: resize action type (e.g. ACTION_SPLIT)
    :param cloud: target cloud name
    :param algorithm: algorithm holding recommendation settings
    :param instance_meta: instance metadata  # todo process instance meta
    :param allow_recursion: retry once with relaxed metric requirements
        when fewer than ``max_results`` shapes are found
    :param max_results: maximum number of shape DTOs to return
    :param parent_meta: optional parent meta carrying shape rules
    :param shape_compatibility_rule: optional shape compatibility rule
    :param past_resize_recommendations: past recommendations with user
        feedback, used to adjust the candidate list
    :return: list of shape DTO dicts, possibly empty
    :raises ExecutorException: if the current instance type is unknown
    """
    current_shape: Shape = self.shape_service.get(
        name=instance_type)
    if not current_shape:
        _LOG.error(f'Unknown instance type: {instance_type}')
        raise ExecutorException(
            step_name=JOB_STEP_GENERATE_REPORTS,
            reason=f'Unknown instance type: {instance_type}'
        )
    if not trend.requires_resize():
        # For a split action the current shape itself is returned with
        # its probability attached; otherwise there is nothing to do.
        if resize_action == ACTION_SPLIT:
            result_shape = current_shape.get_dto()
            result_shape['probability'] = trend.probability
            return [result_shape]
        return []
    cpu_min, cpu_max = trend.get_metric_ranges(
        metric=trend.cpu_load,
        provided=current_shape.cpu)
    memory_min, memory_max = trend.get_metric_ranges(
        metric=trend.memory_load,
        provided=current_shape.memory)
    # FIX: a redundant `self.get_suitable_ranges(...)` call was removed
    # here -- its result was immediately overwritten by the
    # `trend.get_metric_ranges` call below.
    net_output_min, _ = trend.get_metric_ranges(
        metric=trend.net_output_load,
        provided=current_shape.network_throughput,
        only_for_non_empty=True
    )
    disk_iops_min, _ = trend.get_metric_ranges(
        metric=trend.avg_disk_iops,
        provided=current_shape.iops,
        only_for_non_empty=True
    )
    all_shapes = self.shape_service.list(cloud=current_shape.cloud)
    if parent_meta:
        _LOG.debug(f'Applying parent meta: '
                   f'{parent_meta.as_dict()}')
        all_shapes = self.customer_preferences_service. \
            get_allowed_instance_types(
                cloud=current_shape.cloud.value,
                instances_data=all_shapes,
                parent_meta=parent_meta
            )
    all_shapes = ShapeCompatibilityFilter().apply_compatibility_filter(
        current_shape=current_shape,
        shapes=all_shapes,
        compatibility_rule=shape_compatibility_rule
    )
    if past_resize_recommendations:
        # Narrow/exclude candidates according to user feedback on
        # previously given recommendations.
        all_shapes = self.apply_adjustment(
            shapes=all_shapes,
            recommendations=past_resize_recommendations)
    settings = algorithm.recommendation_settings
    prioritized_shapes = self.divide_by_priority(
        sizes=all_shapes,
        current_shape=current_shape,
        cloud=cloud,
        resize_action=resize_action,
        parent_meta=parent_meta,
        forbid_change_series=settings.forbid_change_series,
        forbid_change_family=settings.forbid_change_family
    )
    suitable_shapes = self.find_suitable_shapes(
        cpu_min=cpu_min,
        cpu_max=cpu_max,
        memory_min=memory_min,
        memory_max=memory_max,
        net_output_min=net_output_min,
        disk_iops_min=disk_iops_min,
        prioritized_shapes=prioritized_shapes
    )
    suitable_shapes = self._remove_shape_duplicates(
        shapes=suitable_shapes)
    if not suitable_shapes or len(suitable_shapes) < max_results:
        if allow_recursion:
            _LOG.warning('No suitable same-series shape found. '
                         'Going to discard requirement for metric '
                         'with scale down.')
            trend.discard_optional_requirements()
            recs = self.recommend_size(
                trend=trend,
                instance_type=instance_type,
                resize_action=resize_action,
                algorithm=algorithm,
                cloud=cloud,
                instance_meta=instance_meta,
                parent_meta=parent_meta,
                allow_recursion=False,
                shape_compatibility_rule=shape_compatibility_rule,
                past_resize_recommendations=past_resize_recommendations)
            return self._remove_shape_duplicates(
                shapes=suitable_shapes + recs,
                max_results=max_results
            )
        if suitable_shapes:
            # len(suitable_shapes) < max_results is already known here;
            # the redundant re-check from the original code was dropped.
            _LOG.warning('Not enough suitable shapes found.')
            return suitable_shapes
        _LOG.warning('No suitable shapes found')
        return []
    if resize_action == ACTION_SPLIT:
        probability = trend.probability
        for shape in suitable_shapes:
            shape['probability'] = probability
    return self._remove_shape_duplicates(
        shapes=suitable_shapes,
        max_results=max_results)
@staticmethod
def _remove_shape_duplicates(shapes, max_results: int = None):
result = []
for shape in shapes:
if shape not in result:
result.append(shape)
if max_results and len(result) > max_results:
result = result[:max_results]
return result
@staticmethod
def sort_shapes(shapes: list, sort_option: ShapeSorting = None):
if not sort_option or \
sort_option == ShapeSorting.SORT_BY_PERFORMANCE:
return shapes
if sort_option == ShapeSorting.SORT_BY_PRICE:
# in order: from lowest price to highest, then without price
return sorted(shapes, key=lambda shape: shape.get('price', inf))
return shapes
def add_price(self, instances: list, customer, region,
price_type='on_demand', os=None):
for instance in instances:
shape_name = instance.get('name')
shape_price = self.shape_price_service.get(
customer=customer,
name=shape_name,
region=region,
os=os
)
if not shape_price:
_LOG.warning(f'Missing price for shape \'{shape_name}\' '
f'in region \'{region}\'')
continue
price = getattr(shape_price, price_type, None)
if not price:
continue
instance['price'] = price
return instances
def apply_adjustment(self, shapes: List[Shape],
                     recommendations: List[RecommendationHistory]) -> List[Shape]:
    """Filter candidate shapes based on feedback on past recommendations.

    Each past recommendation with an actionable feedback status shrinks
    the candidate list; DONT_RECOMMEND feedback empties it entirely.

    :param shapes: candidate shapes
    :param recommendations: past recommendations carrying user feedback
    :return: adjusted list of shapes
    """
    handlers = {
        FeedbackStatusEnum.TOO_SMALL: self._adjust_too_small,
        FeedbackStatusEnum.TOO_LARGE: self._adjust_too_large,
        FeedbackStatusEnum.WRONG: self._adjust_wrong,
        # explicit user opt-out: drop every candidate
        FeedbackStatusEnum.DONT_RECOMMEND: lambda *args, **kwargs: [],
    }
    for record in recommendations:
        handler = handlers.get(record.feedback_status)
        if handler is not None:
            shapes = handler(shapes=shapes, recommendation=record)
    return shapes
def _adjust_too_small(self, shapes: List[Shape],
                      recommendation: RecommendationHistory):
    """Keep only shapes at least as large as previously recommended ones.

    Applied when the user marked a past recommendation as TOO_SMALL:
    every remaining candidate must have cpu and memory greater than or
    equal to the largest previously recommended values and must exceed
    the threshold in at least one of the two dimensions.

    :param shapes: candidate shapes to filter
    :param recommendation: past recommendation with TOO_SMALL feedback
    :return: filtered list of shapes
    """
    recommended_shapes_data = self._get_recommended_shapes(
        recommendation=recommendation,
        max_items=SHAPES_COUNT_TO_ADJUST
    )
    recommended_cpu = [item.get('cpu') for item in recommended_shapes_data
                       if isinstance(item.get('cpu'), (int, float))]
    recommended_memory = [item.get('memory') for item in
                          recommended_shapes_data
                          if isinstance(item.get('memory'), (int, float))]
    if not recommended_cpu or not recommended_memory:
        # FIX: no usable cpu/memory data in the past recommendation --
        # previously max() raised ValueError on an empty list. Leave the
        # candidate list unchanged instead.
        return shapes
    threshold_cpu = max(recommended_cpu)
    threshold_memory = max(recommended_memory)
    result_shapes = []
    for shape in shapes:
        # shape cpu/memory must be >=, but at least one of them must be
        # strictly higher than the threshold value
        if shape.cpu >= threshold_cpu and shape.memory >= threshold_memory \
                and (shape.cpu != threshold_cpu
                     or threshold_memory != shape.memory):
            result_shapes.append(shape)
    return result_shapes
def _adjust_too_large(self, shapes: List[Shape],
                      recommendation: RecommendationHistory):
    """Keep only shapes at most as large as previously recommended ones.

    Applied when the user marked a past recommendation as TOO_LARGE:
    every remaining candidate must have cpu and memory less than or
    equal to the smallest previously recommended values and must be
    below the threshold in at least one of the two dimensions.

    :param shapes: candidate shapes to filter
    :param recommendation: past recommendation with TOO_LARGE feedback
    :return: filtered list of shapes
    """
    recommended_shapes_data = self._get_recommended_shapes(
        recommendation=recommendation,
        max_items=SHAPES_COUNT_TO_ADJUST
    )
    recommended_cpu = [item.get('cpu') for item in recommended_shapes_data
                       if isinstance(item.get('cpu'), (int, float))]
    recommended_memory = [item.get('memory') for item in
                          recommended_shapes_data
                          if isinstance(item.get('memory'), (int, float))]
    if not recommended_cpu or not recommended_memory:
        # FIX: no usable cpu/memory data in the past recommendation --
        # previously min() raised ValueError on an empty list. Leave the
        # candidate list unchanged instead.
        return shapes
    threshold_cpu = min(recommended_cpu)
    threshold_memory = min(recommended_memory)
    result_shapes = []
    for shape in shapes:
        # shape cpu/memory must be <=, but at least one of them must be
        # strictly smaller than the threshold value
        if shape.cpu <= threshold_cpu and shape.memory <= threshold_memory \
                and (shape.cpu != threshold_cpu
                     or threshold_memory != shape.memory):
            result_shapes.append(shape)
    return result_shapes
def _adjust_wrong(self, shapes: List[Shape],
                  recommendation: RecommendationHistory):
    """Exclude shapes the user previously flagged as wrongly recommended.

    :param shapes: candidate shapes
    :param recommendation: past recommendation with WRONG feedback
    :return: shapes whose names were not in the flagged recommendation
    """
    previous = self._get_recommended_shapes(
        recommendation=recommendation,
        max_items=SHAPES_COUNT_TO_ADJUST
    )
    excluded_names = {item.get('name') for item in previous
                      if item.get('name')}
    return [shape for shape in shapes
            if shape.name not in excluded_names]
@staticmethod
def get_suitable_ranges(perc90, provided, minimum_load_threshold=5,
lowest_minimum=1, lowest_maximum=1):
if provided:
provided = float(provided)
else:
return None, None
if perc90 < minimum_load_threshold:
perc90 = minimum_load_threshold
currently_used = provided / 100 * perc90
min_limit_perc = 30
max_limit_perc = 70
absolute_max = currently_used * 100 / min_limit_perc
absolute_min = currently_used * 100 / max_limit_perc
if absolute_min < lowest_minimum:
absolute_min = lowest_minimum
if absolute_max < lowest_maximum:
absolute_max = lowest_maximum
return round(absolute_min, 2), round(absolute_max, 2)
@staticmethod
def find_suitable_shapes(cpu_min, cpu_max, memory_min, memory_max,
net_output_min,
disk_iops_min,
prioritized_shapes):
suitable_shapes = []
for shapes in prioritized_shapes:
for shape in shapes:
shape_cpu = getattr(shape, 'cpu', -1)
shape_memory = getattr(shape, 'memory', -1)
cpu_suits = True
memory_suits = True
net_output_suits = True
iops_suits = True
if cpu_min and cpu_max:
cpu_suits = cpu_min <= shape_cpu <= cpu_max
if memory_min and memory_max:
memory_suits = memory_min <= shape_memory <= memory_max
if net_output_min:
shape_net_output = shape.network_throughput
net_output_suits = shape_net_output \
and net_output_min <= shape_net_output
if disk_iops_min:
shape_iops = shape.iops
iops_suits = shape_iops and disk_iops_min <= shape_iops
if cpu_suits and memory_suits and iops_suits \
and net_output_suits:
suitable_shapes.append(shape)
return [shape.get_dto() for shape in suitable_shapes]
def divide_by_priority(self, sizes, cloud, current_shape: Shape, resize_action,
                       parent_meta: ParentMeta = None,
                       forbid_change_series=True,
                       forbid_change_family=True):
    """Split candidate shapes into priority groups for selection.

    Groups (highest priority first): shapes matched by parent shape
    rules, same-series shapes, same-family shapes (only when series
    changes are allowed), and all remaining shapes (only when both
    series and family changes are allowed).

    :param sizes: candidate shapes
    :param cloud: cloud name the shapes belong to
    :param current_shape: shape the instance currently uses
    :param resize_action: resize action type (e.g. ACTION_SPLIT)
    :param parent_meta: optional parent meta with shape rules
    :param forbid_change_series: exclude other-series shapes
    :param forbid_change_family: exclude other-family shapes
    :return: tuple (prioritised, same_series, same_family, other_shapes)
    """
    current_name = current_shape.name
    series_prefix = self._get_series_prefix(
        shape_name=current_name, cloud=cloud)
    prioritised = []
    if parent_meta:
        _LOG.debug(f'Filtering shape rules for cloud: {cloud}')
        cloud_rules = [rule for rule in parent_meta.shape_rules
                       if rule.get(CLOUD_ATTR) == cloud]
        _LOG.debug(f'Rule to apply: {cloud_rules}')
        if cloud_rules:
            prioritised = self.customer_preferences_service. \
                process_priority_filters(
                    instances_data=sizes,
                    shape_rules=cloud_rules
                )
    # For a split action the instance keeps running, so the current
    # shape itself remains a valid candidate.
    excluded_names = () if resize_action == ACTION_SPLIT \
        else (current_name,)
    same_series = self._get_same_series(
        sizes=sizes,
        series_prefix=series_prefix,
        exclude_shape_names=excluded_names)
    series_names = [item['name'] for item in same_series]
    same_family = []
    if not forbid_change_series:
        same_family = self._get_same_family(
            sizes=sizes,
            cloud=cloud,
            current_shape=current_shape,
            exclude_shapes=series_names
        )
    family_names = [item['name'] for item in same_family]
    other_shapes = []
    if not forbid_change_series and not forbid_change_family:
        other_shapes = self._get_other_shapes(
            sizes=sizes,
            exclude_shapes=series_names + family_names)
    return prioritised, same_series, same_family, other_shapes
@staticmethod
def _get_series_prefix(shape_name, cloud):
    """Return the series prefix of a shape name for the given cloud.

    Examples: AWS 'm5.large' -> 'm5.'; Azure 'Standard_D2s_v3' ->
    'Standard_D'; GCP 'n1-standard-1' -> 'n1'.

    :param shape_name: full shape name
    :param cloud: cloud name (CloudEnum value)
    :return: series prefix string, or None for unrecognized clouds
    """
    if cloud == CloudEnum.CLOUD_AWS.value:
        return shape_name.split('.')[0] + '.'
    if cloud == CloudEnum.CLOUD_AZURE.value:
        # assumes names of the form '<tier>_<series+size>...'
        # -- TODO confirm for promo/constrained Azure sizes
        parts = shape_name.split('_')
        series_token = parts[1]
        # FIX: keep the whole token when it is entirely alphabetic;
        # previously the loop left index at len-1 and silently dropped
        # the last character.
        index = len(series_token)
        for position, ch in enumerate(series_token):
            if not ch.isalpha():
                index = position
                break
        return f'{parts[0]}_{series_token[0:index]}'
    if cloud == CloudEnum.CLOUD_GOOGLE.value:
        return shape_name.split('-')[0]
    # unrecognized cloud: no prefix can be derived
    return None
def _get_same_series(self, sizes, series_prefix,
exclude_shape_names=()):
shapes = [shape for shape in sizes
if shape.name.startswith(series_prefix)
and shape.name not in exclude_shape_names]
return self._sort_shapes(shapes=shapes)
def _get_same_family(self, sizes: List[Shape], current_shape, cloud,
                     exclude_shapes: List[Shape]):
    """Return shapes from the same family, sorted by cpu then memory.

    Azure encodes the family in the name prefix before '_'; other
    clouds expose an explicit family_type attribute.

    :param sizes: candidate shapes
    :param current_shape: shape whose family must be matched
    :param cloud: cloud name (CloudEnum value)
    :param exclude_shapes: shape names to leave out
    :return: sorted list of matching shapes
    """
    if cloud == CloudEnum.CLOUD_AZURE.value:
        family_prefix = current_shape.name.split('_')[0]

        def belongs(shape):
            return shape.name.startswith(family_prefix)
    else:
        def belongs(shape):
            return shape.family_type == current_shape.family_type
    matching = [shape for shape in sizes
                if belongs(shape) and shape.name not in exclude_shapes]
    return self._sort_shapes(shapes=matching)
def _get_other_shapes(self, sizes: List[Shape],
exclude_shapes: List[Shape]):
shapes = [shape for shape in sizes if shape.name
not in exclude_shapes]
return self._sort_shapes(shapes=shapes)
@staticmethod
def _sort_shapes(shapes: List[Shape]):
return sorted(shapes, key=lambda x: (x.cpu, x.memory))
@staticmethod
def _get_recommended_shapes(recommendation: RecommendationHistory,
max_items: int = None) -> List[dict]:
recommended_shapes_data = recommendation.recommendation
if not recommended_shapes_data:
return []
if max_items and len(recommended_shapes_data) > max_items:
recommended_shapes_data = recommended_shapes_data[0:max_items]
return recommended_shapes_data