docker/services/customer_preferences_service.py (130 lines of code) (raw):

import re from typing import List from commons.log_helper import get_logger from commons.constants import CLOUD_ATTR from models.parent_attributes import ParentMeta from models.shape import Shape _LOG = get_logger('r8s-customer-preferences-filter') FIELD_ATTR = 'field' VALUE_ATTR = 'value' ACTION_ATTR = 'action' CONDITION_ATTR = 'condition' ALLOW = 'allow' DENY = 'deny' PRIORITIZE = 'prioritize' CONDITION_MATCH = 'match' CONDITION_CONTAINS = 'contains' CONDITION_NOT_CONTAINS = 'not_contains' CONDITION_EQUAL = 'equal' class CustomerPreferencesService: def __init__(self): self.condition_processor_mapping = { CONDITION_MATCH: self._match_condition, CONDITION_CONTAINS: self._contains_condition, CONDITION_NOT_CONTAINS: self._not_contains_condition, CONDITION_EQUAL: self._equal_condition, } def get_allowed_instance_types(self, cloud: str, parent_meta: ParentMeta, instances_data: List[Shape]): shape_rules = parent_meta.shape_rules _LOG.debug(f'Filtering shape rules for cloud: {cloud}') shape_rules = [rule for rule in shape_rules if rule.get(CLOUD_ATTR) == cloud] _LOG.debug(f'Rule to apply: {shape_rules}') allow_filters = self.filter_by_action( shape_rules=shape_rules, action=ALLOW ) deny_filters = self.filter_by_action( shape_rules=shape_rules, action=DENY ) allowed_instances = self.process_allow_filters( instances_data=instances_data, filters=allow_filters ) allowed_instances = self.process_exclude_filters( instances_data=allowed_instances, filters=deny_filters ) return allowed_instances def process_allow_filters(self, instances_data, filters: list = None): if not filters: return instances_data allowed_instances = [] for filter_ in filters: matching_instances = self._find_matching( instances_data=instances_data, filter_=filter_ ) allowed_instances.extend(matching_instances) return allowed_instances def process_exclude_filters(self, instances_data, filters: list = None): if not filters: return instances_data names_to_exclude = set() for filter_ in filters: matching_instances = self._find_matching( instances_data=instances_data, filter_=filter_ ) filter_names_to_exclude = [item.name for item in matching_instances] names_to_exclude.update(filter_names_to_exclude) return [instance for instance in instances_data if instance.name not in names_to_exclude] def process_priority_filters(self, instances_data, shape_rules): priority_filter = self.filter_by_action(shape_rules=shape_rules, action=PRIORITIZE) if not priority_filter: return instances_data priority_instances = [] for filter_ in priority_filter: matching_instances = self._find_matching( instances_data=instances_data, filter_=filter_ ) priority_instances.extend(matching_instances) return priority_instances def _find_matching(self, instances_data, filter_): matching_instances = [] for instance in instances_data: matches = self._instance_match( instance_data=instance, filter_=filter_) if matches: matching_instances.append(instance) return matching_instances def _instance_match(self, instance_data, filter_): filter_key = filter_.get(FIELD_ATTR) filter_value = filter_.get(VALUE_ATTR) instance_value = getattr(instance_data, filter_key, None) condition = filter_.get(CONDITION_ATTR) if not filter_value or not instance_value: return False processor = self.condition_processor_mapping.get(condition) if not processor: return False return processor(instance_value=instance_value, value=filter_value) @staticmethod def filter_by_action(shape_rules: list, action): if not shape_rules: return [] return [f for f in shape_rules if f.get(ACTION_ATTR) == action] @staticmethod def _match_condition(instance_value: str, value: str): value = value.replace('.', r'\.').replace('*', '.+') return bool(re.match(value, instance_value)) @staticmethod def _contains_condition(instance_value: str, value: str): if not value or not isinstance(value, str): return False return value.lower() in instance_value.lower() @staticmethod def _not_contains_condition(instance_value: str, value: str): if not value or not isinstance(value, str): return False return value.lower() not in instance_value.lower() @staticmethod def _equal_condition(instance_value: str, value: str): if not value or not isinstance(value, str): return False return value.lower() == instance_value.lower()