docker/services/os_service.py (87 lines of code) (raw):

import json import os import pathlib import shutil from glob import glob from commons.constants import JOB_STEP_VALIDATE_METRICS, CSV_EXTENSION from commons.exception import ExecutorException from commons.log_helper import get_logger from models.algorithm import Algorithm _LOG = get_logger('r8s-os-service') class OSService: def __init__(self): self.r8s_workdir = str(pathlib.Path(__file__).parent.parent.absolute()) def create_work_dirs(self, job_id): _LOG.debug(f'Creating job directory') work_dir = self.create_workdir(job_id=job_id) _LOG.debug(f'Creating metrics directory') metrics_dir = self.create_job_dir(work_dir=work_dir, dir_name='metrics') _LOG.debug(f'Creating reports directory') reports_dir = self.create_job_dir(work_dir=work_dir, dir_name='reports') _LOG.debug(f'Job dir: \'{work_dir}\', metrics dir: \'{metrics_dir}\', ' f'reports dir: \'{reports_dir}\'') return work_dir, metrics_dir, reports_dir def create_workdir(self, job_id): temp_dir = os.path.join(self.r8s_workdir, job_id) os.chdir(str(pathlib.Path(temp_dir).parent)) pathlib.Path(temp_dir).mkdir(exist_ok=True) return temp_dir @staticmethod def create_job_dir(work_dir, dir_name): temp_dir = os.path.join(work_dir, dir_name) os.chdir(str(pathlib.Path(temp_dir).parent)) pathlib.Path(temp_dir).mkdir(exist_ok=True) return temp_dir @staticmethod def clean_workdir(work_dir): shutil.rmtree(work_dir) _LOG.debug(f'Workdir for {work_dir} successfully cleaned') @staticmethod def write_file(content, file_path): if isinstance(content, dict): content = json.dumps(content) with open(file_path, 'w') as f: f.write(content) return file_path @staticmethod def extract_metric_files(algorithm: Algorithm, metrics_folder_path): if not algorithm.required_data_attributes: _LOG.debug(f'No required data attributes specified for algorithm ' f'\'{algorithm.name}\'.') return metric_files = [y for x in os.walk(metrics_folder_path) for y in glob(os.path.join(x[0], '*.csv'))] if not metric_files: _LOG.error(f'No metric files found for ' f'algorithm \'{algorithm.name}\'') raise ExecutorException( step_name=JOB_STEP_VALIDATE_METRICS, reason=f'No metric files found' ) return metric_files @staticmethod def path_to_instance_id(file_path): file_name = file_path.split(os.sep) if len(file_name) < 1: _LOG.warning(f'Invalid file path provided: \'{file_path}\'') return file_name = file_name[-1] if not file_name.endswith(CSV_EXTENSION): _LOG.warning(f'Not a metric file: \'{file_path}\'') return instance_id = file_name.replace(CSV_EXTENSION, '') return instance_id @staticmethod def path_to_cloud(file_path): return file_path.split(os.sep)[-5] @staticmethod def path_to_tenant(file_path): return file_path.split(os.sep)[-4] def group_by_tenant(self, file_paths: list): tenant_metrics_map = {} for file_path in file_paths: tenant = self.path_to_tenant(file_path=file_path) tenant_metrics_map[tenant] = file_path return tenant_metrics_map