osci/jobs/base.py (57 lines of code) (raw):
"""Copyright since 2020, EPAM Systems
This file is part of OSCI.
OSCI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OSCI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OSCI. If not, see <http://www.gnu.org/licenses/>."""
import logging
from datetime import datetime
from pyspark.sql import DataFrame, functions as f
from typing import Type
from osci.datalake import DataLake, DatePeriodType, GeneralReportFactory, Report, CompanyReportFactory, CompanyReport
from osci.transformers.filter_duplicates import filter_out_duplicates
from .session import Session
log = logging.getLogger(__name__)
class PushCommitsRankingJob:
    """Base push commits ranking spark job

    Implements the extract -> transform -> load skeleton shared by ranking jobs.
    Subclasses are expected to set `REPORT_FACTORY` (and usually `REPORT_NAME`)
    and to implement `transform`.
    """
    REPORT_NAME = 'unnamed_report'
    # Factory class used to resolve the report class for the chosen date period;
    # stays `None` here and must be overridden by concrete jobs.
    REPORT_FACTORY: Type[GeneralReportFactory] = None
    report_cls: Type[Report]

    def __init__(self, date_period_type: str = DatePeriodType.YTD):
        """Set up data lake access and resolve the report class.

        :param date_period_type: date period type passed to the report factory and
            to the staging area when dataset paths are resolved
        :raises TypeError: if `REPORT_FACTORY` was not overridden by the subclass
        """
        # Fail fast with a readable message instead of the opaque
        # "'NoneType' object is not callable" raised by calling the unset factory.
        if self.REPORT_FACTORY is None:
            raise TypeError(f'{type(self).__name__}.REPORT_FACTORY must be set to a report factory class')
        self.data_lake = DataLake()
        self.commits_schema = self.data_lake.staging.schemas.push_commits
        self.date_period_type = date_period_type
        self.report_cls: Type[Report] = self.REPORT_FACTORY().get_cls(date_period=self.date_period_type)

    def extract(self, to_date: datetime, from_date: datetime = None) -> DataFrame:
        """Load push commits for the requested dates and pass them through `filter_df`.

        :param to_date: end date forwarded to the dataset path lookup
        :param from_date: optional start date forwarded to the dataset path lookup
        :return: filtered commits dataframe
        """
        return self.filter_df(
            commits=Session().load_dataframe(paths=self._get_dataset_paths(to_date, from_date))
        )

    def _get_dataset_paths(self, to_date: datetime, from_date: datetime = None):
        """Ask the staging area for the push events commits paths to load.

        :param to_date: end date forwarded to the staging area
        :param from_date: optional start date forwarded to the staging area
        :return: paths exactly as returned by the staging area
        """
        paths = self.data_lake.staging.get_push_events_commits_spark_paths(from_date=from_date,
                                                                           to_date=to_date,
                                                                           date_period_type=self.date_period_type)
        # Lazy %-style arguments: the message is only rendered when DEBUG is enabled.
        log.debug('Loaded paths for (%s %s) %s', from_date, to_date, paths)
        return paths

    def transform(self, df: DataFrame, *args, **kwargs) -> DataFrame:
        """Turn filtered commits into the report dataframe; must be overridden.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def load(self, df: DataFrame, date: datetime):
        """Save the dataframe through the resolved report class.

        :param df: spark dataframe to save; converted with `toPandas()` before saving
        :param date: date the report instance is created for
        """
        self.report_cls(date=date).save(df=df.toPandas())

    def run(self, to_date: datetime, from_date: datetime = None):
        """Execute the job: extract, transform, then load the report for `to_date`.

        :param to_date: end date of the extracted data; also used as the report date
        :param from_date: optional start date of the extracted data
        """
        df = self.extract(to_date, from_date)
        df = self.transform(df)
        self.load(df, to_date)

    def filter_out_duplicates_commits(self, df) -> DataFrame:
        """Apply `filter_out_duplicates` keyed by the commit sha and event creation time fields."""
        return filter_out_duplicates(df, commits_id_field=self.commits_schema.sha,
                                     datetime_field=self.commits_schema.event_created_at)

    def filter_out_na_org(self, df) -> DataFrame:
        """Keep only the rows whose organization name column is not null."""
        return df.where(f.col(self.commits_schema.org_name).isNotNull())

    def filter_df(self, commits) -> DataFrame:
        """Filter freshly loaded commits; currently this only removes duplicates."""
        return self.filter_out_duplicates_commits(commits)
class CompanyPushCommitsRankingJob(PushCommitsRankingJob):
    """Base push commits ranking spark job restricted to one company

    Dataset paths are requested for `company` and the report is created for it.
    Subclasses still have to provide `REPORT_FACTORY` and implement `transform`.
    """
    # The factory is called as a class in the parent constructor, so the
    # annotation is the class type rather than an instance.
    REPORT_FACTORY: Type[CompanyReportFactory]
    report_cls: Type[CompanyReport]

    def __init__(self, company: str, date_period_type: str = DatePeriodType.YTD):
        """Remember the company and run the base job initialisation.

        :param company: company passed to the dataset path lookup and to the report
        :param date_period_type: date period type forwarded to the base job
        """
        self.company = company
        super().__init__(date_period_type=date_period_type)

    def transform(self, df: DataFrame, *args, **kwargs) -> DataFrame:
        """Turn filtered commits into the report dataframe; must be overridden.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def _get_dataset_paths(self, to_date: datetime, from_date: datetime = None):
        """Ask the staging area for the push events commits paths of the company.

        :param to_date: end date forwarded to the staging area
        :param from_date: optional start date forwarded to the staging area
        :return: paths exactly as returned by the staging area
        """
        paths = self.data_lake.staging.get_push_events_commits_spark_paths(from_date=from_date,
                                                                           to_date=to_date,
                                                                           date_period_type=self.date_period_type,
                                                                           company=self.company)
        # Same debug trace as the base job, with the company added for context.
        log.debug('Loaded paths for %s (%s %s) %s', self.company, from_date, to_date, paths)
        return paths

    def load(self, df: DataFrame, date: datetime):
        """Save the dataframe through the resolved company report class.

        :param df: spark dataframe to save; converted with `toPandas()` before saving
        :param date: date the report instance is created for
        """
        self.report_cls(date=date, company=self.company).save(df=df.toPandas())