osci/filter/filter_unlicensed.py (40 lines of code) (raw):
"""Copyright since 2021, EPAM Systems
This file is part of OSCI.
OSCI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OSCI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OSCI. If not, see <http://www.gnu.org/licenses/>."""
from datetime import datetime
from typing import List
from osci.datalake import DataLake
from osci.datalake.repositories import Repositories
import logging
import pandas as pd
log = logging.getLogger(__name__)
def filter_and_adjunct_push_event_commit(df: pd.DataFrame, licensed_repos_df: pd.DataFrame, filter_columns: List[str],
adjunct_columns: List[str], default_columns: List[str], right_index: str = "",
left_index: str = "") -> pd.DataFrame:
"""Adjunct dataframe and filter DataFrame without license
:param df: push event commit dataframe
:param licensed_repos_df: licensed repository dataframe
:param filter_columns: filter columns
:param adjunct_columns: Columns, that are added tp `df`
:param default_columns: Default required columns in schema
:param left_index: column name on df
:param right_index: column index on licensed_repos_df
"""
try:
return df.join(licensed_repos_df[adjunct_columns].set_index(right_index),
on=left_index).dropna(subset=filter_columns).reset_index(drop=True)
except KeyError as ex:
log.warning("`licensed_repos_df` or `df` is empty \n"
f"{licensed_repos_df.info()} , {df.info()}")
log.exception(ex)
return pd.DataFrame(columns=default_columns)
def filter_out_unlicensed(date: datetime):
"""Read row PEC, filter and save them with license, language
:param date: push events on this day
"""
log.debug(f'Filter out unlicensed push events commits for date {date:%Y-%m-%d}')
log.debug(f'Read licensed repos for date {date:%Y-%m-%d}')
licensed_repos_df = Repositories(date=date).read()
for company, df in DataLake().staging.get_daily_raw_push_events_commits(date):
log.debug(f'Filter out unlicensed push events commits for date {date:%Y-%m-%d} for {company}')
filtered_df = filter_and_adjunct_push_event_commit(df, licensed_repos_df,
[DataLake().staging.schemas.repositories.license],
[DataLake().staging.schemas.repositories.name,
DataLake().staging.schemas.repositories.language,
DataLake().staging.schemas.repositories.license
],
DataLake().staging.schemas.push_commits.required,
right_index=DataLake().staging.schemas.repositories.name,
left_index=DataLake().staging.schemas.push_commits.repo_name
)
if not filtered_df.empty:
DataLake().staging.save_push_events_commits(
push_event_commits=filtered_df,
company_name=company,
date=date
)