src/services/reports_bucket.py (291 lines of code) (raw):

import tempfile
import uuid
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import PurePosixPath
from typing import TYPE_CHECKING, Optional

from dateutil.relativedelta import relativedelta

from helpers import urljoin, Version
from helpers.constants import Cloud
from helpers.time_helper import utc_datetime, week_number
from models.batch_results import BatchResults
from models.metrics import ReportMetrics
from models.job import Job
from services import SP

if TYPE_CHECKING:
    from modular_sdk.models.tenant import Tenant

    from services.platform_service import Platform


class ReportsBucketKeysBuilder(ABC):
    """
    Paths must look like this:
    raw/EPAM Systems/AWS/31231231231/latest/0.json.gz
    raw/EPAM Systems/AWS/31231231231/latest/1.json.gz
    raw/EPAM Systems/AWS/31231231231/snapshots/2023-12-10-14/
    raw/EPAM Systems/AWS/31231231231/jobs/standard/2023-12-10-14/b00649c9-2657-4ade-bd6b-f0f5924f6a50/result/  # noqa
    raw/EPAM Systems/AWS/31231231231/jobs/event-driven/2023-12-10-14/b00649c9-2657-4ade-bd6b-f0f5924f6a50/result/  # noqa
    raw/EPAM Systems/AWS/31231231231/jobs/event-driven/2023-12-10-14/b00649c9-2657-4ade-bd6b-f0f5924f6a50/difference/  # noqa
    """

    date_delimiter = '-'

    prefix = 'raw/'
    on_demand = 'on-demand/'  # any on-flight generated reports

    snapshots = 'snapshots/'
    latest = 'latest/'
    jobs = 'jobs/'

    standard = 'standard/'
    ed = 'event-driven/'

    result = 'result/'
    difference = 'difference/'

    @staticmethod
    def urljoin(*args) -> str:
        """
        Joins the given parts with ``helpers.urljoin`` and appends a
        trailing '/' so that the result can be used as a folder prefix.
        """
        return urljoin(*args) + '/'  # delimiter

    @staticmethod
    def datetime(_from: datetime | None = None) -> str:
        """
        Builds the datetime part of a path with 1-hour precision in UTC,
        for example '2023-11-02-09/'. By default, the current datetime is
        used.
        :param _from: moment to render; it is converted to UTC first
        :return: 'YYYY-MM-DD-HH/' string with a trailing slash
        """
        _from = _from or utc_datetime()
        _from = _from.astimezone(timezone.utc)  # just in case
        return _from.strftime(
            ReportsBucketKeysBuilder.date_delimiter.join(
                ('%Y', '%m', '%d', '%H')
            )
            + '/'
        )

    @property
    @abstractmethod
    def cloud(self) -> Cloud:
        """
        Cloud segment of the keys produced by this builder
        :return:
        """

    @abstractmethod
    def job_result(self, job: 'Job') -> str:
        """
        Builds s3 key for a concrete job
        :param job:
        :return:
        """

    @abstractmethod
    def ed_job_result(self, br: 'BatchResults') -> str:
        """
        Builds s3 key for a concrete ed job
        :param br:
        :return:
        """

    @abstractmethod
    def ed_job_difference(self, br: 'BatchResults') -> str:
        """
        Builds s3 key for a concrete ed job difference
        :param br:
        :return:
        """

    @abstractmethod
    def latest_key(self) -> str:
        """
        Builds s3 key to the latest state
        :return:
        """

    @abstractmethod
    def snapshots_folder(self) -> str:
        """
        Returns a path to a folder with snapshots
        """

    # NOTE: the annotations below must not use ``datetime | None``: inside
    # this class body the name ``datetime`` is now bound to the staticmethod
    # defined above, not to the ``datetime`` class.
    def snapshot_key(self, date: datetime) -> str:
        """
        Returns a path to the snapshot for the given date. This key can be
        used directly to write files. To read data, use
        ``nearest_snapshot_key`` instead, because a snapshot for exactly
        this hour may not exist.
        :param date:
        :return:
        """
        return self.urljoin(self.snapshots_folder(), self.datetime(date))

    def nearest_snapshot_key(self, date: datetime) -> str | None:
        """
        Returns the existing snapshot key that is nearest to the given date.

        It prefers the latest snapshot at or before ``date``. If every
        snapshot is newer than ``date``, the oldest one is returned. If
        there are no snapshots, None is returned.
        """
        # todo can be cached
        prefixes = SP.s3.common_prefixes(
            bucket=SP.environment_service.default_reports_bucket_name(),
            delimiter='/',
            prefix=self.snapshots_folder(),
        )
        target = self.urljoin(self.snapshots_folder(), self.datetime(date))
        # The scan relies on prefixes arriving in ascending lexicographic
        # order, which matches chronological order for the
        # 'YYYY-MM-DD-HH/' format.
        nearest = None
        for prefix in prefixes:
            if prefix <= target:
                nearest = prefix  # still not past the target, keep going
                continue
            # This is the first prefix newer than the target. Use it only
            # when no older snapshot has been seen.
            if not nearest:
                nearest = prefix
            break
        return nearest

    @staticmethod
    def _random_filename() -> str:
        """
        Returns a new random file name on each call, e.g. 'tmp3f0c...'.

        The name is generated in memory. No temporary file is created, and
        the result never contains OS-specific path components.
        :return:
        """
        return tempfile.gettempprefix() + uuid.uuid4().hex

    @classmethod
    def one_time_on_demand(cls) -> str:
        """
        Generates a random one-time key under the on-demand prefix
        :return:
        """
        return cls.on_demand + cls._random_filename()


