osci/datalake/local/landing.py (62 lines of code) (raw):

"""Copyright since 2020, EPAM Systems

This file is part of OSCI. OSCI is free software: you can redistribute it
and/or modify it under the terms of the GNU General Public License as published
by the Free Software Foundation, either version 3 of the License, or (at your
option) any later version. OSCI is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
License for more details. You should have received a copy of the GNU General
Public License along with OSCI. If not, see <http://www.gnu.org/licenses/>."""
from .base import LocalSystemArea
from osci.datalake.base import BaseLandingArea
from osci.utils import get_pandas_data_frame_info

from datetime import datetime
from pathlib import Path

import logging

import pandas as pd

log = logging.getLogger(__name__)


class LocalLandingArea(BaseLandingArea, LocalSystemArea):
    """Landing (raw) data area backed by the local file system.

    Push-event commits are stored as one parquet file per hour under
    ``<events base>/YYYY/MM/DD/``, and repository listings as one CSV per day
    under ``<BASE_PATH>/<BASE_AREA_DIR>/<repositories base>/YYYY/MM/``.
    """

    # Sub-directory of BASE_PATH that holds this area's files.
    BASE_AREA_DIR = 'landing'

    @staticmethod
    def get_push_events_commits_filename(date: datetime, file_format='parquet') -> str:
        """Return the hourly file name for ``date``, e.g. ``2020-01-31-5.parquet``.

        NOTE(review): the hour is intentionally NOT zero-padded — files already
        landed on disk use this scheme, so changing it would orphan them.

        :param date: datetime whose date and hour select the file
        :param file_format: file extension (defaults to ``parquet``)
        """
        return date.strftime(f"%Y-%m-%d-{date.hour}.{file_format}")

    def save_push_events_commits(self, push_event_commits, date: datetime):
        """Persist one hour of push-event commits as a parquet file.

        :param push_event_commits: records accepted by the ``pd.DataFrame``
            constructor (e.g. a list of dicts)
        :param date: datetime whose date and hour select the target file
        """
        file_path = self._get_hourly_push_events_commits_path(date)
        df = pd.DataFrame(push_event_commits)
        log.info(f'Save push events commits for {date} into file {file_path}')
        log.info(f'Push events commits df info {get_pandas_data_frame_info(df)}')
        df.to_parquet(str(file_path), engine='pyarrow', index=False)

    def get_hour_push_events_commits(self, date: datetime):
        """Read the hourly push-event commits file for ``date``.

        :param date: datetime whose date and hour select the file
        :return: ``pd.DataFrame`` with the hour's commits, or ``None`` when
            no file exists for that hour.
        """
        file_path = self._get_hourly_push_events_commits_path(date)
        log.info(f'Read push events commits for {date.strftime("%Y-%m-%d %H:00")} from file {file_path}')
        if file_path.is_file():
            df = pd.read_parquet(path=file_path, engine='pyarrow')
            log.info(f'Push events commits {date.strftime("%Y-%m-%d %H:00")} df info {get_pandas_data_frame_info(df)}')
            return df
        # Fixed message grammar (was "Not such ..."), and rejoined the string
        # that had been broken across lines.
        log.warning(f'No such push events commits file for {date} in path {file_path}')
        return None

    def get_daily_push_events_commits(self, date: datetime) -> pd.DataFrame:
        """Concatenate all hourly push-event commit files of ``date``'s day.

        Missing hours are skipped; an empty DataFrame is returned when the
        day has no hourly files at all.

        :param date: any datetime within the wanted day (its hour is ignored)
        """
        log.info(f'Read push events commits for {date.strftime("%Y-%m-%d")}')
        # Collect the hourly frames first and concatenate once, instead of
        # re-concatenating a growing frame inside the loop (quadratic copying).
        hourly_frames = []
        for hour in range(24):
            hour_df = self.get_hour_push_events_commits(date=date.replace(hour=hour))
            if hour_df is not None:
                hourly_frames.append(hour_df)
        return pd.concat(hourly_frames) if hourly_frames else pd.DataFrame()

    def get_push_events_commits_parent_dir(self, date: datetime, create_if_not_exists=False) -> Path:
        """Return the ``<events base>/YYYY/MM/DD`` directory for ``date``.

        :param date: datetime whose date selects the directory
        :param create_if_not_exists: when True, create the directory tree
        """
        path = self._github_events_commits_base / date.strftime("%Y") / date.strftime("%m") / date.strftime("%d")
        if create_if_not_exists:
            path.mkdir(parents=True, exist_ok=True)
        return path

    def _get_hourly_push_events_commits_path(self, date: datetime) -> Path:
        """Full path of the hourly parquet file for ``date``.

        Side effect: the parent directory tree is created if missing.
        """
        return self.get_push_events_commits_parent_dir(date=date, create_if_not_exists=True) / \
            self.get_push_events_commits_filename(date)

    def _get_repository_file_path(self, date: datetime) -> Path:
        """Full path of the daily repositories CSV for ``date``.

        Side effect: the parent directory tree is created if missing.
        """
        path = self.BASE_PATH / self.BASE_AREA_DIR / self._github_repositories_base \
            / date.strftime("%Y") / date.strftime("%m")
        path.mkdir(parents=True, exist_ok=True)
        return path / f'{date:%Y-%m-%d}.csv'

    def save_repositories(self, df: pd.DataFrame, date: datetime):
        """Write the repositories listing for ``date`` as a CSV (no index)."""
        df.to_csv(self._get_repository_file_path(date=date), index=False)

    def get_repositories(self, date: datetime) -> pd.DataFrame:
        """Read the repositories listing for ``date``.

        :return: ``pd.DataFrame`` with the listing; empty DataFrame when the
            file does not exist.
        """
        file_path = self._get_repository_file_path(date=date)
        log.debug(f'Read repositories names for {date:%Y-%m-%d} from file {file_path}')
        if file_path.is_file():
            df = pd.read_csv(file_path)
            log.debug(f'Repositories names {date:%Y-%m-%d} df info {get_pandas_data_frame_info(df)}')
            return df
        # Fixed message grammar (was "Not such ...").
        log.warning(f'No such repositories names file for {date:%Y-%m-%d} in path {file_path}')
        return pd.DataFrame()