docker/executor.py (323 lines of code) (raw):

import os.path
from typing import NoReturn

from modular_sdk.models.parent import Parent

from commons.constants import (JOB_STEP_INITIALIZATION,
                               TENANT_LICENSE_KEY_ATTR, PROFILE_LOG_PATH)
from commons.exception import ExecutorException, LicenseForbiddenException
from commons.log_helper import get_logger
from commons.profiler import profiler
from models.algorithm import Algorithm
from models.job import Job, JobStatusEnum, JobTenantStatusEnum
from models.license import License
from models.parent_attributes import ParentMeta
from models.storage import Storage
from services import SERVICE_PROVIDER
from services.algorithm_service import AlgorithmService
from services.environment_service import EnvironmentService
from services.job_service import JobService
from services.license_manager_service import LicenseManagerService
from services.license_service import LicenseService
from services.metrics_service import MetricsService
from services.mocked_data_service import MockedDataService
from services.os_service import OSService
from services.recomendation_service import RecommendationService
from services.reformat_service import ReformatService
from services.resize.resize_service import ResizeService
from services.rightsizer_application_service import \
    RightSizerApplicationService
from services.rightsizer_parent_service import RightSizerParentService
from services.schedule.schedule_service import ScheduleService
from services.setting_service import SettingsService
from services.storage_service import StorageService

# Service singletons resolved once at import time (order preserved).
algorithm_service: AlgorithmService = SERVICE_PROVIDER.algorithm_service()
storage_service: StorageService = SERVICE_PROVIDER.storage_service()
job_service: JobService = SERVICE_PROVIDER.job_service()
environment_service: EnvironmentService = \
    SERVICE_PROVIDER.environment_service()
os_service: OSService = SERVICE_PROVIDER.os_service()
metrics_service: MetricsService = SERVICE_PROVIDER.metrics_service()
schedule_service: ScheduleService = SERVICE_PROVIDER.schedule_service()
resize_service: ResizeService = SERVICE_PROVIDER.resize_service()
reformat_service: ReformatService = SERVICE_PROVIDER.reformat_service()
recommendation_service: RecommendationService = SERVICE_PROVIDER. \
    recommendation_service()
settings_service: SettingsService = SERVICE_PROVIDER.settings_service()
parent_service: RightSizerParentService = SERVICE_PROVIDER.parent_service()
mocked_data_service: MockedDataService = \
    SERVICE_PROVIDER.mocked_data_service()
application_service: RightSizerApplicationService = SERVICE_PROVIDER. \
    application_service()
license_service: LicenseService = SERVICE_PROVIDER.license_service()
license_manager_service: LicenseManagerService = SERVICE_PROVIDER. \
    license_manager_service()

_LOG = get_logger('r8s-executor')

# Run parameters read from the environment service at import time.
JOB_ID = environment_service.get_batch_job_id()
SCAN_FROM_DATE = environment_service.get_scan_from_date()
SCAN_TO_DATE = environment_service.get_scan_to_date()
PARENT_ID = environment_service.get_licensed_parent_id()


def set_job_fail_reason(exception: Exception):
    """Mark job ``JOB_ID`` as failed and store a readable failure reason.

    ``ExecutorException`` messages are stored as-is; any other exception is
    prefixed with its class name. If the job record cannot be found,
    nothing is saved.

    :param exception: the exception that aborted the run.
    """
    if isinstance(exception, ExecutorException):
        reason = str(exception)
    else:
        reason = f'{type(exception).__name__}: {str(exception)}'
    _LOG.debug(f'Setting job {JOB_ID} fail reason to \'{reason}\'')
    job = job_service.get_by_id(object_id=JOB_ID)
    if job:
        job.status = JobStatusEnum.JOB_FAILED_STATUS.value
        job.fail_reason = reason
        job.save()
        _LOG.debug('Job reason was saved.')