class TenantReportsBucketKeysBuilder(ReportsBucketKeysBuilder):
    """Builds reports-bucket keys scoped to a single tenant."""

    def __init__(self, tenant: 'Tenant'):
        self._tenant = tenant

    @property
    def cloud(self) -> Cloud:
        """
        Only AWS|AZURE|GOOGLE currently
        :return:
        """
        return Cloud[self._tenant.cloud.upper()]

    def _tenant_root(self) -> tuple:
        """Leading key parts shared by all tenant keys."""
        return (
            self.prefix,
            self._tenant.customer_name,
            self.cloud.value,
            self._tenant.project,
        )

    def job_result(self, job: 'Job') -> str:
        assert (
            job.tenant_name == self._tenant.name
        ), f'Job tenant must be {self._tenant.name}'
        return self.urljoin(
            *self._tenant_root(),
            self.jobs,
            self.standard,
            self.datetime(utc_datetime(job.submitted_at)),
            job.id,
            self.result,
        )

    def ed_job_result(self, br: 'BatchResults') -> str:
        assert (
            br.tenant_name == self._tenant.name
        ), f'Job tenant must be {self._tenant.name}'
        return self.urljoin(
            *self._tenant_root(),
            self.jobs,
            self.ed,
            self.datetime(utc_datetime(br.submitted_at)),
            br.id,
            self.result,
        )

    def ed_job_difference(self, br: 'BatchResults') -> str:
        assert (
            br.tenant_name == self._tenant.name
        ), f'Job tenant must be {self._tenant.name}'
        return self.urljoin(
            *self._tenant_root(),
            self.jobs,
            self.ed,
            self.datetime(utc_datetime(br.submitted_at)),
            br.id,
            self.difference,
        )

    def latest_key(self) -> str:
        return self.urljoin(*self._tenant_root(), self.latest)

    def snapshots_folder(self) -> str:
        return self.urljoin(*self._tenant_root(), self.snapshots)


class PlatformReportsBucketKeysBuilder(ReportsBucketKeysBuilder):
    """Builds reports-bucket keys scoped to a single platform."""

    def __init__(self, platform: 'Platform'):
        self._platform = platform

    @property
    def cloud(self) -> Cloud:
        """
        Currently, the only platform that we support is KUBERNETES
        """
        return Cloud.KUBERNETES

    def _platform_root(self) -> tuple:
        """Leading key parts shared by all platform keys."""
        return (
            self.prefix,
            self._platform.customer,
            self.cloud.value,
            self._platform.platform_id,
        )

    def job_result(self, job: 'Job') -> str:
        assert (
            job.platform_id == self._platform.id
        ), f'Job platform must be {self._platform.id}'
        # NOTE(review): unlike the tenant builder, this key has no trailing
        # 'result/' segment, although the documented layout shows one. It is
        # left unchanged so that already stored results stay addressable;
        # confirm this is intended. Also note that the assertion compares
        # against ``self._platform.id`` while the key uses
        # ``self._platform.platform_id``.
        return self.urljoin(
            *self._platform_root(),
            self.jobs,
            self.standard,
            self.datetime(utc_datetime(job.submitted_at)),
            job.id,
        )

    def ed_job_result(self, br: 'BatchResults') -> str:
        raise NotImplementedError('Event-driven is not available for platform')

    def ed_job_difference(self, br: 'BatchResults') -> str:
        raise NotImplementedError('Event-driven is not available for platform')

    def latest_key(self) -> str:
        return self.urljoin(*self._platform_root(), self.latest)

    def snapshots_folder(self) -> str:
        return self.urljoin(*self._platform_root(), self.snapshots)


