in osci/transformers/rankers/employees_ranking.py [0:0]
def get_companies_employees_activity_rank_combined(df: DataFrame, commits_id_field: str,
                                                   author_email_field: str, company_field: str,
                                                   commits_thresholds: List[CommitsThresholds] = DEFAULT_THRESHOLDS,
                                                   order_by_field: str = DEFAULT_THRESHOLDS[1].col) -> DataFrame:
    """Build a per-company table with one employee-count column per commits threshold

    For every entry of `commits_thresholds` the employees activity frame is filtered to rows whose
    commits amount is >= the entry's `threshold`, grouped by company, and the author emails are
    counted into a column named after the entry's `col`. The per-threshold frames are then
    left-joined on the company field, in the order the thresholds were passed.

    NOTE: the first threshold's frame is the left side of every join, so a company that is missing
    from it does not appear in the result, and a company missing from a later threshold's frame
    gets null (not 0) in that column.

    :param df: PushEventsCommits
    :param commits_id_field: Commit identifier field (ex. 'sha')
    :param author_email_field: Commit author email field
    :param company_field: Company name field
    :param commits_thresholds: Commits thresholds (ex.: [CommitsThresholds(col='Commits >= 10', threshold=10)])
    :param order_by_field: Column of the combined frame to sort by, descending
    :return: DataFrame with the company field and one count column per threshold, sorted by
             `order_by_field` in descending order
    :raises ValueError: if `commits_thresholds` is empty
    """
    if len(commits_thresholds) == 0:
        raise ValueError(f'Param commits_thresholds must be non empty. Passed: {commits_thresholds}')

    commits_count_field = 'Commits'

    # Cached because the same frame is the input of one aggregation per threshold.
    # NOTE(review): nothing here unpersists it; the returned frame is lazy and still depends on it.
    activity = get_companies_employees_activity(
        df=df,
        commits_id_field=commits_id_field,
        author_email_field=author_email_field,
        company_field=company_field,
        result_field=commits_count_field,
    ).cache()

    def count_employees_reaching(limit: CommitsThresholds) -> DataFrame:
        # Per company: amount of author emails whose commits amount reaches the threshold.
        reaching = activity.filter(f.col(commits_count_field) >= limit.threshold)
        by_company = reaching.select(company_field, author_email_field).groupBy(f.col(company_field))
        return by_company.agg(f.count(f.col(author_email_field)).alias(limit.col))

    # Build every per-threshold frame up front, then fold them together left to right.
    per_threshold = [count_employees_reaching(limit) for limit in commits_thresholds]

    combined = per_threshold[0]
    for counts in per_threshold[1:]:
        combined = combined.join(counts, on=company_field, how='left')

    return combined.sort(order_by_field, ascending=False)