osci/datalake/blob/staging.py (89 lines of code) (raw):
"""Copyright since 2020, EPAM Systems
This file is part of OSCI.
OSCI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OSCI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OSCI. If not, see <http://www.gnu.org/licenses/>."""
from pathlib import Path
import yaml
from .base import BlobArea
from osci.datalake.base import BaseStagingArea, DatePeriodType
from osci.utils import get_pandas_data_frame_info, normalize_company
from datetime import datetime
from typing import Union, Iterable, List, Iterator
import logging
import pandas as pd
log = logging.getLogger(__name__)
class BlobStagingArea(BaseStagingArea, BlobArea):
    """Staging data-lake area stored in the ``staging`` blob container.

    Builds blob paths for push-events commits (regular, private and raw), repositories and
    configuration files, and reads/writes them as parquet/yaml.

    Storage helpers used here are not defined in this class and presumably come from
    ``BaseStagingArea`` / ``BlobArea``. These include ``container_client``,
    ``read_pandas_dataframe_from_parquet``, ``write_pandas_dataframe_to_parquet``,
    ``read_yaml_file``, ``get_spark_paths`` and ``get_push_events_commits_paths``. The same
    applies to the ``_github_*_base`` prefixes and ``CONF_AREA_DIR``.
    """

    # Name of the blob container for this area (presumably consumed by BlobArea).
    AREA_CONTAINER = 'staging'

    def get_push_events_commits_file_path(self, date: datetime, company: str):
        """Return the blob path of ``company``'s push-events-commits parquet file for ``date``."""
        return f'{self.get_push_events_commits_parent_dir(date)}/{self.get_push_events_commits_filename(date, company)}'

    @staticmethod
    def _get_file_name(path: str) -> str:
        """Return the last ``/``-separated component of ``path``."""
        return path.rsplit('/', 1)[-1]

    @staticmethod
    def get_private_push_events_commits_file_path(date: datetime, company: str):
        """Return the path of ``company``'s private push-events-commits file for ``date``.

        Layout: ``<COMPANY>/github/events/push/<YYYY>/<MM>/<YYYY-MM-DD>.parquet``, where
        ``<COMPANY>`` is the normalized company name in upper case.
        """
        # Only the date goes through strftime-style formatting, so a '%' in the
        # company name is kept literally instead of being parsed as a directive.
        company_dir = normalize_company(name=company).upper()
        return f'{company_dir}/github/events/push/{date:%Y/%m/%Y-%m-%d}.parquet'

    def save_private_push_events_commits(self, push_event_commits: pd.DataFrame, company_name: str, date: datetime):
        """Write ``company_name``'s private push-events commits for ``date`` to parquet, without the index."""
        file_path = self.get_private_push_events_commits_file_path(date=date, company=company_name)
        log.info(f'Save private push events commits for {date} into file {file_path}\n'
                 f'DF INFO: {get_pandas_data_frame_info(push_event_commits)}')
        self.write_pandas_dataframe_to_parquet(push_event_commits, file_path, index=False)

    def save_push_events_commits(self, push_event_commits: pd.DataFrame, company_name: str, date: datetime):
        """Write ``company_name``'s push-events commits for ``date`` to parquet, without the index."""
        file_path = self.get_push_events_commits_file_path(date=date, company=company_name)
        log.info(f'Save push events commits for {date} into file {file_path}\n'
                 f'DF INFO: {get_pandas_data_frame_info(push_event_commits)}')
        self.write_pandas_dataframe_to_parquet(push_event_commits, file_path, index=False)

    def get_push_events_commits_spark_paths(self, to_date: datetime, date_period_type: str = DatePeriodType.YTD,
                                            from_date: datetime = None, company=None) -> List[str]:
        """Return the push-events-commits paths for the requested period, converted by ``get_spark_paths``."""
        return self.get_spark_paths(self.get_push_events_commits_paths(to_date=to_date,
                                                                       date_period_type=date_period_type,
                                                                       from_date=from_date,
                                                                       company=company))

    def get_push_events_commits(self, to_date: datetime, date_period_type: str = DatePeriodType.YTD,
                                from_date: datetime = None, company=None) -> pd.DataFrame:
        """Read and concatenate every push-events-commits file in the requested period.

        Returns an empty DataFrame when no file matches or none of the matched files could be read.
        """
        paths = self.get_push_events_commits_paths(to_date=to_date, date_period_type=date_period_type,
                                                   from_date=from_date, company=company)
        # ``paths`` may be a lazy iterable (always truthy even when empty), and the reader
        # may return None (get_raw_push_events_commits guards for it). So decide on the
        # frames actually loaded: pd.concat raises on an empty list or an all-None list.
        frames = [self.read_pandas_dataframe_from_parquet(path) for path in (paths or ())]
        frames = [frame for frame in frames if frame is not None]
        return pd.concat(frames) if frames else pd.DataFrame()

    def get_push_events_commits_parent_dir(self, date: datetime) -> str:
        """Return the daily directory ``<events commits base>/<YYYY>/<MM>/<DD>`` for ``date``."""
        # Format only the date, so a '%' in the base path stays literal.
        return f'{self._github_events_commits_base}/{date:%Y/%m/%d}'

    @staticmethod
    def get_push_events_commits_filename(date: datetime, company: str) -> str:
        """Return ``<normalized company>-<YYYY-MM-DD>.parquet`` for ``company`` and ``date``."""
        # Format only the date, so a '%' in the company name stays literal.
        return f'{normalize_company(name=company)}-{date:%Y-%m-%d}.parquet'

    def _get_date_partitioned_paths(self, dir_path: str, year: Union[str, int] = None,
                                    month: Union[str, int] = None,
                                    day: Union[str, int] = None,
                                    company: str = None) -> Iterable[str]:
        """Lazily yield the names of blobs whose name starts with a date/company prefix.

        The prefix is ``dir_path/[year/][MM/][DD/][<normalized company>-]``. ``month`` and
        ``day`` are zero-padded to two digits. Each optional part is appended only when it
        is not None and does not depend on the others.
        """
        dir_path += '/'
        if year is not None:
            dir_path += f'{year}/'
        if month is not None:
            dir_path += f'{str(month).zfill(2)}/'
        if day is not None:
            dir_path += f'{str(day).zfill(2)}/'
        if company is not None:
            # Matches the '<company>-<date>.parquet' naming of get_push_events_commits_filename.
            dir_path += f'{normalize_company(name=company)}-'
        return (blob['name'] for blob in self.container_client.list_blobs(name_starts_with=dir_path))

    def _get_configuration_file_path(self, file: str) -> str:
        """Return the path of configuration ``file`` inside ``CONF_AREA_DIR``."""
        return f'{self.CONF_AREA_DIR}/{file}'

    def load_projects_filter(self):
        """Read the ``projects_filter.yaml`` configuration file via ``read_yaml_file``."""
        return self.read_yaml_file(path=self._get_configuration_file_path(file='projects_filter.yaml'))

    def get_repositories_path(self, date: datetime) -> str:
        """Return ``<repositories base>/<YYYY>/<MM>/<repositories file name for date>``."""
        return f'{self._github_repositories_base}/{date:%Y/%m}/{self._get_repositories_file_name(date)}'

    def get_repositories(self, date: datetime) -> pd.DataFrame:
        """Read the repositories parquet file for ``date``."""
        return self.read_pandas_dataframe_from_parquet(path=self.get_repositories_path(date=date))

    def save_repositories(self, df: pd.DataFrame, date: datetime):
        """Write ``df`` as the repositories parquet file for ``date``."""
        self.write_pandas_dataframe_to_parquet(df=df, path=self.get_repositories_path(date=date))

    def get_raw_push_events_commits_parent_dir_path(self, date: datetime):
        """Return the daily directory ``<raw events commits base>/<YYYY>/<MM>/<DD>`` for ``date``."""
        return f'{self._github_raw_events_commits_base}/{date:%Y/%m/%d}'

    def get_raw_push_events_commits_path(self, date: datetime, company: str) -> str:
        """Return the blob path of ``company``'s raw push-events-commits file for ``date``."""
        return (f'{self.get_raw_push_events_commits_parent_dir_path(date)}/'
                f'{self._get_raw_push_events_commits_file_name(date, company)}')

    def get_raw_push_events_commits(self, path: str) -> pd.DataFrame:
        """Read the raw push-events-commits parquet at ``path``; return an empty DataFrame if the reader gives None."""
        df = self.read_pandas_dataframe_from_parquet(path)
        return df if df is not None else pd.DataFrame()

    def save_raw_push_events_commits(self, push_event_commits: pd.DataFrame, company_name: str, date: datetime):
        """Write ``company_name``'s raw push-events commits for ``date`` to parquet."""
        # NOTE(review): unlike save_push_events_commits, index=False is not passed here, so the
        # writer's default applies — confirm that is intended.
        self.write_pandas_dataframe_to_parquet(df=push_event_commits,
                                               path=self.get_raw_push_events_commits_path(date=date,
                                                                                          company=company_name))

    def get_daily_raw_push_events_commits_paths(self, date: datetime) -> Iterator[str]:
        """Lazily yield the names of blobs stored in the raw push-events-commits directory for ``date``."""
        # The trailing '/' limits the listing to that day's directory. A bare '.../DD' prefix
        # would also match sibling blobs such as '.../DD.parquet' or '.../DD_old/...'.
        prefix = f'{self.get_raw_push_events_commits_parent_dir_path(date)}/'
        return (
            blob['name']
            for blob in self.container_client.list_blobs(name_starts_with=prefix)
        )