in src/services/health_checks/operation_mode_check.py [0:0]
def check(self):
    """Run the operation-mode checks over active application/parent pairs.

    Steps, in order:

    1. List non-deleted applications of type
       ``MAESTRO_RIGHTSIZER_APPLICATION_TYPE`` and, for each of them, the
       parents of type ``RIGHTSIZER_PARENT_TYPE`` (``only_active=True``).
    2. Group the parents by ``parent.cloud`` (``CLOUD_AWS``,
       ``CLOUD_AZURE``, ``CLOUD_GOOGLE``); parents with an empty or any
       other cloud value are skipped. Each parent is paired with the
       application that has the same ``application_id``; the application
       slot is ``None`` unless exactly one application matches.
    3. Run every checker from ``self.checks`` on each pair and wrap the
       outcomes of one pair into a ``CheckCollectionResult`` whose id is
       the parent id.
    4. Run every checker from ``self.singe_checks`` once per non-empty
       cloud group and wrap all their outcomes into a single
       ``CheckCollectionResult`` with id ``'COMMON'``.

    :return: when no applications or no parents were found - the
        ``as_dict()`` form of a single ``CheckCollectionResult`` with id
        ``'NONE'`` (NOT wrapped in a list); otherwise a list with one
        ``as_dict()`` entry per pair followed by the ``'COMMON'`` entry.
        NOTE(review): the two return shapes differ; kept as-is because
        callers may rely on it.
    """
    _LOG.debug('Listing applications')
    applications = list(self.application_service.list(
        _type=MAESTRO_RIGHTSIZER_APPLICATION_TYPE, deleted=False))

    parents = []
    for application in applications:
        parents.extend(self.parent_service.list_application_parents(
            application_id=application.application_id,
            type_=RIGHTSIZER_PARENT_TYPE,
            only_active=True
        ))

    if not applications or not parents:
        _LOG.warning('No active parents/applications found')
        return CheckCollectionResult(
            id='NONE',
            type=CHECK_TYPE_OPERATION_MODE
        ).as_dict()

    # Index applications by id once, instead of re-scanning the whole list
    # for every parent. Values are lists so that the "exactly one match"
    # rule below can still detect duplicated ids.
    applications_by_id = {}
    for application in applications:
        applications_by_id.setdefault(
            application.application_id, []).append(application)

    pairs = {
        CLOUD_AWS: [],
        CLOUD_AZURE: [],
        CLOUD_GOOGLE: []
    }
    for parent in parents:
        if not parent.cloud or parent.cloud not in pairs:
            continue
        matches = applications_by_id.get(parent.application_id, [])
        # An ambiguous (or missing) match is passed on as None; the
        # checkers receive it in that form.
        related_application = matches[0] if len(matches) == 1 else None
        pairs[parent.cloud].append((related_application, parent))

    result = []
    for cloud_pairs in pairs.values():
        for application, parent in cloud_pairs:
            checks = [
                check_instance.check(application=application, parent=parent)
                for check_instance in self.checks
            ]
            result.append(CheckCollectionResult(
                id=parent.parent_id,
                type=CHECK_TYPE_OPERATION_MODE,
                details=checks
            ).as_dict())

    # Group-level checks: each checker sees all pairs of one cloud at once.
    # The attribute is spelled "singe_checks" (sic); it is defined outside
    # this method, so the name is used unchanged.
    common_check_results = []
    for cloud_pairs in pairs.values():
        if not cloud_pairs:
            continue
        common_check_results.extend(
            check_instance.check(pairs=cloud_pairs)
            for check_instance in self.singe_checks
        )

    result.append(CheckCollectionResult(
        id='COMMON',
        type=CHECK_TYPE_OPERATION_MODE,
        details=common_check_results
    ).as_dict())
    return result