osci/datalake/base.py (218 lines of code) (raw):

"""Copyright since 2020, EPAM Systems This file is part of OSCI. OSCI is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. OSCI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OSCI. If not, see <http://www.gnu.org/licenses/>.""" from typing import Union, List, Iterable, Tuple, Iterator from datetime import datetime from pathlib import Path from io import BytesIO import abc import re import json import pandas as pd from .schemas import LandingSchemas, StagingSchemas, PublicSchemas from osci.utils import normalize_company class DatePeriodType: DTD = 'DTD' MTD = 'MTD' YTD = 'YTD' MBM = 'MBM' all = frozenset({DTD, MTD, YTD}) class BaseDataLakeArea(metaclass=abc.ABCMeta): """Base class for all data lake areas that should implement given interface.""" def add_fs_prefix(self, path: Union[str, Path]) -> str: raise NotImplementedError() def get_spark_paths(self, paths: Iterable[Union[str, Path]]) -> List[str]: return [self.add_fs_prefix(path) for path in paths] @property def _github_events_commits_base(self): return 'github/events/push' @staticmethod def get_excel_writer() -> Tuple[pd.ExcelWriter, BytesIO]: buffer = BytesIO() return pd.ExcelWriter(buffer, engine='xlsxwriter'), buffer def write_bytes_to_file(self, path: Union[str, Path], buffer: BytesIO): raise NotImplementedError() class BaseLandingArea(BaseDataLakeArea, abc.ABC): schemas = LandingSchemas @property def _github_repositories_base(self): return 'github/repository' def save_push_events_commits(self, push_event_commits, date: datetime): raise NotImplementedError() def get_hour_push_events_commits(self, date: datetime): raise NotImplementedError() def get_daily_push_events_commits(self, date: datetime): raise NotImplementedError() def save_repositories(self, df: pd.DataFrame, date: datetime): raise NotImplementedError() def get_repositories(self, date: datetime) -> pd.DataFrame: raise NotImplementedError() class BaseStagingArea(BaseDataLakeArea, abc.ABC): CONF_AREA_DIR = 'configuration' schemas = StagingSchemas def get_push_events_commits(self, to_date: datetime, date_period_type: str = DatePeriodType.YTD, from_date: datetime = None, company=None): raise NotImplementedError() def save_push_events_commits(self, push_event_commits: pd.DataFrame, company_name: str, date: datetime): raise NotImplementedError() def save_private_push_events_commits(self, push_event_commits: pd.DataFrame, company_name: str, date: datetime): raise NotImplementedError() def get_push_events_commits_spark_paths(self, to_date: datetime, date_period_type: str = DatePeriodType.YTD, from_date: datetime = None, company: str = None) -> List[str]: raise NotImplementedError() @staticmethod def _get_file_name(path) -> str: raise NotImplementedError() push_commits_filename_pattern = r'^(?P<company>(\d|\w|_)+)-(?P<date>\d{4}-\d{2}-\d{2}).parquet$' def filter_push_commit_file(self, file_name: str, to_date: datetime, from_date: datetime = None): match = re.match(self.push_commits_filename_pattern, file_name) if match: date = datetime.strptime(match.group('date'), '%Y-%m-%d') return date <= to_date and (from_date is None or date >= from_date) return None def get_push_events_commits_paths(self, to_date: datetime, date_period_type: str = DatePeriodType.YTD, from_date: datetime = None, company=None) -> Iterable[str]: year, month, day = None, None, None if date_period_type == DatePeriodType.DTD: if from_date is None: raise ValueError(f'`from_date` must be non None for {DatePeriodType.DTD} passed: {from_date}') if to_date < from_date: raise ValueError(f'`from_date` {from_date} must be less than `to_date` {to_date}') if to_date.year == from_date.year: year = to_date.year if to_date.month == from_date.month: month = to_date.month if to_date.day == from_date.day: day = to_date.day elif date_period_type == DatePeriodType.MTD: year, month = to_date.year, to_date.month elif date_period_type == DatePeriodType.YTD: year = to_date.year else: raise ValueError(f'Unsupported date period type: {date_period_type} not in {DatePeriodType.all}') return filter( lambda path: self.filter_push_commit_file(self._get_file_name(path), from_date=from_date, to_date=to_date), self._get_date_partitioned_paths(dir_path=self._github_events_commits_base, year=year, month=month, day=day, company=company) ) def _get_date_partitioned_paths(self, dir_path: str, year: Union[str, int] = None, month: Union[str, int] = None, day: Union[str, int] = None, company: str = None) -> Iterable[str]: raise NotImplementedError() def _get_configuration_file_path(self, path: str) -> str: raise NotImplementedError() def load_projects_filter(self): raise NotImplementedError() _repositories_file_format = 'parquet' def _get_repositories_file_name(self, date: datetime) -> str: return f'repository-{date:%Y-%m-%d}.{self._repositories_file_format}' def get_repositories_path(self, date: datetime) -> Union[Path, str]: raise NotImplementedError() def get_repositories_spark_path(self, date: datetime) -> Union[Path, str]: return self.add_fs_prefix(path=self.get_repositories_path(date=date)) def get_repositories(self, date: datetime) -> pd.DataFrame: raise NotImplementedError() def save_repositories(self, df: pd.DataFrame, date: datetime): raise NotImplementedError() @staticmethod def _get_raw_push_events_commits_file_name(date: datetime, company: str) -> str: return f'{normalize_company(company)}-{date:%Y-%m-%d}.parquet' def get_raw_push_events_commits_path(self, date: datetime, company: str) -> Union[Path, str]: raise NotImplementedError() def get_raw_push_events_commits(self, path: Union[str, Path]) -> pd.DataFrame: raise NotImplementedError() def save_raw_push_events_commits(self, push_event_commits: pd.DataFrame, company_name: str, date: datetime): raise NotImplementedError() def get_daily_raw_push_events_commits_paths(self, date: datetime) -> Iterator[Union[str, Path]]: raise NotImplementedError() def get_daily_raw_push_events_commits(self, date: datetime) -> Iterator[Tuple[str, pd.DataFrame]]: for path in self.get_daily_raw_push_events_commits_paths(date=date): match = re.match(self.push_commits_filename_pattern, self._get_file_name(path)) if match: company = match.group('company') df = self.get_raw_push_events_commits(path) if not df.empty: yield company, df def get_union_daily_raw_push_events_commits(self, date: datetime) -> pd.DataFrame: result_df = pd.DataFrame() for path in self.get_daily_raw_push_events_commits_paths(date=date): df = self.get_raw_push_events_commits(path) if not df.empty: result_df = pd.concat([result_df, df]) return result_df class BasePublicArea(BaseDataLakeArea, abc.ABC): schemas = PublicSchemas def get_report_path(self, report_name: str, date: datetime, company: str = None) -> str: raise NotImplementedError() def save_report(self, report_df: pd.DataFrame, report_name: str, date: datetime, company: str = None): raise NotImplementedError() def get_report(self, report_name: str, date: datetime, company: str = None) -> pd.DataFrame: raise NotImplementedError() def get_report_url(self, report_name: str, date: datetime, company: str = None): raise NotImplementedError() def get_reports_for_last_days_of_month(self, report_name: str, date: datetime, company: str = None): raise NotImplementedError() def get_osci_change_excel_report_path(self, base_report_name: str, report_dir_name: str, date: datetime): raise NotImplementedError() def get_osci_change_excel_report_url(self, base_report_name: str, date: datetime, report_dir_name: str): raise NotImplementedError @staticmethod def get_osci_change_excel_report_name(base_report_name: str, date: datetime): return f'{date:%Y-%m-%d}_{base_report_name}.xlsx' def save_solutions_hub_osci_change_report_view(self, change_report: pd.DataFrame, report_dir_name: str, old_date: datetime, new_date: datetime): writer, buffer = self.get_excel_writer() sheet_name = 'OSCI_Ranking' change_report.to_excel(writer, sheet_name=sheet_name, startrow=2, startcol=1) worksheet = writer.sheets[sheet_name] worksheet.write(0, 1, old_date.strftime('%Y (differences from %B, %d ') + new_date.strftime('to %B, %d)')) worksheet.write(3, 9, '1 Active Contributors are those who authored 10 or more pushes in the time period') worksheet.write(4, 9, '2 Total Community counts those who authored 1 or more pushes in the time period') worksheet.write(5, 9, '* Changes are relative to the metrics at the end of the previous month') worksheet.write(6, 9, 'The top 100 is calculated using the Active Contributors metric') worksheet.write(7, 9, 'If two companies have equal Active Contributors, ' 'their relative positions are determined by Total Community') writer.save() self.write_bytes_to_file(path=self.get_osci_change_excel_report_path(base_report_name=sheet_name, report_dir_name=report_dir_name, date=new_date), buffer=buffer) def save_email(self, email_body: str, date: datetime): raise NotImplementedError() def get_companies_contributors_repository_commits_path(self, date: datetime) -> Union[str, Path]: raise NotImplementedError() def get_companies_contributors_repository_commits_spark_path(self, date) -> str: return self.add_fs_prefix(path=self.get_companies_contributors_repository_commits_path(date=date)) def save_companies_contributors_repository_commits(self, df: pd.DataFrame, date: datetime): raise NotImplementedError() def get_companies_contributors_repository_commits(self, date: datetime) -> pd.DataFrame: raise NotImplementedError() class BaseWebArea(abc.ABC): _osci_ranking_dir_name = 'osci-ranking' _osci_ranking_monthly_dir_name = 'monthly' def _join_paths(self, *paths: Union[str, Path], create_if_not_exists=False) -> Union[str, Path]: raise NotImplementedError() def _save_json(self, path: Union[str, Path], json_data: str): raise NotImplementedError() @property def osci_ranking_dir(self) -> Union[str, Path]: raise NotImplementedError() def generate_monthly_osci_ranking_dir_path(self, date: datetime) -> Union[str, Path]: return self._join_paths( self.osci_ranking_dir, self._osci_ranking_monthly_dir_name, f'{date:%Y}', create_if_not_exists=True ) def get_osci_ranking_monthly_path(self, date: datetime) -> Union[str, Path]: return self._join_paths( self.generate_monthly_osci_ranking_dir_path(date=date), f'{date:%m}.json' ) def save(self, path: Union[str, Path], data): self._save_json( path=path, json_data=json.dumps(data, default=lambda v: f'{v:%Y-%m-%d}' if isinstance(v, datetime) else str(v)) ) def save_monthly_osci_ranking(self, ranking: dict, date: datetime): self.save( path=self.get_osci_ranking_monthly_path(date=date), data=ranking )