osci/transformers/rankers/employees_ranking.py (64 lines of code) (raw):
"""Copyright since 2019, EPAM Systems
This file is part of OSCI.
OSCI is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OSCI is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OSCI. If not, see <http://www.gnu.org/licenses/>."""
from typing import List, NamedTuple
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as f
class CommitsThresholds(NamedTuple):
    """Commits threshold paired with the name of the result column it fills

    col: name of the column in the result df
    threshold: minimal amount of commits (compared with `>=`) to be counted in `col`
    """
    col: str
    threshold: int
# Thresholds used when a caller does not pass its own: (result column name, minimal commits amount)
DEFAULT_THRESHOLDS = (
    CommitsThresholds('Commits >= 1', 1),
    CommitsThresholds('Commits >= 10', 10),
)
def get_companies_employees_activity(df: DataFrame, commits_id_field: str, author_email_field: str,
                                     company_field: str, result_field: str = 'Commits') -> DataFrame:
    """Count commits of every author within every company

    :param df: PushEventsCommits
    :param commits_id_field: Commit identifier field (ex. 'sha')
    :param author_email_field: Commit author email field
    :param company_field: Company name field
    :param result_field: Name of the output column holding the amount of commits
    :return: df with one row per (author email, company) pair and the commits amount in `result_field`
    """
    grouping_fields = [author_email_field, company_field]
    commits_amount = f.count(f.col(commits_id_field)).alias(result_field)

    # Keep only the columns that take part in the aggregation
    commits = df.select(commits_id_field, *grouping_fields)
    return commits.groupBy(*grouping_fields).agg(commits_amount)
def get_companies_employees_activity_rank_combined(df: DataFrame, commits_id_field: str,
                                                   author_email_field: str, company_field: str,
                                                   commits_thresholds: List[CommitsThresholds] = DEFAULT_THRESHOLDS,
                                                   order_by_field: str = DEFAULT_THRESHOLDS[1].col) -> DataFrame:
    """Rank companies by the amount of employees that reached each commits threshold

    For every threshold a column named `threshold.col` is produced; it holds the amount of
    company employees whose commits amount is `>= threshold.threshold`. The per-threshold
    frames are left-joined on the company in the order the thresholds were passed, so the
    result contains the companies of the first threshold and later columns are null for
    companies without matching employees.

    :param df: PushEventsCommits
    :param commits_id_field: Commit identifier field (ex. 'sha')
    :param author_email_field: Commit author email field
    :param company_field: Company name field
    :param commits_thresholds: Commits thresholds (ex.: [CommitsThresholds(col='Commits >= 10', threshold=10)])
    :param order_by_field: Result column to sort by (descending)
    :return: df with the company column and one column per threshold
    :raises ValueError: if `commits_thresholds` is empty
    """
    if len(commits_thresholds) == 0:
        raise ValueError(f'Param commits_thresholds must be non empty. Passed: {commits_thresholds}')

    commits_count_field = 'Commits'
    # Cached because the same per-employee frame is aggregated once per threshold
    employees_activity = get_companies_employees_activity(
        df=df,
        commits_id_field=commits_id_field,
        author_email_field=author_email_field,
        company_field=company_field,
        result_field=commits_count_field,
    ).cache()

    def employees_over(commits_threshold: CommitsThresholds) -> DataFrame:
        """Amount of employees per company that reached the given threshold"""
        reached = employees_activity.filter(f.col(commits_count_field) >= commits_threshold.threshold)
        employees_amount = f.count(f.col(author_email_field)).alias(commits_threshold.col)
        return reached \
            .select(company_field, author_email_field) \
            .groupBy(f.col(company_field)) \
            .agg(employees_amount)

    per_threshold = list(map(employees_over, commits_thresholds))
    combined = reduce(
        lambda left, right: left.join(right, on=company_field, how='left'),
        per_threshold,
    )
    return combined.sort(order_by_field, ascending=False)
def get_companies_rank_by_employees_amount(df: DataFrame,
                                           company_field: str,
                                           author_email_field: str,
                                           result_employee_field: str = 'Employees') -> DataFrame:
    """Rank companies by the amount of distinct commit authors

    :param df: PushEventsCommits
    :param company_field: Company name field
    :param author_email_field: Commit author email field
    :param result_employee_field: Name of the output column holding the amount of employees
    :return: df with one row per company, sorted by `result_employee_field` descending
    """
    # An employee is identified by the author email, so distinct emails are counted
    employees_amount = f.countDistinct(f.col(author_email_field)).alias(result_employee_field)

    per_company = df.select(company_field, author_email_field).groupBy(company_field).agg(employees_amount)
    return per_company.sort(result_employee_field, ascending=False)
def get_amount_employees_monthly(df: DataFrame,
                                 author_email_field: str,
                                 datetime_field: str,
                                 result_employee_field: str = 'Employees',
                                 result_month_field: str = 'Month') -> DataFrame:
    """Get amount of employees (that have any activity) monthly for company

    :param df: PushEventsCommits
    :param author_email_field: Commit author email field
    :param datetime_field: Event created at datetime field
    :param result_employee_field: Field in output df which must contain the amount of employees
    :param result_month_field: Field in output df which must contain the month (formatted as 'yyyy-MM')
    :return: df with one row per month, sorted by month ascending
    """
    return df \
        .select(author_email_field, datetime_field) \
        .withColumn(result_month_field, f.date_format(datetime_field, "yyyy-MM")) \
        .select(author_email_field, result_month_field) \
        .groupBy(result_month_field) \
        .agg(
            # Distinct emails: an employee with several commits in a month is still one employee
            f.countDistinct(f.col(author_email_field)).alias(result_employee_field)
        ) \
        .sort(result_month_field)