sourcecode/scoring/mf_base_scorer.py (923 lines of code) (raw):

import gc import logging from typing import Dict, List, Optional, Set, Tuple from . import ( constants as c, helpfulness_scores, note_ratings, process_data, tag_consensus, tag_filter, ) from .incorrect_filter import get_user_incorrect_ratio from .matrix_factorization.matrix_factorization import MatrixFactorization from .matrix_factorization.pseudo_raters import PseudoRatersRunner from .pandas_utils import get_df_fingerprint, keep_columns from .reputation_matrix_factorization.diligence_model import ( fit_low_diligence_model_final, fit_low_diligence_model_prescoring, ) from .scorer import Scorer import numpy as np import pandas as pd import torch logger = logging.getLogger("birdwatch.mf_base_scorer") logger.setLevel(logging.INFO) def coalesce_columns(df: pd.DataFrame, columnPrefix: str) -> pd.DataFrame: """Condense all columns beginning with columnPrefix into a single column. With each row there must be at most one column with a non-NaN value in the set of columns beginning with columnPrefix. If a non-NaN value is present that will become the value in the condensed column, otherwise the value will be NaN. After column values are condensed the original (prefixed) columns will be dropped. Args: df: DataFrame containing columns to condense collumnPrefix: Prefix used to detect columns to coalesce, and the name for the output column. Returns: DataFrame with all columns prefixed by columnPrefix dropped and replaced by a single column named columnPrefix Raises: AssertionError if multiple columns prefixed by columnPrefix have non-NaN values for any row. """ # Identify columns to coalesce columns = [col for col in df.columns if col.startswith(f"{columnPrefix}_")] if not columns: return df # Validate that at most one column is set, and store which rows have a column set rowResults = np.invert(df[columns].isna()).sum(axis=1) assert all(rowResults <= 1), "each row should only be in one modeling group" # Coalesce results def _get_value(row): idx = row.first_valid_index() return row[idx] if idx is not None else np.nan coalesced = df[columns].apply(_get_value, axis=1) # Drop old columns and replace with new df = df.drop(columns=columns) df[columnPrefix] = coalesced return df def get_ratings_for_stable_init( ratingsForTraining: pd.DataFrame, userEnrollmentRaw: pd.DataFrame, modelingGroupToInitializeForStability: int, minPercentRatingsFromModelingGroup: float = 0.75, minNumRatingsToIncludeInStableInitialization: int = 5, ) -> pd.DataFrame: """Returns a subset of ratings to use for training an initial matrix factorization. Args: ratingsForTraining (pd.DataFrame) userEnrollmentRaw (pd.DataFrame) modelingGroupToInitializeForStability: modeling group for round 0 ratings minPercentRatingsFromModelingGroup: notes must have this fraction of ratings from the modeling group minNumRatingsToIncludeInStableInitialization: required from modeling group Returns: DF containing a subset of ratings """ ratingsForTrainingWithModelingGroup = ratingsForTraining.merge( userEnrollmentRaw[[c.participantIdKey, c.modelingGroupKey]], left_on=c.raterParticipantIdKey, right_on=c.participantIdKey, ) ratingsForTrainingWithModelingGroup[c.ratingFromInitialModelingGroupKey] = ( ratingsForTrainingWithModelingGroup[c.modelingGroupKey] == modelingGroupToInitializeForStability ) # Only include ratings from the modeling group ratingsForStableInitialization = ratingsForTrainingWithModelingGroup[ ratingsForTrainingWithModelingGroup[c.ratingFromInitialModelingGroupKey] ] # Only include notes that have received at least 75% of their ratings from the modeling group (and 5 total) ratingsForTrainingWithModelingGroup[c.ratingCountKey] = 1 noteStatsByRatedModelingGroup = ( ratingsForTrainingWithModelingGroup[ [c.noteIdKey, c.ratingFromInitialModelingGroupKey, c.ratingCountKey] ] .groupby(c.noteIdKey) .sum() .reset_index() ) noteStatsByRatedModelingGroup[c.percentFromInitialModelingGroupKey] = ( noteStatsByRatedModelingGroup[c.ratingFromInitialModelingGroupKey] / noteStatsByRatedModelingGroup[c.ratingCountKey] ) noteStatsByRatedModelingGroup[ c.percentFromInitialModelingGroupKey ] = noteStatsByRatedModelingGroup[c.percentFromInitialModelingGroupKey].fillna(0) notesRatedMostlyByInitialModelingGroup = noteStatsByRatedModelingGroup[ ( noteStatsByRatedModelingGroup[c.percentFromInitialModelingGroupKey] >= minPercentRatingsFromModelingGroup ) & ( noteStatsByRatedModelingGroup[c.ratingCountKey] >= minNumRatingsToIncludeInStableInitialization ) ] ratingsForStableInitialization = ratingsForStableInitialization.merge( notesRatedMostlyByInitialModelingGroup[[c.noteIdKey]], on=c.noteIdKey ) assert ( len(ratingsForStableInitialization) > 0 ), "No ratings from stable initialization modeling group." return ratingsForStableInitialization # TODO: Consider merging compute_scored_notes, is_crh, is_crnh, filter_ratings, # compute_general_helpfulness_scores and filter_ratings_by_helpfulness_scores into this class. # These functions are only called by this class, and merging them in will allow accessing # member state and simplify the callsites. class MFBaseScorer(Scorer): """Runs MatrixFactorization to determine raw note scores and ultimately note status.""" def __init__( self, includedTopics: Set[str] = set(), includedGroups: Set[int] = set(), includeUnassigned: bool = False, captureThreshold: Optional[float] = None, seed: Optional[int] = None, pseudoraters: Optional[bool] = True, minNumRatingsPerRater: int = 10, minNumRatersPerNote: int = 5, minRatingsNeeded: int = 5, minMeanNoteScore: float = 0.05, minCRHVsCRNHRatio: float = 0.00, minRaterAgreeRatio: float = 0.66, crhThreshold: float = 0.40, crnhThresholdIntercept: float = -0.05, crnhThresholdNoteFactorMultiplier: float = -0.8, crnhThresholdNMIntercept: float = -0.15, crnhThresholdUCBIntercept: float = -0.04, crhSuperThreshold: Optional[float] = 0.5, lowDiligenceThreshold: float = 0.263, factorThreshold: float = 0.5, inertiaDelta: float = 0.01, useStableInitialization: bool = True, saveIntermediateState: bool = False, threads: int = c.defaultNumThreads, maxFirstMFTrainError: float = 0.16, maxFinalMFTrainError: float = 0.09, userFactorLambda=None, noteFactorLambda=None, userInterceptLambda=None, noteInterceptLambda=None, globalInterceptLambda=None, diamondLambda=None, normalizedLossHyperparameters=None, multiplyPenaltyByHarassmentScore: bool = True, minimumHarassmentScoreToPenalize: float = 2.0, tagConsensusHarassmentHelpfulRatingPenalty: int = 10, useReputation: bool = True, tagFilterPercentile: int = 95, incorrectFilterThreshold: float = 2.5, firmRejectThreshold: Optional[float] = None, ): """Configure MatrixFactorizationScorer object. Args: includedGroups: if set, filter ratings and results based on includedGroups includedTopics: if set, filter ratings based on includedTopics seed: if not None, seed value to ensure deterministic execution pseudoraters: if True, compute optional pseudorater confidence intervals minNumRatingsPerRater: Minimum number of ratings which a rater must produce to be included in scoring. Raters with fewer ratings are removed. minNumRatersPerNote: Minimum number of ratings which a note must have to be included in scoring. Notes with fewer ratings are removed. minRatingsNeeded: Minimum number of ratings for a note to achieve status. minMeanNoteScore: Raters included in the second MF round must achieve this minimum average intercept for any notes written. minCRHVsCRNHRatio: Minimum crhCrnhRatioDifference for raters included in the second MF round. crhCrnhRatioDifference is a weighted measure comparing how often an author produces CRH / CRNH notes. See author_helpfulness for more info. minRaterAgreeRatio: Raters in the second MF round must exceed this minimum standard for how often a rater must predict the eventual outcome when rating before a note is assigned status. crhThreshold: Minimum intercept for most notes to achieve CRH status. crnhThresholdIntercept: Maximum intercept for most notes to achieve CRNH status. crnhThresholdNoteFactorMultiplier: Scaling factor making controlling the relationship between CRNH threshold and note intercept. Note that this constant is set negative so that notes with larger (magnitude) factors must have proportionally lower intercepts to become CRNH. crnhThresholdNMIntercept: Maximum intercept for notes which do not claim a tweet is misleading to achieve CRNH status. crnhThresholdUCBIntercept: Maximum UCB of the intercept (determined with pseudoraters) for notes to achieve CRNH status. crhSuperThreshold: Minimum intercept for notes which have consistent and common patterns of repeated reason tags in not-helpful ratings to achieve CRH status. inertiaDelta: Minimum amount which a note that has achieve CRH status must drop below the applicable threshold to lose CRH status. useStableInitialization: whether to use a specific modeling group of users to stably initialize threads: number of threads to use for intra-op parallelism in pytorch maxFirstMFTrainError: maximum error allowed for the first MF training process maxFinalMFTrainError: maximum error allowed for the final MF training process """ super().__init__( includedTopics=includedTopics, includedGroups=includedGroups, includeUnassigned=includeUnassigned, captureThreshold=captureThreshold, seed=seed, threads=threads, ) self._pseudoraters = pseudoraters self._minNumRatingsPerRater = minNumRatingsPerRater self._minNumRatersPerNote = minNumRatersPerNote self._minRatingsNeeded = minRatingsNeeded self._minMeanNoteScore = minMeanNoteScore self._minCRHVsCRNHRatio = minCRHVsCRNHRatio self._minRaterAgreeRatio = minRaterAgreeRatio self._crhThreshold = crhThreshold self._crnhThresholdIntercept = crnhThresholdIntercept self._crnhThresholdNoteFactorMultiplier = crnhThresholdNoteFactorMultiplier self._crnhThresholdNMIntercept = crnhThresholdNMIntercept self._crnhThresholdUCBIntercept = crnhThresholdUCBIntercept self._crhSuperThreshold = crhSuperThreshold self._inertiaDelta = inertiaDelta self._modelingGroupToInitializeForStability = 13 if useStableInitialization else None self._saveIntermediateState = saveIntermediateState self._maxFirstMFTrainError = maxFirstMFTrainError self._maxFinalMFTrainError = maxFinalMFTrainError self._lowDiligenceThreshold = lowDiligenceThreshold self._factorThreshold = factorThreshold self.multiplyPenaltyByHarassmentScore = multiplyPenaltyByHarassmentScore self.minimumHarassmentScoreToPenalize = minimumHarassmentScoreToPenalize self.tagConsensusHarassmentHelpfulRatingPenalty = tagConsensusHarassmentHelpfulRatingPenalty self._useReputation = useReputation self._tagFilterPercentile = tagFilterPercentile self._incorrectFilterThreshold = incorrectFilterThreshold self._firmRejectThreshold = firmRejectThreshold mfArgs = dict( [ pair for pair in [ ("userFactorLambda", userFactorLambda) if userFactorLambda is not None else None, ("noteFactorLambda", noteFactorLambda) if noteFactorLambda is not None else None, ("userInterceptLambda", userInterceptLambda) if userInterceptLambda is not None else None, ("noteInterceptLambda", noteInterceptLambda) if noteInterceptLambda is not None else None, ("globalInterceptLambda", globalInterceptLambda) if globalInterceptLambda is not None else None, ("diamondLambda", diamondLambda) if diamondLambda is not None else None, ("normalizedLossHyperparameters", normalizedLossHyperparameters) if normalizedLossHyperparameters is not None else None, ("initLearningRate", 0.02 if normalizedLossHyperparameters is not None else 0.2), ("noInitLearningRate", 0.02 if normalizedLossHyperparameters is not None else 1.0), ] if pair is not None ] ) self._mfRanker = MatrixFactorization(**mfArgs) def assert_train_error_is_below_threshold(self, ratings, maxTrainError) -> None: """ If we are running a non-test run (number of ratings is above threshold), assert that the final training error for the MF ranker is below the threshold. """ testRun = ( ratings[c.noteIdKey].nunique() < c.minNumNotesForProdData if ratings is not None else False ) if not testRun: finalTrainError = self._mfRanker.get_final_train_error() if finalTrainError is None: raise ValueError("Final train error is None") else: if finalTrainError > maxTrainError: raise ValueError(f"Train error ({finalTrainError}) is above threshold ({maxTrainError})") def get_crh_threshold(self) -> float: """Return CRH threshold for general scoring logic.""" return self._crhThreshold def get_scored_notes_cols(self) -> List[str]: """Returns a list of columns which should be present in the scoredNotes output.""" return [ c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key, c.internalRatingStatusKey, c.internalActiveRulesKey, c.activeFilterTagsKey, c.noteInterceptMaxKey, c.noteInterceptMinKey, c.numFinalRoundRatingsKey, ] def get_internal_scored_notes_cols(self) -> List[str]: """Returns a list of internal columns which should be present in the scoredNotes output.""" return [ c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key, c.internalRatingStatusKey, c.internalActiveRulesKey, c.activeFilterTagsKey, c.noteInterceptMaxKey, c.noteInterceptMinKey, c.numFinalRoundRatingsKey, c.lowDiligenceNoteInterceptKey, c.lowDiligenceNoteFactor1Key, ] def get_helpfulness_scores_cols(self) -> List[str]: """Returns a list of columns which should be present in the helpfulnessScores output.""" return [ c.raterParticipantIdKey, c.internalRaterInterceptKey, c.internalRaterFactor1Key, c.crhCrnhRatioDifferenceKey, c.meanNoteScoreKey, c.raterAgreeRatioKey, c.aboveHelpfulnessThresholdKey, ] def get_internal_helpfulness_scores_cols(self) -> List[str]: """Returns a list of internal columns which should be present in the helpfulnessScores output.""" return [ c.raterParticipantIdKey, c.internalRaterInterceptKey, c.internalRaterFactor1Key, c.crhCrnhRatioDifferenceKey, c.meanNoteScoreKey, c.raterAgreeRatioKey, c.aboveHelpfulnessThresholdKey, c.lowDiligenceRaterInterceptKey, c.lowDiligenceRaterFactor1Key, c.lowDiligenceRaterReputationKey, ] def get_auxiliary_note_info_cols(self) -> List[str]: """Returns a list of columns which should be present in the auxiliaryNoteInfo output.""" return [ c.noteIdKey, c.ratingWeightKey, ] + ( c.notHelpfulTagsAdjustedColumns + c.notHelpfulTagsAdjustedRatioColumns + c.incorrectFilterColumns ) def _get_dropped_note_cols(self) -> List[str]: """Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo.""" return ( [ c.currentlyRatedHelpfulBoolKey, c.currentlyRatedNotHelpfulBoolKey, c.awaitingMoreRatingsBoolKey, c.currentLabelKey, c.classificationKey, c.numRatingsKey, c.noteAuthorParticipantIdKey, ] + c.helpfulTagsTSVOrder + c.notHelpfulTagsTSVOrder + c.noteParameterUncertaintyTSVAuxColumns ) def _get_dropped_user_cols(self) -> List[str]: """Returns a list of columns which should be excluded from helpfulnessScores output.""" return [] def _prepare_data_for_scoring(self, ratings: pd.DataFrame, final: bool = False) -> pd.DataFrame: """Prepare data for scoring. This includes filtering out notes and raters which do not meet minimum rating counts, and may be overridden by subclasses to add additional filtering. """ if final: return process_data.filter_ratings( ratings, minNumRatingsPerRater=0, minNumRatersPerNote=self._minNumRatersPerNote ) else: return process_data.filter_ratings( ratings, minNumRatingsPerRater=self._minNumRatingsPerRater, minNumRatersPerNote=self._minNumRatersPerNote, ) def _run_regular_matrix_factorization(self, ratingsForTraining: pd.DataFrame): """Train a matrix factorization model on the ratingsForTraining data. Args: ratingsForTraining (pd.DataFrame) Returns: noteParams (pd.DataFrame) raterParams (pd.DataFrame) globalIntercept (float) """ return self._mfRanker.run_mf(ratingsForTraining) def _run_stable_matrix_factorization( self, ratingsForTraining: pd.DataFrame, userEnrollmentRaw: pd.DataFrame, ): """Train a matrix factorization model on the ratingsForTraining data. Due to stability issues when trained on the entire dataset with no initialization, this is done in two steps: 1. Train a model on the subset of the data with modeling group 13 (stable initialization). 2. Train a model on the entire dataset, initializing with the results from step 1. Without this initialization, the factors for some subsets of the data with low crossover between raters can flip relative to each other from run to run. Args: ratingsForTraining (pd.DataFrame) userEnrollmentRaw (pd.DataFrame) Returns: noteParams (pd.DataFrame) raterParams (pd.DataFrame) globalIntercept (float) """ if self._modelingGroupToInitializeForStability is None: return self._run_regular_matrix_factorization(ratingsForTraining) with self.time_block("Prepare data for stable initialization"): ratingsForStableInitialization = get_ratings_for_stable_init( ratingsForTraining, userEnrollmentRaw, self._modelingGroupToInitializeForStability ) with self.time_block("MF on stable-initialization subset"): initializationMF = self._mfRanker.get_new_mf_with_same_args() noteParamsInit, raterParamsInit, globalInterceptInit = initializationMF.run_mf( ratingsForStableInitialization ) with self.time_block("First full MF (initializated with stable-initialization)"): modelResult = self._mfRanker.run_mf( ratingsForTraining, noteInit=noteParamsInit, userInit=raterParamsInit, globalInterceptInit=globalInterceptInit, ) return modelResult def compute_tag_thresholds_for_percentile( self, scoredNotes, raterParams, ratings ) -> Dict[str, float]: with c.time_block(f"{self.get_name()}: Compute tag thresholds for percentiles"): # Compute tag aggregates (in the same way as is done in final scoring in note_ratings.compute_scored_notes) tagAggregates = tag_filter.get_note_tag_aggregates(ratings, scoredNotes, raterParams) assert len(tagAggregates) == len( scoredNotes ), "There should be one aggregate per scored note." scoredNotes = tagAggregates.merge(scoredNotes, on=c.noteIdKey, how="outer") # Compute percentile thresholds for each tag crhNotes = scoredNotes[scoredNotes[c.currentlyRatedHelpfulBoolKey]][[c.noteIdKey]] crhStats = scoredNotes.merge(crhNotes, on=c.noteIdKey, how="inner") thresholds = tag_filter.get_tag_thresholds(crhStats, self._tagFilterPercentile) return thresholds def _prescore_notes_and_users( self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollmentRaw: pd.DataFrame ) -> Tuple[pd.DataFrame, pd.DataFrame, c.PrescoringMetaScorerOutput]: """ Fit initial matrix factorization model(s) on the ratings data in order to generate initial note and rater parameters (and rater helpfulness scores) that are passed to the final scoring step. The final scoring step will be able to run faster by using the rater helpfulness scores computed here, and also intializing its parameters with these parameters. Args: ratings (pd.DataFrame) noteStatusHistory (pd.DataFrame) userEnrollmentRaw (pd.DataFrame) Returns: Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: noteParamsUnfiltered (pd.DataFrame) raterParamsUnfiltered (pd.DataFrame) helpfulnessScores (pd.DataFrame) """ if self._seed is not None: logger.info(f"seeding with {self._seed}") torch.manual_seed(self._seed) # Removes ratings where either (1) the note did not receive enough ratings, or # (2) the rater did not rate enough notes. logger.info( f"ratings summary {self.get_name()}: {get_df_fingerprint(ratings, [c.noteIdKey, c.raterParticipantIdKey])}" ) with self.time_block("Prepare ratings"): ratingsForTraining = self._prepare_data_for_scoring( ratings[ [ c.noteIdKey, c.raterParticipantIdKey, c.helpfulNumKey, c.createdAtMillisKey, c.helpfulnessLevelKey, c.notHelpfulIncorrectTagKey, c.notHelpfulIrrelevantSourcesTagKey, c.notHelpfulSourcesMissingOrUnreliableTagKey, c.notHelpfulSpamHarassmentOrAbuseTagKey, c.notHelpfulOtherTagKey, ] ] ) logger.info( f"ratingsForTraining summary {self.get_name()}: {get_df_fingerprint(ratingsForTraining, [c.noteIdKey, c.raterParticipantIdKey])}" ) logger.info( f"noteStatusHistory summary {self.get_name()}: {get_df_fingerprint(noteStatusHistory, [c.noteIdKey])}" ) logger.info( f"userEnrollmentRaw summary {self.get_name()}: {get_df_fingerprint(userEnrollmentRaw, [c.participantIdKey])}" ) if self._saveIntermediateState: self.ratingsForTraining = ratingsForTraining # TODO: Save parameters from this first run in note_model_output next time we add extra fields to model output TSV. with self.time_block("First MF/stable init"): ( noteParamsUnfiltered, raterParamsUnfiltered, globalBias, ) = self._run_stable_matrix_factorization( ratingsForTraining[[c.noteIdKey, c.raterParticipantIdKey, c.helpfulNumKey]], userEnrollmentRaw[[c.participantIdKey, c.modelingGroupKey]], ) if self._saveIntermediateState: self.noteParamsUnfiltered = noteParamsUnfiltered self.raterParamsUnfiltered = raterParamsUnfiltered self.globalBias = globalBias self.assert_train_error_is_below_threshold( ratingsForTraining[[c.noteIdKey]], self._maxFirstMFTrainError ) # If reputation is disabled, generate final intercepts, factors and note status # based on the first round scoring results. Disabling reputation can be desirable # in situations where the overall volume of ratings is lower (e.g. topic models). if not self._useReputation: assert "Topic" in self.get_name(), f"Unexpected scorer: {self.get_name()}" logger.info(f"Skipping rep-filtering in prescoring for {self.get_name()}") helpfulnessScores = raterParamsUnfiltered[[c.raterParticipantIdKey]] helpfulnessScores[ [ c.crhCrnhRatioDifferenceKey, c.meanNoteScoreKey, c.raterAgreeRatioKey, c.aboveHelpfulnessThresholdKey, ] ] = np.nan noteParams = noteParamsUnfiltered raterParams = raterParamsUnfiltered # TODO: delete after we run prescoring diligence properly # diligenceGlobalIntercept = None finalRoundRatings = ratingsForTraining else: assert "Topic" not in self.get_name(), f"Unexpected scorer: {self.get_name()}" logger.info(f"Performing rep-filtering for {self.get_name()}") # Get a dataframe of scored notes based on the algorithm results above with self.time_block("Compute scored notes"): scoredNotes = note_ratings.compute_scored_notes( ratings[ [c.noteIdKey, c.raterParticipantIdKey, c.helpfulnessLevelKey, c.createdAtMillisKey] + c.notHelpfulTagsTSVOrder + c.helpfulTagsTSVOrder ], keep_columns( noteParamsUnfiltered, [ c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key, ] + c.noteParameterUncertaintyTSVColumns, ), raterParamsUnfiltered[ [ c.raterParticipantIdKey, c.internalRaterFactor1Key, ] ], noteStatusHistory[ [ c.noteIdKey, c.createdAtMillisKey, c.noteAuthorParticipantIdKey, c.classificationKey, c.currentLabelKey, c.lockedStatusKey, ] ], minRatingsNeeded=self._minRatingsNeeded, crhThreshold=self._crhThreshold, crnhThresholdIntercept=self._crnhThresholdIntercept, crnhThresholdNoteFactorMultiplier=self._crnhThresholdNoteFactorMultiplier, crnhThresholdNMIntercept=self._crnhThresholdNMIntercept, crnhThresholdUCBIntercept=self._crnhThresholdUCBIntercept, crhSuperThreshold=self._crhSuperThreshold, inertiaDelta=self._inertiaDelta, incorrectFilterThreshold=self._incorrectFilterThreshold, tagFilterThresholds=None, finalRound=False, firmRejectThreshold=self._firmRejectThreshold, ) if self._saveIntermediateState: self.prescoringScoredNotes = scoredNotes # Determine "valid" ratings with self.time_block("Compute valid ratings"): validRatings = note_ratings.get_valid_ratings( ratings[[c.noteIdKey, c.raterParticipantIdKey, c.helpfulNumKey, c.createdAtMillisKey]], noteStatusHistory[ [c.noteIdKey, c.createdAtMillisKey, c.timestampMillisOfNoteMostRecentNonNMRLabelKey] ], scoredNotes[ [ c.noteIdKey, c.currentlyRatedHelpfulBoolKey, c.currentlyRatedNotHelpfulBoolKey, c.awaitingMoreRatingsBoolKey, ] ], ) if self._saveIntermediateState: self.validRatings = validRatings # Assigns contributor (author & rater) helpfulness bit based on (1) performance # authoring and reviewing previous and current notes. with self.time_block("Helpfulness scores pre-harassment "): helpfulnessScoresPreHarassmentFilter = ( helpfulness_scores.compute_general_helpfulness_scores( scoredNotes[ [ c.noteAuthorParticipantIdKey, c.currentlyRatedHelpfulBoolKey, c.currentlyRatedNotHelpfulBoolKey, c.internalNoteInterceptKey, ] ], validRatings[ [c.raterParticipantIdKey, c.ratingAgreesWithNoteStatusKey, c.ratingCountKey] ], self._minMeanNoteScore, self._minCRHVsCRNHRatio, self._minRaterAgreeRatio, ratingsForTraining[[c.noteIdKey, c.raterParticipantIdKey, c.helpfulNumKey]], ) ) if self._saveIntermediateState: self.prescoringHelpfulnessScores = helpfulnessScoresPreHarassmentFilter # Filters ratings matrix to include only rows (ratings) where the rater was # considered helpful. with self.time_block("Filtering by helpfulness score"): ratingsHelpfulnessScoreFilteredPreHarassmentFilter = ( helpfulness_scores.filter_ratings_by_helpfulness_scores( ratingsForTraining[ [ c.noteIdKey, c.raterParticipantIdKey, c.notHelpfulSpamHarassmentOrAbuseTagKey, c.createdAtMillisKey, c.helpfulnessLevelKey, c.notHelpfulOtherTagKey, ] ], helpfulnessScoresPreHarassmentFilter, ) ) if self._saveIntermediateState: self.ratingsHelpfulnessScoreFilteredPreHarassmentFilter = ( ratingsHelpfulnessScoreFilteredPreHarassmentFilter ) with self.time_block("Harassment tag consensus"): harassmentAbuseNoteParams, _, _ = tag_consensus.train_tag_model( ratingsHelpfulnessScoreFilteredPreHarassmentFilter, c.notHelpfulSpamHarassmentOrAbuseTagKey, noteParamsUnfiltered[[c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key]], raterParamsUnfiltered[ [c.raterParticipantIdKey, c.internalRaterInterceptKey, c.internalRaterFactor1Key] ], name="harassment", ) if not self._saveIntermediateState: del ratingsHelpfulnessScoreFilteredPreHarassmentFilter gc.collect() # Assigns contributor (author & rater) helpfulness bit based on (1) performance # authoring and reviewing previous and current notes, and (2) including an extra # penalty for rating a harassment/abuse note as helpful. with self.time_block("Helpfulness scores post-harassment"): helpfulnessScores = helpfulness_scores.compute_general_helpfulness_scores( scoredNotes[ [ c.noteAuthorParticipantIdKey, c.currentlyRatedHelpfulBoolKey, c.currentlyRatedNotHelpfulBoolKey, c.internalNoteInterceptKey, ] ], validRatings[ [c.raterParticipantIdKey, c.ratingAgreesWithNoteStatusKey, c.ratingCountKey] ], self._minMeanNoteScore, self._minCRHVsCRNHRatio, self._minRaterAgreeRatio, ratings=ratingsForTraining[[c.noteIdKey, c.raterParticipantIdKey, c.helpfulNumKey]], tagConsensusHarassmentAbuseNotes=harassmentAbuseNoteParams, tagConsensusHarassmentHelpfulRatingPenalty=self.tagConsensusHarassmentHelpfulRatingPenalty, multiplyPenaltyByHarassmentScore=self.multiplyPenaltyByHarassmentScore, minimumHarassmentScoreToPenalize=self.minimumHarassmentScoreToPenalize, ) if not self._saveIntermediateState: del validRatings gc.collect() if self._saveIntermediateState: self.helpfulnessScores = helpfulnessScores ## One extra final round! # Filter ratings based on prev helpfulness scores with c.time_block("Final round MF"): finalRoundRatings = helpfulness_scores.filter_ratings_by_helpfulness_scores( ratingsForTraining[ [ c.noteIdKey, c.raterParticipantIdKey, c.helpfulNumKey, c.notHelpfulIncorrectTagKey, c.notHelpfulSourcesMissingOrUnreliableTagKey, c.notHelpfulIrrelevantSourcesTagKey, ] ], helpfulnessScores[[c.raterParticipantIdKey, c.aboveHelpfulnessThresholdKey]], ) noteParams, raterParams, globalBias = self._mfRanker.run_mf( ratings=finalRoundRatings[[c.noteIdKey, c.raterParticipantIdKey, c.helpfulNumKey]], noteInit=noteParamsUnfiltered[ [c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key] ], userInit=raterParamsUnfiltered[ [c.raterParticipantIdKey, c.internalRaterInterceptKey, c.internalRaterFactor1Key] ], ) # Run Diligence MF Prescoring, based on the final MF with self.time_block("Low Diligence MF"): # Initialize diligence rater factors with final round helpful MF rater factor raterParamsDiligenceInit = raterParams[ [c.raterParticipantIdKey, c.internalRaterFactor1Key] ].rename({c.internalRaterFactor1Key: c.lowDiligenceRaterFactor1Key}, axis=1) logger.info( f"In {self.get_name()} prescoring, about to call diligence with {len(finalRoundRatings)} final round ratings." ) ( diligenceNoteParams, diligenceRaterParams, diligenceGlobalIntercept, ) = fit_low_diligence_model_prescoring( finalRoundRatings[ [ c.noteIdKey, c.raterParticipantIdKey, c.notHelpfulIncorrectTagKey, c.notHelpfulSourcesMissingOrUnreliableTagKey, c.notHelpfulIrrelevantSourcesTagKey, ] ], raterInitStateDiligence=raterParamsDiligenceInit, ) noteParams = noteParams.merge(diligenceNoteParams, on=c.noteIdKey) raterParams = raterParams.merge(diligenceRaterParams, on=c.raterParticipantIdKey) # Compute scored notes -- currently not returned; only used for downstream computation. scoredNotes = note_ratings.compute_scored_notes( ratings[ [c.noteIdKey, c.raterParticipantIdKey, c.helpfulnessLevelKey, c.createdAtMillisKey] + c.notHelpfulTagsTSVOrder + c.helpfulTagsTSVOrder ], keep_columns( noteParamsUnfiltered, [ c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key, ] + c.noteParameterUncertaintyTSVColumns, ), raterParamsUnfiltered[ [ c.raterParticipantIdKey, c.internalRaterFactor1Key, ] ], noteStatusHistory[ [ c.noteIdKey, c.createdAtMillisKey, c.noteAuthorParticipantIdKey, c.classificationKey, c.currentLabelKey, c.lockedStatusKey, ] ], minRatingsNeeded=self._minRatingsNeeded, crhThreshold=self._crhThreshold, crnhThresholdIntercept=self._crnhThresholdIntercept, crnhThresholdNoteFactorMultiplier=self._crnhThresholdNoteFactorMultiplier, crnhThresholdNMIntercept=self._crnhThresholdNMIntercept, crnhThresholdUCBIntercept=self._crnhThresholdUCBIntercept, crhSuperThreshold=self._crhSuperThreshold, inertiaDelta=self._inertiaDelta, tagFilterThresholds=None, incorrectFilterThreshold=self._incorrectFilterThreshold, finalRound=False, factorThreshold=self._factorThreshold, firmRejectThreshold=self._firmRejectThreshold, ) # Compute meta output metaOutput = c.PrescoringMetaScorerOutput( globalIntercept=globalBias, lowDiligenceGlobalIntercept=diligenceGlobalIntercept, tagFilteringThresholds=self.compute_tag_thresholds_for_percentile( scoredNotes=noteParams[[c.noteIdKey, c.internalNoteFactor1Key]].merge( scoredNotes[[c.noteIdKey, c.currentlyRatedHelpfulBoolKey]], on=c.noteIdKey, suffixes=("", "_dup"), ), raterParams=raterParams[[c.raterParticipantIdKey, c.internalRaterFactor1Key]], ratings=ratings[ [ c.noteIdKey, c.raterParticipantIdKey, ] + c.notHelpfulTagsTSVOrder ], ), finalRoundNumRatings=len(finalRoundRatings), finalRoundNumNotes=finalRoundRatings[c.noteIdKey].nunique(), finalRoundNumUsers=finalRoundRatings[c.raterParticipantIdKey].nunique(), ) # Compute user incorrect tag aggregates userIncorrectTagUsageDf = get_user_incorrect_ratio( ratings[ [ c.noteIdKey, c.raterParticipantIdKey, ] + c.notHelpfulTagsTSVOrder ] ) raterModelOutput = raterParams.merge( helpfulnessScores[ [ c.raterParticipantIdKey, c.crhCrnhRatioDifferenceKey, c.meanNoteScoreKey, c.raterAgreeRatioKey, c.aboveHelpfulnessThresholdKey, ] ], on=c.raterParticipantIdKey, how="outer", ).merge( userIncorrectTagUsageDf, on=c.raterParticipantIdKey, how="left", unsafeAllowed={c.totalRatingsMadeByRaterKey, c.incorrectTagRatingsMadeByRaterKey}, ) noteModelOutput = noteParams # Returning should remove references to these, but manually trigger GC just to reclaim # resources as soon as possible. del ratings del ratingsForTraining del finalRoundRatings gc.collect() return noteModelOutput, raterModelOutput, metaOutput def _score_notes_and_users( self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, prescoringNoteModelOutput: pd.DataFrame, prescoringRaterModelOutput: pd.DataFrame, prescoringMetaScorerOutput: c.PrescoringMetaScorerOutput, flipFactorsForIdentification: bool = False, ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Run the "final" matrix factorization scoring algorithm. Accepts prescoring's output as its input, as well as the new ratings and note status history. See links below for more info: https://twitter.github.io/communitynotes/ranking-notes/ https://twitter.github.io/communitynotes/contributor-scores/. Args: ratings (pd.DataFrame): preprocessed ratings noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status prescoringNoteModelOutput (pd.DataFrame): note parameters. prescoringRaterModelOutput (pd.DataFrame): contains both rater parameters and helpfulnessScores. Returns: Tuple[pd.DataFrame, pd.DataFrame]: noteScores pd.DataFrame: one row per note contained note scores and parameters. userScores pd.DataFrame: one row per user containing a column for each helpfulness score. """ if self._seed is not None: logger.info(f"seeding with {self._seed}") torch.manual_seed(self._seed) # Removes ratings where either the note did not receive enough ratings with self.time_block("Prepare ratings"): ratingsForTraining = self._prepare_data_for_scoring(ratings, final=True) if self._saveIntermediateState: self.ratingsForTraining = ratingsForTraining # Filter raters with no rater parameters in this scorer ratersWithParams = prescoringRaterModelOutput.loc[ ( (~pd.isna(prescoringRaterModelOutput[c.internalRaterInterceptKey])) & (~pd.isna(prescoringRaterModelOutput[c.internalRaterInterceptKey])) ), [c.raterParticipantIdKey], ] ratingsForTraining = ratingsForTraining.merge( ratersWithParams, how="inner", on=c.raterParticipantIdKey ) # Filters ratings matrix to include only rows (ratings) where the rater was # considered helpful. if not self._useReputation: assert ( "Topic" in self.get_name() ), f"Unexpected scorer has reputation filtering disabled: {self.get_name()}" logger.info(f"Skipping rep-filtering in 2nd phase for {self.get_name()}") finalRoundRatings = ratingsForTraining else: finalRoundRatings = helpfulness_scores.filter_ratings_by_helpfulness_scores( ratingsForTraining, prescoringRaterModelOutput ) if self._saveIntermediateState: self.finalRoundRatings = finalRoundRatings assert ( prescoringMetaScorerOutput.finalRoundNumNotes is not None ), "Missing final round num notes" assert ( prescoringMetaScorerOutput.finalRoundNumRatings is not None ), "Missing final round num ratings" assert ( prescoringMetaScorerOutput.finalRoundNumUsers is not None ), "Missing final round num users" if len(finalRoundRatings) == 0: return pd.DataFrame(), pd.DataFrame() # Re-runs matrix factorization using only ratings given by helpful raters. with self.time_block("Final helpfulness-filtered MF"): noteParams, raterParams, globalBias = self._mfRanker.run_mf( ratings=finalRoundRatings, noteInit=prescoringNoteModelOutput, userInit=prescoringRaterModelOutput, globalInterceptInit=prescoringMetaScorerOutput.globalIntercept, freezeRaterParameters=True, freezeGlobalParameters=True, ratingPerNoteLossRatio=prescoringMetaScorerOutput.finalRoundNumRatings / prescoringMetaScorerOutput.finalRoundNumNotes, flipFactorsForIdentification=flipFactorsForIdentification, ) if self._saveIntermediateState: self.noteParams = noteParams self.raterParams = raterParams self.globalBias = globalBias self.finalRoundRatings = finalRoundRatings # self.assert_train_error_is_below_threshold(finalRoundRatings, self._maxFinalMFTrainError) # Add pseudo-raters with the most extreme parameters and re-score notes, to estimate # upper and lower confidence bounds on note parameters. if self._pseudoraters: with self.time_block("Pseudoraters"): noteParams = PseudoRatersRunner( finalRoundRatings, noteParams, raterParams, globalBias, self._mfRanker ).compute_note_parameter_confidence_bounds_with_pseudo_raters() if self._saveIntermediateState: self.prePseudoratersNoteParams = self.noteParams self.noteParams = noteParams else: for col in c.noteParameterUncertaintyTSVColumns: noteParams[col] = np.nan # Add low diligence intercepts. with self.time_block("Low Diligence Reputation Model"): logger.info( f"In {self.get_name()} final scoring, about to call diligence with {len(finalRoundRatings)} final round ratings." ) assert ( prescoringMetaScorerOutput.lowDiligenceGlobalIntercept is not None ), "Missing low diligence global intercept" diligenceNoteParams, diligenceRaterParams = fit_low_diligence_model_final( finalRoundRatings, noteInitStateDiligence=prescoringNoteModelOutput, raterInitStateDiligence=prescoringRaterModelOutput, globalInterceptDiligence=prescoringMetaScorerOutput.lowDiligenceGlobalIntercept, ratingsPerNoteLossRatio=prescoringMetaScorerOutput.finalRoundNumRatings / prescoringMetaScorerOutput.finalRoundNumNotes, ratingsPerUserLossRatio=prescoringMetaScorerOutput.finalRoundNumRatings / prescoringMetaScorerOutput.finalRoundNumUsers, ) logger.info(f"diligenceNP cols: {diligenceNoteParams.columns}") noteParams = noteParams.merge(diligenceNoteParams, on=c.noteIdKey) logger.info(f"np cols: {noteParams.columns}") if self._saveIntermediateState: self.noteParams = noteParams self.raterParams = raterParams self.globalBias = globalBias raterParamsWithRatingCounts = raterParams.merge( prescoringRaterModelOutput[ [c.raterParticipantIdKey, c.incorrectTagRatingsMadeByRaterKey, c.totalRatingsMadeByRaterKey] ], on=c.raterParticipantIdKey, ) # Assigns updated CRH / CRNH bits to notes based on volume of prior ratings # and ML output. with self.time_block("Final compute scored notes"): logger.info(f"About to call compute_scored_notes with {self.get_name()}") scoredNotes = note_ratings.compute_scored_notes( ratings, noteParams, raterParamsWithRatingCounts, noteStatusHistory, minRatingsNeeded=self._minRatingsNeeded, crhThreshold=self._crhThreshold, crnhThresholdIntercept=self._crnhThresholdIntercept, crnhThresholdNoteFactorMultiplier=self._crnhThresholdNoteFactorMultiplier, crnhThresholdNMIntercept=self._crnhThresholdNMIntercept, crnhThresholdUCBIntercept=self._crnhThresholdUCBIntercept, crhSuperThreshold=self._crhSuperThreshold, inertiaDelta=self._inertiaDelta, tagFilterThresholds=prescoringMetaScorerOutput.tagFilteringThresholds, incorrectFilterThreshold=self._incorrectFilterThreshold, lowDiligenceThreshold=self._lowDiligenceThreshold, finalRound=True, factorThreshold=self._factorThreshold, firmRejectThreshold=self._firmRejectThreshold, ) logger.info(f"sn cols: {scoredNotes.columns}") # Takes raterParams from the MF run, but use the pre-computed # helpfulness scores from prescoringRaterModelOutput. helpfulnessScores = raterParams.merge( prescoringRaterModelOutput[ [ c.raterParticipantIdKey, c.crhCrnhRatioDifferenceKey, c.meanNoteScoreKey, c.raterAgreeRatioKey, c.aboveHelpfulnessThresholdKey, ] ], on=c.raterParticipantIdKey, how="outer", ) if self._saveIntermediateState: self.scoredNotes = scoredNotes self.helpfulnessScores = helpfulnessScores return scoredNotes, helpfulnessScores def score_final(self, scoringArgs: c.FinalScoringArgs) -> c.ModelResult: """ Process ratings to assign status to notes and optionally compute rater properties. Accepts prescoringNoteModelOutput and prescoringRaterModelOutput as args (fields on scoringArgs) which are the outputs of the prescore() function. These are used to initialize the final scoring. It filters the prescoring output to only include the rows relevant to this scorer, based on the c.scorerNameKey field of those dataframes. """ torch.set_num_threads(self._threads) logger.info( f"score_final: Torch intra-op parallelism for {self.get_name()} set to: {torch.get_num_threads()}" ) # Filter unfiltered params to just params for this scorer (with copy). # Avoid editing the dataframe in FinalScoringArgs, which is shared across scorers. prescoringNoteModelOutput = scoringArgs.prescoringNoteModelOutput[ scoringArgs.prescoringNoteModelOutput[c.scorerNameKey] == self.get_name() ].drop(columns=c.scorerNameKey, inplace=False) if scoringArgs.prescoringRaterModelOutput is None: return self._return_empty_final_scores() prescoringRaterModelOutput = scoringArgs.prescoringRaterModelOutput[ scoringArgs.prescoringRaterModelOutput[c.scorerNameKey] == self.get_name() ].drop(columns=c.scorerNameKey, inplace=False) if self.get_name() not in scoringArgs.prescoringMetaOutput.metaScorerOutput: logger.info( f"Scorer {self.get_name()} not found in prescoringMetaOutput; returning empty scores from final scoring." ) return self._return_empty_final_scores() prescoringMetaScorerOutput = scoringArgs.prescoringMetaOutput.metaScorerOutput[self.get_name()] # Filter raw input with self.time_block("Filter input"): ratings, noteStatusHistory = self._filter_input( scoringArgs.noteTopics, scoringArgs.ratings, scoringArgs.noteStatusHistory, scoringArgs.userEnrollment, ) # If there are no ratings left after filtering, then return empty dataframes. if len(ratings) == 0: return self._return_empty_final_scores() noteScores, userScores = self._score_notes_and_users( ratings=ratings, noteStatusHistory=noteStatusHistory, prescoringNoteModelOutput=prescoringNoteModelOutput, prescoringRaterModelOutput=prescoringRaterModelOutput, prescoringMetaScorerOutput=prescoringMetaScorerOutput, flipFactorsForIdentification=False, ) if len(noteScores) == 0 and len(userScores) == 0: logger.info( "No ratings left after filtering that happens in _score_notes_and_users, returning empty " "dataframes" ) return self._return_empty_final_scores() with self.time_block("Postprocess output"): # Only some subclasses do any postprocessing. # E.g. topic models add confidence bit, group models prune according to authorship filter. noteScores, userScores = self._postprocess_output( noteScores, userScores, scoringArgs.ratings, scoringArgs.noteStatusHistory, scoringArgs.userEnrollment, ) ## TODO: refactor this logic to compute 2nd round ratings out so score_final doesn't need to be overridden and duplicated. scoredNoteFinalRoundRatings = ( ratings[[c.raterParticipantIdKey, c.noteIdKey]] .merge(userScores[[c.raterParticipantIdKey]], on=c.raterParticipantIdKey) .groupby(c.noteIdKey) .agg("count") .reset_index() .rename(columns={c.raterParticipantIdKey: c.numFinalRoundRatingsKey}) ) noteScores = noteScores.merge( scoredNoteFinalRoundRatings, on=c.noteIdKey, how="left", unsafeAllowed=[c.defaultIndexKey, c.numFinalRoundRatingsKey], ) noteScores = noteScores.rename(columns=self._get_note_col_mapping()) userScores = userScores.rename(columns=self._get_user_col_mapping()) # Process noteScores noteScores = noteScores.drop(columns=self._get_dropped_note_cols()) assert set(noteScores.columns) == set( self.get_scored_notes_cols() + self.get_auxiliary_note_info_cols() ), f"""all columns must be either dropped or explicitly defined in an output. Extra columns that were in noteScores: {set(noteScores.columns) - set(self.get_scored_notes_cols() + self.get_auxiliary_note_info_cols())} Missing expected columns that should've been in noteScores: {set(self.get_scored_notes_cols() + self.get_auxiliary_note_info_cols()) - set(noteScores.columns)}""" # Process userScores userScores = userScores.drop(columns=self._get_dropped_user_cols()) assert set(userScores.columns) == set(self.get_helpfulness_scores_cols()), f"""all columns must be either dropped or explicitly defined in an output. Extra columns that were in userScores: {set(userScores.columns) - set(self.get_helpfulness_scores_cols())} Missing expected columns that should've been in userScores: {set(self.get_helpfulness_scores_cols()) - set(userScores.columns)}""" # Return dataframes with specified columns in specified order return c.ModelResult( scoredNotes=noteScores[self.get_scored_notes_cols()], helpfulnessScores=userScores[self.get_helpfulness_scores_cols()] if self.get_helpfulness_scores_cols() else None, auxiliaryNoteInfo=noteScores[self.get_auxiliary_note_info_cols()] if self.get_auxiliary_note_info_cols() else None, scorerName=self.get_name(), metaScores=None, )