def _fail_initialization(reason: str) -> NoReturn:
    """Log *reason* as an error and abort the initialization step.

    :raises ExecutorException: always, tagged with JOB_STEP_INITIALIZATION.
    """
    _LOG.error(reason)
    raise ExecutorException(
        step_name=JOB_STEP_INITIALIZATION,
        reason=reason
    )


@profiler(execution_step='lm_submit_job')
def submit_licensed_job(parent: Parent, tenant_name: str, license_: License):
    """Ask the license manager to grant execution of this job for a tenant.

    The tenant license key is read from
    ``license_.customers[parent.customer_id][TENANT_LICENSE_KEY_ATTR]`` and
    mapped to the parent's algorithm name.

    :param parent: licensed parent; supplies customer id and algorithm name.
    :param tenant_name: tenant the job is being run for.
    :param license_: license record holding per-customer license keys.
    :return: the value returned by
        ``license_manager_service.instantiate_licensed_job_dto``.
    :raises LicenseForbiddenException: if the license manager call raises.
    """
    customer = parent.customer_id
    tenant_license_key = license_.customers.get(customer, {}).get(
        TENANT_LICENSE_KEY_ATTR)
    algorithm_name = parent.meta.algorithm

    algorithm_map = {
        tenant_license_key: algorithm_name
    }
    try:
        licensed_job = license_manager_service.instantiate_licensed_job_dto(
            job_id=f'{JOB_ID}:{tenant_name}',
            customer=customer,
            tenant=tenant_name,
            algorithm_map=algorithm_map
        )
    except Exception as e:
        _LOG.warning(f'Job execution could not be granted '
                     f'for tenant {tenant_name}: {e}.')
        # Chain the original error so the root cause stays in tracebacks.
        raise LicenseForbiddenException(
            tenant_name=tenant_name
        ) from e
    return licensed_job


def process_tenant_instances(metrics_dir, reports_dir, input_storage,
                             output_storage, parent_meta: ParentMeta,
                             licensed_parent: Parent, algorithm: Algorithm,
                             license_: License, customer: str, tenant: str,
                             job: Job):
    """Produce and upload recommendations for a single tenant.

    Steps: download metrics from *input_storage*, read instance meta, merge
    and extract metric files, validate and reformat each file (files that
    fail are dropped and an error report is dumped for them), optionally
    substitute mocked data in debug mode, obtain a licensed job and sync the
    algorithm from it, process every remaining instance, upload results to
    *output_storage*, and finally mark the tenant as SUCCEEDED on *job*.

    :param customer: customer name used to build the tenant's metrics
        folder path; expected to match ``licensed_parent.customer_id``,
        which is the customer metrics are downloaded for.
    :raises LicenseForbiddenException: if the licensed job is not granted.
    """
    _LOG.info(f'Downloading metrics from storage \'{input_storage.name}\', '
              f'for tenant: {tenant}')
    cloud = licensed_parent.meta.cloud.lower()
    storage_service.download_metrics(
        data_source=input_storage,
        output_path=metrics_dir,
        scan_customer=licensed_parent.customer_id,
        scan_clouds=[cloud],
        scan_tenants=[tenant],
        scan_from_date=SCAN_FROM_DATE,
        scan_to_date=SCAN_TO_DATE,
        max_days=algorithm.recommendation_settings.max_days)

    tenant_folder_path = os.path.join(metrics_dir, customer, cloud, tenant)

    _LOG.info('Loading instances meta for tenant')
    instance_meta_mapping = metrics_service.read_meta(
        metrics_folder=tenant_folder_path)

    _LOG.info('Merging metric files by date')
    metrics_service.merge_metric_files(
        metrics_folder_path=tenant_folder_path,
        algorithm=algorithm)

    _LOG.info('Extracting tenant metric files')
    metric_file_paths = os_service.extract_metric_files(
        algorithm=algorithm, metrics_folder_path=tenant_folder_path)

    _LOG.debug('Reformatting metrics to relative metric values')
    # Iterate a copy because invalid files are removed from the original.
    for metric_file_path in metric_file_paths.copy():
        try:
            _LOG.debug(f'Validating metric file: \'{metric_file_path}\'')
            metrics_service.validate_metric_file(
                algorithm=algorithm,
                metric_file_path=metric_file_path)
            _LOG.debug(f'Reformatting metric file: \'{metric_file_path}\'')
            reformat_service.to_relative_values(
                metrics_file_path=metric_file_path,
                algorithm=algorithm)
        except Exception as e:
            metric_file_paths.remove(metric_file_path)
            recommendation_service.dump_error_report(
                reports_dir=reports_dir,
                metric_file_path=metric_file_path,
                exception=e)

    if environment_service.is_debug():
        _LOG.info('Searching for instances to replace with mocked data')
        mocked_data_service.process(
            instance_meta_mapping=instance_meta_mapping,
            metric_file_paths=metric_file_paths
        )

    _LOG.debug(f'Submitting licensed job for tenant {tenant}')
    licensed_job_data = submit_licensed_job(
        parent=licensed_parent,
        license_=license_,
        tenant_name=tenant)

    _LOG.debug(f'Syncing licensed algorithm from license '
               f'{license_.license_key}')
    algorithm_service.update_from_licensed_job(
        algorithm=algorithm,
        licensed_job=licensed_job_data
    )

    _LOG.info(f'Tenant {tenant} metric file paths to '
              f'process: \'{metric_file_paths}\'')
    for index, metric_file_path in enumerate(metric_file_paths, start=1):
        _LOG.debug(
            f'Processing {index}/{len(metric_file_paths)} instance: '
            f'\'{metric_file_path}\'')
        result = recommendation_service.process_instance(
            metric_file_path=metric_file_path,
            algorithm=algorithm,
            reports_dir=reports_dir,
            instance_meta_mapping=instance_meta_mapping,
            parent_meta=parent_meta
        )
        _LOG.debug(f'Result: {result}')

    _LOG.debug(f'Uploading job results to storage \'{output_storage.name}\'')
    storage_service.upload_job_results(
        job_id=JOB_ID,
        results_folder_path=reports_dir,
        storage=output_storage,
        tenant=tenant
    )
    _LOG.info('Setting tenant status to "SUCCEEDED"')
    job_service.set_licensed_job_status(
        job=job,
        tenant=tenant,
        status=JobTenantStatusEnum.TENANT_SUCCEEDED_STATUS
    )