class StatisticsBucketKeysBuilder:
    """Builds keys for job, report and tenant statistics files."""

    _statistics = 'job-statistics/'
    _standard = 'standard/'
    _ed = 'event-driven/'
    _statistics_file = 'statistics.json'
    _diagnostic_report_file = 'diagnostic_report.json'
    _report_statistics = 'report-statistics/'
    _tenant_statistics = 'tenant-statistics/'
    _rules = 'rules/'
    _diagnostic = 'diagnostic/'

    @staticmethod
    def _month_folder(now: datetime) -> str:
        """Renders 'YYYY-MM/' for the given moment (no timezone conversion)."""
        return now.strftime(
            ReportsBucketKeysBuilder.date_delimiter.join(('%Y', '%m')) + '/'
        )

    @classmethod
    def job_statistics(cls, job: Job | BatchResults) -> str:
        """
        Key of the statistics file for a standard job or for an
        event-driven batch result.
        """
        if isinstance(job, Job):
            return urljoin(
                cls._statistics, cls._standard, job.id, cls._statistics_file
            )
        return urljoin(cls._statistics, cls._ed, job.id, cls._statistics_file)

    @classmethod
    def report_statistics(cls, now: datetime, customer: str) -> str:
        """Key of the monthly diagnostic report for a customer."""
        return urljoin(
            cls._report_statistics,
            cls._diagnostic,
            customer,
            cls._month_folder(now),
            cls._diagnostic_report_file,
        )

    @classmethod
    def tenant_statistics(
        cls,
        now: datetime,
        tenant: Optional['Tenant'] = None,
        customer: Optional[str] = None,
    ) -> str:
        """
        Key for tenant rule statistics.

        If ``customer`` is given, returns the customer's monthly folder and
        ``tenant`` is ignored. Otherwise, if ``tenant`` is given, returns the
        weekly '<week>.json' file of that tenant. With neither, returns the
        root rules folder.
        """
        if customer:
            return urljoin(
                cls._tenant_statistics,
                cls._rules,
                customer,
                cls._month_folder(now),
            )
        elif tenant:
            return urljoin(
                cls._tenant_statistics,
                cls._rules,
                tenant.customer_name,
                cls._month_folder(now),
                tenant.cloud,
                tenant.project,
                str(week_number(now)) + '.json',
            )
        return urljoin(cls._tenant_statistics, cls._rules)

    @classmethod
    def xray_log(cls, job_id: str) -> str:
        """Key of the executor xray log for a job, dated by today (UTC)."""
        now = utc_datetime()
        # NOTE(review): year/month/day are passed as ints, so whether they
        # are zero-padded depends on helpers.urljoin (presumably not).
        # Confirm before relying on lexicographic ordering of these keys.
        return urljoin(
            'xray', 'executor', now.year, now.month, now.day, f'{job_id}.log'
        )


class ReportMetricsBucketKeysBuilder:
    """Builds keys for collected report metrics."""

    __slots__ = ()

    date_delimiter = '-'
    prefix = 'metrics/'
    data = 'data.json.gz'

    @staticmethod
    def datetime(end: datetime) -> str:
        """
        Builds the datetime part of a path with microsecond precision in UTC:
        'YYYY-MM-DD-HH-MM-SS-ffffff' (no trailing slash).
        :return:
        """
        end = end.astimezone(timezone.utc)  # just in case
        return end.strftime(
            ReportMetricsBucketKeysBuilder.date_delimiter.join(
                ('%Y', '%m', '%d', '%H', '%M', '%S', '%f')
            )
        )

    @classmethod
    def metrics_key(cls, item: ReportMetrics) -> str:
        """
        Key of the metrics data file. The optional project, cloud, tenant and
        region parts are included, in that order, only when they are set.
        """
        items = []
        if pr := item.project:
            items.append(pr)
        if cl := item.cloud:
            items.append(cl.value)
        if t := item.tenant:
            items.append(t)
        if r := item.region:
            items.append(r)
        return urljoin(
            cls.prefix,
            item.customer,
            item.type.value,
            *items,
            cls.datetime(utc_datetime(item.end)),
            cls.data,
        )


class ReportMetaBucketsKeys:
    """Builds keys for report metadata stored per license and version."""

    __slots__ = ()

    prefix = 'meta/'
    data = 'data.gz'

    @classmethod
    def meta_key(cls, license_key: str, version: Version) -> str:
        """Key 'meta/<license_key>/<version>/data.gz'."""
        return urljoin(cls.prefix, license_key, version.to_str(), cls.data)