in docker/services/meta_service.py [0:0]
def to_adjustments(self, instance_meta: dict) \
-> List[RecommendationHistory]:
instance_meta = copy.deepcopy(instance_meta)
if not instance_meta or not isinstance(instance_meta, dict):
return []
postpone_settings = self.get_postponed_settings(
instance_meta=instance_meta)
if not postpone_settings:
return []
allowed_actions = RecommendationTypeEnum.list()
emulated_appliance_date = datetime.now(timezone.utc) \
- timedelta(days=180)
affected_actions = [action.get(self.postponed_for_actions_key)
for action in postpone_settings]
target_dts = [action.get(self.postponed_key)
for action in postpone_settings]
affected_actions = list(itertools.chain.from_iterable(affected_actions))
if set(affected_actions) == set(allowed_actions):
_LOG.debug(f'Instance processing is postponed for all actions.')
postponed_till_dt = min(target_dts)
raise ProcessingPostponedException(
postponed_till=postponed_till_dt.isoformat())
_LOG.debug(f'Next actions will be postponed for instance: '
f'\'{affected_actions}\'')
history_items = []
for postpone_setting in postpone_settings:
setting_dt = postpone_setting.get(self.postponed_key)
setting_actions = postpone_setting.get(
self.postponed_for_actions_key)
setting_actions = [item for item in setting_actions
if item in allowed_actions]
if not setting_dt or not self.is_postponed(
postponed_till_dt=setting_dt):
continue
for postponed_action in setting_actions:
item = RecommendationHistory(
recommendation_type=postponed_action,
instance_meta=instance_meta,
feedback_status=FeedbackStatusEnum.DONT_RECOMMEND,
feedback_dt=emulated_appliance_date
)
history_items.append(item)
return history_items