sourcecode/scoring/runner.py

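"""Command-line entry point for the Community Notes (Birdwatch) scoring pipeline.

Parses CLI arguments, loads the notes, ratings, note status history, and user
enrollment datasets, invokes the scoring and user contribution algorithms, and
writes TSV (and optionally parquet) outputs to a local directory.
"""
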
import argparse
import logging
import os
import sys

from . import constants as c
from .enums import scorers_from_csv
from .pandas_utils import patch_pandas
from .process_data import LocalDataLoader, tsv_reader, write_parquet_local, write_tsv_local
from .run_scoring import run_scoring

import pandas as pd


logger = logging.getLogger("birdwatch.runner")
logger.setLevel(logging.INFO)


def parse_args():
  parser = argparse.ArgumentParser("Community Notes Scoring")
  parser.add_argument(
    "--check-flips",
    dest="check_flips",
    help="Validate that note statuses align with prior runs (disable for testing)",
    action="store_true",
  )
  parser.add_argument(
    "--nocheck-flips",
    help="Disable validation that note statuses align with prior runs (use for testing)",
    action="store_false",
    dest="check_flips",
  )
  parser.set_defaults(check_flips=False)
  parser.add_argument(
    "--enforce-types",
    dest="enforce_types",
    help="Raise errors when types in Pandas operations do not meet expectations.",
    action="store_true",
  )
  parser.add_argument(
    "--noenforce-types",
    dest="enforce_types",
    help="Log to stderr when types in Pandas operations do not meet expectations.",
    action="store_false",
  )
  parser.set_defaults(enforce_types=False)
  parser.add_argument(
    "-e", "--enrollment", default=c.enrollmentInputPath, help="note enrollment dataset"
  )
  parser.add_argument(
    "--epoch-millis",
    default=None,
    type=float,
    dest="epoch_millis",
    help="timestamp in milliseconds since epoch to treat as now",
  )
  parser.add_argument(
    "--headers",
    dest="headers",
    help="First row of input files should be a header",
    action="store_true",
  )
  parser.add_argument(
    "--noheaders",
    dest="headers",
    help="First row of input files should be data. There should be no headers.",
    action="store_false",
  )
  parser.set_defaults(headers=True)
  parser.add_argument("-n", "--notes", default=c.notesInputPath, help="note dataset")
  parser.add_argument(
    "--previous-scored-notes", default=None, help="previous scored notes dataset path"
  )
  parser.add_argument(
    "--previous-aux-note-info", default=None, help="previous aux note info dataset path"
  )
  parser.add_argument(
    "--previous-rating-cutoff-millis", default=None, type=int, help="previous rating cutoff millis"
  )
  parser.add_argument("-o", "--outdir", default=".", help="directory for output files")
  parser.add_argument(
    "--pseudoraters",
    dest="pseudoraters",
    help="Include calculation of pseudorater intervals",
    action="store_true",
  )
  parser.add_argument(
    "--nopseudoraters",
    dest="pseudoraters",
    help="Exclude calculation of pseudorater intervals (faster)",
    action="store_false",
  )
  parser.set_defaults(pseudoraters=True)
  parser.add_argument("-r", "--ratings", default=c.ratingsInputPath, help="rating dataset")
  parser.add_argument(
    "--scorers", default=None, type=scorers_from_csv, help="CSV list of scorers to enable."
  )
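  # Paired boolean flags (e.g. --check-flips / --nocheck-flips) share a dest, so
  # when both appear on the command line the one given last wins.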
  parser.add_argument(
    "--seed", default=None, type=int, help="set to an int to seed matrix factorization"
  )
  parser.add_argument(
    "-s", "--status", default=c.noteStatusHistoryInputPath, help="note status history dataset"
  )
  parser.add_argument(
    "--strict-columns",
    dest="strict_columns",
    help="Explicitly select columns and require that expected columns are present.",
    action="store_true",
  )
  parser.add_argument(
    "--nostrict-columns",
    help="Disable validation of expected columns and allow unexpected columns.",
    action="store_false",
    dest="strict_columns",
  )
  parser.set_defaults(strict_columns=True)
  parser.add_argument(
    "--parallel",
    help="Enable parallel run of algorithm.",
    action="store_true",
    dest="parallel",
  )
  parser.set_defaults(parallel=False)
  parser.add_argument(
    "--no-parquet",
    help="Disable writing parquet files.",
    default=False,
    action="store_true",
    dest="no_parquet",
  )
  parser.add_argument(
    "--cutoff-timestamp-millis",
    default=None,
    type=int,
    dest="cutoffTimestampMillis",
    help="filter notes and ratings created after this time.",
  )
  parser.add_argument(
    "--exclude-ratings-after-a-note-got-first-status-plus-n-hours",
    default=None,
    type=int,
    dest="excludeRatingsAfterANoteGotFirstStatusPlusNHours",
    help="Exclude ratings after a note got first status plus n hours",
  )
  parser.add_argument(
    "--days-in-past-to-apply-post-first-status-filtering",
    default=14,
    type=int,
    dest="daysInPastToApplyPostFirstStatusFiltering",
    help="Days in past to apply post first status filtering",
  )
  parser.add_argument(
    "--prescoring-delay-hours",
    default=None,
    type=int,
    dest="prescoring_delay_hours",
    help="Filter prescoring input to simulate delay in hours",
  )
  parser.add_argument(
    "--sample-ratings",
    default=0.0,
    type=float,
    dest="sample_ratings",
    help="Set to sample ratings at random.",
  )
  return parser.parse_args()


@patch_pandas
def _run_scorer(
  args=None,
  dataLoader=None,
  extraScoringArgs={},
):
  logger.info("beginning scorer execution")
  assert args is not None, "args must be available"
  if args.epoch_millis:
    c.epochMillis = args.epoch_millis
    c.useCurrentTimeInsteadOfEpochMillisForNoteStatusHistory = False

  # Load input dataframes.
  if dataLoader is None:
    dataLoader = LocalDataLoader(
      args.notes,
      args.ratings,
      args.status,
      args.enrollment,
      args.headers,
    )
  notes, ratings, statusHistory, userEnrollment = dataLoader.get_data()
  if args.previous_scored_notes is not None:
    previousScoredNotes = tsv_reader(
      args.previous_scored_notes,
      c.noteModelOutputTSVTypeMapping,
      c.noteModelOutputTSVColumns,
      header=False,
      convertNAToNone=False,
    )
    assert (
      args.previous_aux_note_info is not None
    ), "previous_aux_note_info must be available if previous_scored_notes is available"
    previousAuxiliaryNoteInfo = tsv_reader(
      args.previous_aux_note_info,
      c.auxiliaryScoredNotesTSVTypeMapping,
      c.auxiliaryScoredNotesTSVColumns,
      header=False,
      convertNAToNone=False,
    )
  else:
    previousScoredNotes = None
    previousAuxiliaryNoteInfo = None

  # Sample ratings to decrease runtime
  if args.sample_ratings:
    origSize = len(ratings)
    ratings = ratings.sample(frac=args.sample_ratings)
    logger.info(f"ratings reduced from {origSize} to {len(ratings)}")
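  # Note: DataFrame.sample is called without a random_state, so repeated runs with
  # --sample-ratings draw different subsets; per its help text, --seed only seeds
  # matrix factorization.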
  # Invoke scoring and user contribution algorithms.
  scoredNotes, helpfulnessScores, newStatus, auxNoteInfo = run_scoring(
    notes,
    ratings,
    statusHistory,
    userEnrollment,
    seed=args.seed,
    pseudoraters=args.pseudoraters,
    enabledScorers=args.scorers,
    strictColumns=args.strict_columns,
    runParallel=args.parallel,
    dataLoader=dataLoader if args.parallel else None,
    cutoffTimestampMillis=args.cutoffTimestampMillis,
    excludeRatingsAfterANoteGotFirstStatusPlusNHours=args.excludeRatingsAfterANoteGotFirstStatusPlusNHours,
    daysInPastToApplyPostFirstStatusFiltering=args.daysInPastToApplyPostFirstStatusFiltering,
    filterPrescoringInputToSimulateDelayInHours=args.prescoring_delay_hours,
    checkFlips=args.check_flips,
    previousScoredNotes=previousScoredNotes,
    previousAuxiliaryNoteInfo=previousAuxiliaryNoteInfo,
    previousRatingCutoffTimestampMillis=args.previous_rating_cutoff_millis,
    **extraScoringArgs,
  )

  # Write outputs to local disk.
  write_tsv_local(scoredNotes, os.path.join(args.outdir, "scored_notes.tsv"))
  write_tsv_local(helpfulnessScores, os.path.join(args.outdir, "helpfulness_scores.tsv"))
  write_tsv_local(newStatus, os.path.join(args.outdir, "note_status_history.tsv"))
  write_tsv_local(auxNoteInfo, os.path.join(args.outdir, "aux_note_info.tsv"))

  if not args.no_parquet:
    write_parquet_local(scoredNotes, os.path.join(args.outdir, "scored_notes.parquet"))
    write_parquet_local(helpfulnessScores, os.path.join(args.outdir, "helpfulness_scores.parquet"))
    write_parquet_local(newStatus, os.path.join(args.outdir, "note_status_history.parquet"))
    write_parquet_local(auxNoteInfo, os.path.join(args.outdir, "aux_note_info.parquet"))


def main(
  args=None,
  dataLoader=None,
  extraScoringArgs={},
):
  if args is None:
    args = parse_args()
  logger.info(f"scorer python version: {sys.version}")
  logger.info(f"scorer pandas version: {pd.__version__}")
  # patch_pandas requires that args are available (which matches the production binary) so
  # we first parse the arguments then invoke the decorated _run_scorer.
  return _run_scorer(args=args, dataLoader=dataLoader, extraScoringArgs=extraScoringArgs)


if __name__ == "__main__":
  main()
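
# Example invocation (a sketch, not the canonical command: dataset file names are
# illustrative, and the module path assumes running from the `sourcecode` directory
# so that the `scoring` package resolves):
#
#   python -m scoring.runner \
#     --notes notes-00000.tsv \
#     --ratings ratings-00000.tsv \
#     --status noteStatusHistory-00000.tsv \
#     --enrollment userEnrollment-00000.tsv \
#     --outdir /tmp/scoring-output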