def main():
    """Run batch job ``JOB_ID``.

    Resolves every resource the job needs (job, licensed parent,
    application, algorithm, input/output storages, license). If any of them
    is missing, it raises ``ExecutorException`` for the initialization step.

    It then processes each scan tenant independently. A failing tenant is
    marked FAILED and the loop moves on. Afterwards the job is marked
    SUCCEEDED, the profile log is uploaded if present, and the work
    directory is cleaned.
    """
    _LOG.debug('Creating directories')
    work_dir, metrics_dir, reports_dir = \
        os_service.create_work_dirs(job_id=JOB_ID)

    _LOG.debug(f'Describing job with id \'{JOB_ID}\'')
    job: Job = job_service.get_by_id(object_id=JOB_ID)
    if not job:
        _fail_initialization(f'Job with id \'{JOB_ID}\' does not exist')

    _LOG.debug('Setting job status to RUNNING')
    job = job_service.set_status(
        job=job,
        status=JobStatusEnum.JOB_RUNNING_STATUS.value)

    scan_tenants = job_service.get_scan_tenants(job=job)

    licensed_parent_id = job.parent_id
    licensed_parent = parent_service.get_parent_by_id(
        parent_id=licensed_parent_id)
    _LOG.debug(f'Parent: \'{licensed_parent_id}\'')
    if not licensed_parent or licensed_parent.is_deleted:
        _fail_initialization(
            f'Parent \'{licensed_parent_id}\' does not exist')
    licensed_parent_meta = parent_service.get_parent_meta(
        parent=licensed_parent)
    license_key = licensed_parent_meta.license_key

    application_id = licensed_parent.application_id
    application = application_service.get_application_by_id(
        application_id=application_id)
    if not application or application.is_deleted:
        _fail_initialization(
            f'Application \'{application_id}\' does not exist')
    application_meta = application_service.get_application_meta(
        application=application
    )

    algorithm_name = licensed_parent_meta.algorithm
    algorithm = algorithm_service.get_by_name(name=algorithm_name)
    _LOG.debug(f'Algorithm: \'{algorithm_name}\'')
    if not algorithm:
        _fail_initialization(
            f'Application \'{application_id}\' does not have algorithm '
            f'specified')

    input_storage_name = application_meta.input_storage
    _LOG.debug(f'Input storage: \'{input_storage_name}\'')
    input_storage: Storage = storage_service.get_by_name(
        name=input_storage_name)
    if not input_storage:
        _fail_initialization(
            f'Input storage \'{input_storage_name}\' does not exist.')

    output_storage_name = application_meta.output_storage
    _LOG.debug(f'Output storage: \'{output_storage_name}\'')
    output_storage: Storage = storage_service.get_by_name(
        name=output_storage_name)
    if not output_storage:
        _fail_initialization(
            f'Output storage \'{output_storage_name}\' does not exist.')

    _LOG.info('Resolving RIGHTSIZER parent for license')
    parent = parent_service.resolve(
        licensed_parent=licensed_parent,
        scan_tenants=scan_tenants
    )
    if not parent:
        # Not fatal: processing continues with an empty ParentMeta.
        _LOG.error(f'Can\'t resolve RIGHTSIZER parent for license '
                   f'\'{licensed_parent_id}\'. Shape rules won\'t be applied.')
        parent_meta = ParentMeta()
    else:
        parent_meta = parent_service.get_parent_meta(parent=parent)

    _LOG.debug(f'Describing License \'{license_key}\'')
    license_: License = license_service.get_license(license_id=license_key)
    if not license_:
        # Every tenant needs the license (submit_licensed_job reads
        # license_.customers), so fail fast instead of failing each tenant.
        _fail_initialization(f'License \'{license_key}\' does not exist.')

    # Use the licensed parent's customer: ``parent`` may be None here, and
    # metrics are downloaded for licensed_parent.customer_id, so the tenant
    # folder path must be built from the same customer.
    customer = licensed_parent.customer_id

    for tenant in scan_tenants:
        try:
            _LOG.info(f'Processing tenant {tenant}')
            process_tenant_instances(
                metrics_dir=metrics_dir,
                reports_dir=reports_dir,
                input_storage=input_storage,
                output_storage=output_storage,
                parent_meta=parent_meta,
                licensed_parent=licensed_parent,
                algorithm=algorithm,
                license_=license_,
                customer=customer,
                tenant=tenant,
                job=job
            )
        except LicenseForbiddenException as e:
            _LOG.error(e)
            job_service.set_licensed_job_status(
                job=job,
                tenant=tenant,
                status=JobTenantStatusEnum.TENANT_FAILED_STATUS
            )
        except Exception as e:
            _LOG.error(f'Unexpected error occurred while processing '
                       f'tenant {tenant}: {e}')
            job_service.set_licensed_job_status(
                job=job,
                tenant=tenant,
                status=JobTenantStatusEnum.TENANT_FAILED_STATUS
            )

    _LOG.debug(f'Job {JOB_ID} has finished successfully')
    _LOG.debug('Setting job state to SUCCEEDED')
    job_service.set_status(job=job,
                           status=JobStatusEnum.JOB_SUCCEEDED_STATUS.value)

    if os.path.exists(PROFILE_LOG_PATH):
        _LOG.debug('Uploading profile log')
        storage_service.upload_profile_log(
            storage=output_storage,
            job_id=JOB_ID,
            file_path=PROFILE_LOG_PATH
        )

    _LOG.debug('Cleaning workdir')
    os_service.clean_workdir(work_dir=work_dir)


if __name__ == '__main__':
    try:
        main()
    except Exception as exception:
        set_job_fail_reason(exception=exception)
        raise