sourcecode/scoring/scorer.py:
from abc import ABC, abstractmethod
from contextlib import contextmanager
import gc
import logging
import time
from typing import Dict, List, Optional, Set, Tuple
from . import constants as c
from .constants import FinalScoringArgs, ModelResult, PrescoringArgs
from .pandas_utils import keep_columns
import numpy as np
import pandas as pd
import torch
logger = logging.getLogger("birdwatch.scorer")
logger.setLevel(logging.INFO)
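# Temporary column name used to flag whether a rating's author belongs to one of this
# scorer's modeling groups during filtering and postprocessing.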
_IN_GROUP = "inGroup"
class EmptyRatingException(Exception):
"""Exception rasied when no ratings are available"""
class Scorer(ABC):
"""Base class which all other scorers must extend.
The Scorer base class defines the "score" function, which wraps _score_notes_and_users
and works with other helper functions that define output columns. This paradigm is designed
to improve code readability and decrease bugs by forcing subclasses to be explicit about
exactly which columns are output and which are dropped.
"""
def __init__(
self,
includedTopics: Set[str] = set(),
includedGroups: Set[int] = set(),
includeUnassigned: bool = False,
captureThreshold: Optional[float] = None,
seed: Optional[int] = None,
threads: int = c.defaultNumThreads,
) -> None:
"""Configure a new Scorer object.
Args:
includedTopics (Set[str], optional): if set, restrict scoring to notes assigned one of these topics
includedGroups (Set[int], optional): if set, restrict scoring to ratings from raters in these modeling groups
includeUnassigned (bool, optional): whether to keep ratings from raters without an assigned modeling group
captureThreshold (float, optional): if not None, minimum fraction of a note's ratings that must come from in-group raters for the note to be retained in postprocessing
seed (int, optional): if not None, seed value to ensure deterministic execution
threads (int, optional): number of threads to use for intra-op parallelism in torch
"""
self._includedTopics = includedTopics
self._includedGroups = includedGroups
self._includeUnassigned = includeUnassigned
self._captureThreshold = captureThreshold
self._seed = seed
self._threads = threads
@contextmanager
def time_block(self, label):
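"""Context manager which logs the wall-clock time spent inside the wrapped block, e.g. `with self.time_block("Filter input"): ...`."""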
start = time.time()
try:
yield
finally:
end = time.time()
logger.info(
f"{self.get_name()} {label} elapsed time: {end - start:.2f} secs ({((end-start)/60.0):.2f} mins)"
)
def get_name(self):
return str(type(self))
@abstractmethod
def get_scored_notes_cols(self) -> List[str]:
"""Returns a list of columns which should be present in the scoredNotes output."""
@abstractmethod
def get_internal_scored_notes_cols(self) -> List[str]:
"""Returns a list of internal columns which should be present in the scoredNotes output."""
@abstractmethod
def get_helpfulness_scores_cols(self) -> List[str]:
"""Returns a list of columns which should be present in the helpfulnessScores output."""
@abstractmethod
def get_internal_helpfulness_scores_cols(self) -> List[str]:
"""Returns a list of internal columns which should be present in the helpfulnessScores output."""
@abstractmethod
def get_auxiliary_note_info_cols(self) -> List[str]:
"""Returns a list of columns which should be present in the auxiliaryNoteInfo output."""
@abstractmethod
def _get_dropped_note_cols(self) -> List[str]:
"""Returns a list of columns which should be excluded from scoredNotes and auxiliaryNoteInfo."""
@abstractmethod
def _get_dropped_user_cols(self) -> List[str]:
"""Returns a list of columns which should be excluded from helpfulnessScores output."""
def _filter_input(
self,
noteTopics: pd.DataFrame,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
userEnrollment: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Prune the contents of ratings and noteStatusHistory to scope model behavior.
Args:
noteTopics (pd.DataFrame): mapping from note ID to assigned topic
ratings (pd.DataFrame): preprocessed ratings
noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
userEnrollment (pd.DataFrame): one row per user specifying enrollment properties
Returns:
Tuple[pd.DataFrame, pd.DataFrame]:
ratings: ratings filtered to only contain rows of interest
noteStatusHistory: noteStatusHistory filtered to only contain rows of interest
"""
if (not self._includedGroups) and (not self._includedTopics):
return ratings, noteStatusHistory
logger.info(f"Filtering ratings for {self.get_name()}. Original rating length: {len(ratings)}")
# Apply topic filter
if self._includedTopics:
notes = noteTopics[noteTopics[c.noteTopicKey].isin(self._includedTopics)][[c.noteIdKey]]
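# Inner merges on noteIdKey keep only rows for notes assigned to an included topic.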
ratings = ratings.merge(notes)
noteStatusHistory = noteStatusHistory.merge(notes)
logger.info(f" Ratings after topic filter: {len(ratings)}")
# Apply group filter
if self._includedGroups:
userEnrollment = userEnrollment[[c.participantIdKey, c.modelingGroupKey]].rename(
columns={c.participantIdKey: c.raterParticipantIdKey}
)
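# Flag each enrolled rater as in or out of this scorer's modeling groups.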
userEnrollment.loc[:, _IN_GROUP] = (
userEnrollment[c.modelingGroupKey].isin(self._includedGroups).astype(pd.BooleanDtype())
)
ratings = ratings.merge(
userEnrollment[[c.raterParticipantIdKey, _IN_GROUP]], on=c.raterParticipantIdKey, how="left"
)
logger.info(f" Ratings without assigned group: {ratings[_IN_GROUP].isna().sum()}")
ratings = ratings.fillna({_IN_GROUP: self._includeUnassigned})
ratings = ratings[ratings[_IN_GROUP]].drop(columns=[_IN_GROUP])
logger.info(f" Ratings after group filter: {len(ratings)}")
return ratings, noteStatusHistory
def _postprocess_output(
self,
noteScores: pd.DataFrame,
userScores: pd.DataFrame,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
userEnrollment: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Prune noteScores and userScores and augment with any additional columns as necessary.
Note that ratings, noteStatusHistory and userEnrollment are expected to be the *raw*
versions which were supplied to "score", not the version output after filtering.
Operating on the raw versions allows accurately computing statistics over the entire dataset
(e.g. fraction of users from a modeling group).
Args:
noteScores (pd.DataFrame): scoring output for notes
userScores (pd.DataFrame): scoring output for users
ratings (pd.DataFrame): preprocessed ratings
noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
userEnrollment (pd.DataFrame): one row per user specifying enrollment properties
Returns:
Tuple[pd.DataFrame, pd.DataFrame]:
noteScores: note scoring output from _score_notes_and_users
userScores: user scoring output from _score_notes_and_users
"""
if self._captureThreshold is None:
logger.info(f"Skipping postprocessing for {self.get_name()}: captureThreshold is None.")
return noteScores, userScores
# Identify notes with enough ratings from within the modeling group.
logger.info(f"Postprocessing output for {self.get_name()}")
assert self._includedGroups, "includedGroups must be set"
userEnrollment = userEnrollment[[c.participantIdKey, c.modelingGroupKey]].rename(
columns={c.participantIdKey: c.raterParticipantIdKey}
)
userEnrollment.loc[:, _IN_GROUP] = (
userEnrollment[c.modelingGroupKey].isin(self._includedGroups).astype(pd.BooleanDtype())
)
ratings = ratings.merge(
userEnrollment[[c.raterParticipantIdKey, _IN_GROUP]], on=c.raterParticipantIdKey, how="left"
)
ratings = ratings.fillna({_IN_GROUP: self._includeUnassigned})
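# Per-note fraction of ratings authored by in-group raters; notes are retained only when
# this fraction meets or exceeds the capture threshold.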
ratios = ratings[[c.noteIdKey, _IN_GROUP]].groupby(c.noteIdKey).mean().reset_index()
logger.info(f" Original noteScores length: {len(noteScores)}")
noteScores = noteScores.merge(
ratios[ratios[_IN_GROUP] >= self._captureThreshold][[c.noteIdKey]]
)
logger.info(f" Final noteScores length: {len(noteScores)}")
return noteScores, userScores
def _get_note_col_mapping(self) -> Dict[str, str]:
"""Returns a dict mapping default note column names to custom names for a specific model."""
return {c.lowDiligenceNoteInterceptKey: c.lowDiligenceLegacyNoteInterceptKey}
def _get_user_col_mapping(self) -> Dict[str, str]:
"""Returns a dict mapping default user column names to custom names for a specific model."""
return {}
@abstractmethod
def _prescore_notes_and_users(
self, ratings: pd.DataFrame, noteStatusHistory: pd.DataFrame, userEnrollmentRaw: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame, c.PrescoringMetaScorerOutput]:
"""
Runs initial rounds of the matrix factorization scoring algorithm and returns intermediate
output that can be used to initialize and reduce the runtime of final scoring.
Args:
ratings (pd.DataFrame)
noteStatusHistory (pd.DataFrame)
userEnrollmentRaw (pd.DataFrame)
Returns:
Tuple[pd.DataFrame, pd.DataFrame, c.PrescoringMetaScorerOutput]:
prescoringNoteModelOutput (pd.DataFrame)
prescoringRaterModelOutput (pd.DataFrame)
prescoringMetaScorerOutput (c.PrescoringMetaScorerOutput)
"""
@abstractmethod
def _score_notes_and_users(
self,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
prescoringNoteModelOutput: pd.DataFrame,
prescoringRaterModelOutput: pd.DataFrame,
prescoringMetaScorerOutput: c.PrescoringMetaScorerOutput,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Run the matrix factorization scoring algorithm.
See links below for more info:
https://twitter.github.io/communitynotes/ranking-notes/
https://twitter.github.io/communitynotes/contributor-scores/.
Args:
ratings (pd.DataFrame): preprocessed ratings
noteStatusHistory (pd.DataFrame): one row per note; history of when note had each status
prescoringNoteModelOutput (pd.DataFrame)
prescoringRaterModelOutput (pd.DataFrame)
prescoringMetaScorerOutput (c.PrescoringMetaScorerOutput)
Returns:
Tuple[pd.DataFrame, pd.DataFrame]:
noteScores (pd.DataFrame): one row per note containing note scores and parameters.
userScores (pd.DataFrame): one row per user containing a column for each helpfulness score.
"""
def prescore(self, scoringArgs: PrescoringArgs, preserveRatings: bool = True) -> ModelResult:
"""
Runs initial rounds of the matrix factorization scoring algorithm and returns intermediate
output that can be used to initialize and reduce the runtime of final scoring.
"""
torch.set_num_threads(self._threads)
logger.info(
f"prescore: Torch intra-op parallelism for {self.get_name()} set to: {torch.get_num_threads()}"
)
# Transform input, run core scoring algorithm, transform output.
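# keep_columns below restricts the ratings frame to only the columns prescoring needs.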
with self.time_block("Filter input"):
ratings, noteStatusHistory = self._filter_input(
scoringArgs.noteTopics,
keep_columns(
scoringArgs.ratings,
[
c.noteIdKey,
c.raterParticipantIdKey,
c.helpfulNumKey,
c.helpfulnessLevelKey,
c.createdAtMillisKey,
]
+ c.notHelpfulTagsTSVOrder
+ c.helpfulTagsTSVOrder,
),
scoringArgs.noteStatusHistory,
scoringArgs.userEnrollment,
)
if not preserveRatings:
# Only remove ratings if we're running in parallel, since otherwise later scorers will
# need the ratings.
del scoringArgs.ratings
gc.collect()
# If there are no ratings left after filtering, then return empty dataframes.
if len(ratings) == 0:
return ModelResult(
pd.DataFrame(columns=self.get_internal_scored_notes_cols()),
(
pd.DataFrame(columns=self.get_internal_helpfulness_scores_cols())
if self.get_internal_helpfulness_scores_cols()
else None
),
(
pd.DataFrame(columns=self.get_auxiliary_note_info_cols())
if self.get_auxiliary_note_info_cols()
else None
),
self.get_name(),
None,
)
noteScores, userScores, metaScores = self._prescore_notes_and_users(
ratings, noteStatusHistory, scoringArgs.userEnrollment
)
# Returning should remove references to ratings, but manually trigger GC just to reclaim
# resources as soon as possible.
del ratings
gc.collect()
# Return dataframes with specified columns in specified order
# Reindex fills required columns with NaN if they aren't present in the original df.
return ModelResult(
scoredNotes=noteScores.reindex(
columns=c.prescoringNoteModelOutputTSVColumns, fill_value=np.nan
),
helpfulnessScores=userScores.reindex(
columns=c.prescoringRaterModelOutputTSVColumns, fill_value=np.nan
),
auxiliaryNoteInfo=noteScores.reindex(
columns=self.get_auxiliary_note_info_cols(), fill_value=np.nan
),
scorerName=self.get_name(),
metaScores=metaScores,
)
def _return_empty_final_scores(self) -> ModelResult:
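"""Return a ModelResult whose dataframes are empty but carry the expected output columns."""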
return ModelResult(
scoredNotes=pd.DataFrame(columns=self.get_scored_notes_cols()),
helpfulnessScores=(
pd.DataFrame(columns=self.get_helpfulness_scores_cols())
if self.get_helpfulness_scores_cols()
else None
),
auxiliaryNoteInfo=(
pd.DataFrame(columns=self.get_auxiliary_note_info_cols())
if self.get_auxiliary_note_info_cols()
else None
),
scorerName=self.get_name(),
metaScores=None,
)
def score_final(self, scoringArgs: FinalScoringArgs) -> ModelResult:
"""
Process ratings to assign status to notes and optionally compute rater properties.
Accepts prescoringNoteModelOutput and prescoringRaterModelOutput as args (fields on scoringArgs)
which are the outputs of the prescore() function. These are used to initialize the final scoring.
It filters the prescoring output to only include the rows relevant to this scorer, based on the
c.scorerNameKey field of those dataframes.
"""
torch.set_num_threads(self._threads)
logger.info(
f"score_final: Torch intra-op parallelism for {self.get_name()} set to: {torch.get_num_threads()}"
)
# Filter unfiltered params to just params for this scorer (with copy).
# Avoid editing the dataframe in FinalScoringArgs, which is shared across scorers.
prescoringNoteModelOutput = scoringArgs.prescoringNoteModelOutput[
scoringArgs.prescoringNoteModelOutput[c.scorerNameKey] == self.get_name()
].drop(columns=c.scorerNameKey, inplace=False)
if scoringArgs.prescoringRaterModelOutput is None:
return self._return_empty_final_scores()
prescoringRaterModelOutput = scoringArgs.prescoringRaterModelOutput[
scoringArgs.prescoringRaterModelOutput[c.scorerNameKey] == self.get_name()
].drop(columns=c.scorerNameKey, inplace=False)
if self.get_name() not in scoringArgs.prescoringMetaOutput.metaScorerOutput:
logger.info(
f"Scorer {self.get_name()} not found in prescoringMetaOutput; returning empty scores from final scoring."
)
return self._return_empty_final_scores()
prescoringMetaScorerOutput = scoringArgs.prescoringMetaOutput.metaScorerOutput[self.get_name()]
# Filter raw input
with self.time_block("Filter input"):
ratings, noteStatusHistory = self._filter_input(
scoringArgs.noteTopics,
scoringArgs.ratings,
scoringArgs.noteStatusHistory,
scoringArgs.userEnrollment,
)
# If there are no ratings left after filtering, then return empty dataframes.
if len(ratings) == 0:
logger.info(
f"No rating left after filtering for Scorer {self.get_name()}, returning empty dataframes."
)
return self._return_empty_final_scores()
try:
noteScores, userScores = self._score_notes_and_users(
ratings=ratings,
noteStatusHistory=noteStatusHistory,
prescoringNoteModelOutput=prescoringNoteModelOutput,
prescoringRaterModelOutput=prescoringRaterModelOutput,
prescoringMetaScorerOutput=prescoringMetaScorerOutput,
)
except EmptyRatingException:
logger.info(f"EmptyRatingException for Scorer {self.get_name()}")
return self._return_empty_final_scores()
with self.time_block("Postprocess output"):
# Only some subclasses do any postprocessing.
# E.g. topic models add confidence bit, group models prune according to authorship filter.
noteScores, userScores = self._postprocess_output(
noteScores,
userScores,
scoringArgs.ratings,
scoringArgs.noteStatusHistory,
scoringArgs.userEnrollment,
)
noteScores = noteScores.rename(columns=self._get_note_col_mapping())
userScores = userScores.rename(columns=self._get_user_col_mapping())
# Process noteScores
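# After dropping, every remaining column must be declared in either get_scored_notes_cols
# or get_auxiliary_note_info_cols; the assertion below enforces that.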
noteScores = noteScores.drop(columns=self._get_dropped_note_cols())
assert set(noteScores.columns) == set(
self.get_scored_notes_cols() + self.get_auxiliary_note_info_cols()
), f"""all columns must be either dropped or explicitly defined in an output.
Extra columns that were in noteScores: {set(noteScores.columns) - set(self.get_scored_notes_cols() + self.get_auxiliary_note_info_cols())}
Missing expected columns that should've been in noteScores: {set(self.get_scored_notes_cols() + self.get_auxiliary_note_info_cols()) - set(noteScores.columns)}"""
# Process userScores
userScores = userScores.drop(columns=self._get_dropped_user_cols())
assert set(userScores.columns) == set(self.get_helpfulness_scores_cols()), f"""all columns must be either dropped or explicitly defined in an output.
Extra columns that were in userScores: {set(userScores.columns) - set(self.get_helpfulness_scores_cols())}
Missing expected columns that should've been in userScores: {set(self.get_helpfulness_scores_cols()) - set(userScores.columns)}"""
# Return dataframes with specified columns in specified order
return ModelResult(
scoredNotes=noteScores[self.get_scored_notes_cols()],
helpfulnessScores=userScores[self.get_helpfulness_scores_cols()]
if self.get_helpfulness_scores_cols()
else None,
auxiliaryNoteInfo=noteScores[self.get_auxiliary_note_info_cols()]
if self.get_auxiliary_note_info_cols()
else None,
scorerName=self.get_name(),
metaScores=None,
)
def score(
self,
noteTopics: pd.DataFrame,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
userEnrollment: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
Deprecated; retained only for testing. Not intended for the main code flow, since scoring is
split into prescore() and score_final() and this function simply calls both phases sequentially.
"""
logger.info(
"CALLED DEPRECATED scorer.score() function. Prefer sequentially calling prescore() then score_final()."
)
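# Phase 1: prescoring produces intermediate note and rater parameters plus meta output.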
prescoringModelResult = self.prescore(
PrescoringArgs(
noteTopics=noteTopics,
ratings=ratings,
noteStatusHistory=noteStatusHistory,
userEnrollment=userEnrollment,
)
)
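# Tag prescoring outputs with this scorer's name so score_final can select its own rows
# via c.scorerNameKey.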
if prescoringModelResult.scoredNotes is not None:
prescoringModelResult.scoredNotes[c.scorerNameKey] = prescoringModelResult.scorerName
if prescoringModelResult.helpfulnessScores is not None:
prescoringModelResult.helpfulnessScores[c.scorerNameKey] = prescoringModelResult.scorerName
if (
prescoringModelResult.metaScores is not None and prescoringModelResult.scorerName is not None
):
prescoringMetaOutput = c.PrescoringMetaOutput(
metaScorerOutput={prescoringModelResult.scorerName: prescoringModelResult.metaScores}
)
else:
prescoringMetaOutput = c.PrescoringMetaOutput(metaScorerOutput={})
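# Phase 2: final scoring consumes the prescoring output to assign note statuses.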
finalScoringArgs = FinalScoringArgs(
noteTopics=noteTopics,
ratings=ratings,
noteStatusHistory=noteStatusHistory,
userEnrollment=userEnrollment,
prescoringNoteModelOutput=prescoringModelResult.scoredNotes,
prescoringRaterModelOutput=prescoringModelResult.helpfulnessScores,
prescoringMetaOutput=prescoringMetaOutput,
)
finalModelResult = self.score_final(finalScoringArgs)
return (
finalModelResult.scoredNotes,
finalModelResult.helpfulnessScores,
finalModelResult.auxiliaryNoteInfo,
)