osci/actions/process/company_rankers.py (95 lines of code) (raw):

"""Copyright since 2021, EPAM Systems This file is part of OSCI. OSCI is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. OSCI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OSCI. If not, see <http://www.gnu.org/licenses/>.""" from datetime import datetime from osci.jobs.contributors_ranking import ContributorsRankingJob from osci.jobs.contributors_repos_ranking import ContributorsReposRankingJob from osci.jobs.month_by_month_commits_amount import MonthByMonthCommitsJob from osci.jobs.repositories_ranking import ReposRankingJob from osci.actions import Action, ActionParam from osci.actions.consts import get_default_from_day, get_default_to_day from osci.datalake import DatePeriodType class CompanyContributorsRankingAction(Action): """Count employees commits""" params = ( ActionParam(name='company', type=str, required=True, short_name='c', description='Company name'), ActionParam(name='to_day', type=datetime, required=True, short_name='td', description='till this day'), ActionParam(name='date_period', type=str, required=False, short_name='dp', description='data period: day, month or year', default=DatePeriodType.YTD, choices=DatePeriodType.all), ActionParam(name='from_day', type=datetime, required=False, short_name='fd', description=f'Optional parameter will be ignored for no `DTD` time period.'), ) @classmethod def name(cls) -> str: return 'company-contributors-ranking' def _execute(self, company: str, to_day: datetime, date_period: str, from_day: datetime): ContributorsRankingJob(date_period_type=date_period, company=company).run(to_date=to_day, from_date=from_day) class CompanyContributorsReposRankingAction(Action): """Count employees repos commits""" params = ( ActionParam(name='company', type=str, required=True, short_name='c', description='Company name', default=''), ActionParam(name='to_day', type=datetime, required=True, short_name='td', description='till this day', default=get_default_to_day()), ActionParam(name='date_period', type=str, required=False, short_name='dp', description='data period: day, month or year', default=DatePeriodType.YTD, choices=DatePeriodType.all), ActionParam(name='from_day', type=datetime, required=False, short_name='fd', description=f'Optional parameter will be ignored for no `DTD` time period.', default=get_default_from_day()), ) @classmethod def name(cls) -> str: return 'company-contributors-repos-ranking' def _execute(self, company: str, to_day: datetime, date_period: str, from_day: datetime): ContributorsReposRankingJob(date_period_type=date_period, company=company).run(to_date=to_day, from_date=from_day) class CompanyMonthByMonthCommitsAmountAction(Action): """Get month-by-month amount of commits""" params = ( ActionParam(name='company', type=str, required=True, short_name='c', description='Company name', default=''), ActionParam(name='to_day', type=datetime, required=True, short_name='td', description='till this day', default=get_default_to_day()), ) @classmethod def name(cls) -> str: return 'company-month-by-month-commits-amount' def _execute(self, company: str, to_day: datetime): MonthByMonthCommitsJob(date_period_type=DatePeriodType.YTD, company=company).run(to_date=to_day) class CompanyReposRankingAction(Action): """Get amount of repos commits""" params = ( ActionParam(name='company', type=str, required=True, short_name='c', description='Company name', default=''), ActionParam(name='to_day', type=datetime, required=True, short_name='td', description='till this day', default=get_default_to_day()), ActionParam(name='date_period', type=str, required=False, short_name='dp', description='data period: day, month or year', default=DatePeriodType.YTD, choices=DatePeriodType.all), ActionParam(name='from_day', type=datetime, required=False, short_name='fd', description=f'Optional parameter will be ignored for no `DTD` time period.', default=get_default_from_day()), ) @classmethod def name(cls) -> str: return 'company-repos-ranking' def _execute(self, company: str, to_day: datetime, date_period: str, from_day: datetime): ReposRankingJob(date_period_type=date_period, company=company).run(to_date=to_day, from_date=from_day) class DailyCompanyRankingsAction(Action): params = ( ActionParam(name='company', type=str, required=True, short_name='c', description='Company name', default=''), ActionParam(name='to_day', type=datetime, required=True, short_name='td', description='till this day', default=get_default_to_day()), ) @classmethod def name(cls) -> str: return 'daily-company-rankings' def _execute(self, company: str, to_day: datetime): for date_period in [DatePeriodType.YTD, DatePeriodType.MTD]: company_contributors_ranking_job = ContributorsRankingJob(date_period_type=date_period, company=company) company_contributors_repos_ranking_job = ContributorsReposRankingJob(date_period_type=date_period, company=company) company_repos_ranking_job = ReposRankingJob(date_period_type=date_period, company=company) commits = company_contributors_ranking_job.extract(to_date=to_day).cache() company_contributors_ranking_job.load(df=company_contributors_ranking_job.transform(commits), date=to_day) company_contributors_repos_ranking_job.load(df=company_contributors_repos_ranking_job.transform(commits), date=to_day) company_repos_ranking_job.load(df=company_repos_ranking_job.transform(commits), date=to_day) company_month_by_month_commits_amount_job = MonthByMonthCommitsJob(date_period_type=DatePeriodType.YTD, company=company) company_month_by_month_commits_amount_job.load( df=company_month_by_month_commits_amount_job.transform(commits), date=to_day )