# osci/datalake/blob/landing.py (39 lines of code) (raw):
"""Copyright since 2020, EPAM Systems
This file is part of OSCI.
OSCI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OSCI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OSCI. If not, see <http://www.gnu.org/licenses/>."""
from .base import BlobArea
from osci.datalake.base import BaseLandingArea
from osci.utils import get_pandas_data_frame_info
from datetime import datetime
import logging
import pandas as pd
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
class BlobLandingArea(BaseLandingArea, BlobArea):
    """Landing area that keeps push-event commits and repository data as blobs.

    Push-event commits are stored as one parquet file per hour; repository
    data is stored as one CSV file per date. The path prefixes
    (``_github_events_commits_base``, ``_github_repositories_base``) and the
    ``read_pandas_dataframe_*`` / ``write_pandas_dataframe_*`` helpers are not
    defined in this class; presumably they come from the base classes.
    """

    # Container name for this area (presumably consumed by ``BlobArea``).
    AREA_CONTAINER = 'landing'

    def save_push_events_commits(self, push_event_commits, date: datetime, index=False):
        """Write push-event commits for the hour of ``date`` to a parquet file.

        :param push_event_commits: data accepted by the ``pd.DataFrame`` constructor
        :param date: date and hour that select the target file
        :param index: forwarded to the parquet writer as ``index``
        """
        file_path = self._get_hourly_push_events_commits_path(date)
        df = pd.DataFrame(push_event_commits)
        log.info(f'Save push events commits for {date} into file {file_path}')
        log.info(f'Push events commits df info {get_pandas_data_frame_info(df)}')
        self.write_pandas_dataframe_to_parquet(df, file_path, index=index)

    def get_hour_push_events_commits(self, date: datetime):
        """Read push-event commits stored for the hour of ``date``.

        :param date: date and hour that select the source file
        :return: whatever the parquet reader returns; callers must handle
            ``None`` (presumably returned when the file does not exist)
        """
        file_path = self._get_hourly_push_events_commits_path(date)
        log.info(f'Read push events commits for {date.strftime("%Y-%m-%d %H:00")} from file {file_path}')
        df = self.read_pandas_dataframe_from_parquet(path=file_path)
        if df is not None:
            log.debug(f'Push events commits {date.strftime("%Y-%m-%d %H:00")} df info {get_pandas_data_frame_info(df)}')
        return df

    def get_daily_push_events_commits(self, date: datetime):
        """Read and combine push-event commits for all 24 hours of ``date``.

        Hours for which no data is returned are skipped.

        :param date: day to read; its hour component is ignored
        :return: concatenation of the available hourly frames (original
            indexes are kept), or an empty ``pd.DataFrame`` if there are none
        """
        log.info(f'Read push events commits for {date.strftime("%Y-%m-%d")}')
        hourly_frames = []
        for hour in range(24):
            hour_df = self.get_hour_push_events_commits(date=date.replace(hour=hour))
            if hour_df is not None:
                hourly_frames.append(hour_df)
        # ``pd.concat`` raises on an empty list, so keep the empty-frame result explicit.
        if not hourly_frames:
            return pd.DataFrame()
        # A single concat avoids re-copying the accumulated rows on every hour.
        return pd.concat(hourly_frames)

    def _get_hourly_push_events_commits_path(self, date: datetime) -> str:
        """Build the parquet path for the hour of ``date``.

        The hour is intentionally not zero-padded (e.g. ``...-2020-01-02-5.parquet``).
        """
        # Only the date part goes through strftime-style formatting, so a '%'
        # in the base prefix is never interpreted as a format directive.
        return f'{self._github_events_commits_base}/{date:%Y/%m/%d/%Y-%m-%d}-{date.hour}.parquet'

    def _get_repository_file_path(self, date: datetime) -> str:
        """Build the CSV path of the repositories file for ``date``."""
        return f'{self._github_repositories_base}/{date:%Y/%m/%Y-%m-%d}.csv'

    def save_repositories(self, df: pd.DataFrame, date: datetime):
        """Write the repositories data frame for ``date`` to its CSV file."""
        self.write_pandas_dataframe_to_csv(df=df, path=self._get_repository_file_path(date=date))

    def get_repositories(self, date: datetime) -> pd.DataFrame:
        """Read the repositories data frame for ``date`` from its CSV file."""
        return self.read_pandas_dataframe_from_csv(path=self._get_repository_file_path(date=date))