def load_osci_ranking_to_bq()

in osci/publishing/load_osci_ranking_to_bq.py [0:0]


def load_osci_ranking_to_bq(date: datetime.datetime, date_period: str = DatePeriodType.YTD):
    """Load the OSCI ranking report for ``date`` into its BigQuery table.

    The report class is resolved through ``OSCIRankingFactory`` for the
    requested period, and the destination table is looked up in
    ``date_period_to_table_map``. The report frame is trimmed to the
    required columns of ``PublicSchemas.company_contributors_ranking``,
    given a 1-based position column and a date column, renamed via the
    table mapping, and handed to ``DataLake().big_query.load_dataframe``.

    Args:
        date: Day the report is read for; its ``date()`` part is written
            into the table's date column.
        date_period: ``DatePeriodType.MTD`` or ``DatePeriodType.YTD``.

    Returns:
        Whatever ``load_dataframe`` returns for the load.

    Raises:
        ValueError: If ``date_period`` is neither MTD nor YTD.
    """
    supported_periods = (DatePeriodType.MTD, DatePeriodType.YTD)
    if date_period not in supported_periods:
        raise ValueError(f'Unsupported {date_period}')

    report_cls = OSCIRankingFactory().get_cls(date_period=date_period)
    ranking_report = report_cls(date=date)
    target = date_period_to_table_map[date_period]

    log.debug(f'Load {ranking_report.name} for {date:%Y-%m-%d} to {target.table_id}')

    ranking = ranking_report.read()[PublicSchemas.company_contributors_ranking.required]

    # The former index becomes the position column, shifted so it starts at 1.
    # NOTE(review): relies on the frame having an unnamed default index so
    # that reset_index() emits a column called 'index' — confirm with reader.
    position_column = target.Columns.position
    ranking = ranking.reset_index().rename(columns={'index': position_column})
    ranking[position_column] += 1

    # Translate report column names to the destination table's names.
    ranking = ranking.rename(columns=target.mapping)
    ranking[target.Columns.date] = date.date()

    big_query = DataLake().big_query
    return big_query.load_dataframe(df=ranking, table_id=target.table_id, schema=target.schema)