sourcecode/scoring/matrix_factorization/pseudo_raters.py
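"""Estimate confidence bounds on note parameters by refitting with synthetic extreme raters.

For each note, a pseudo-rating from a hypothetical rater with an extreme intercept and
factor is appended to the real ratings, and the note parameters are refit with rater and
global parameters frozen; the spread of the refit note intercepts yields the bounds.
"""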
from dataclasses import dataclass
import logging
from .. import constants as c
from .matrix_factorization import Constants as mf_c, MatrixFactorization
import numpy as np
import pandas as pd
import torch
logger = logging.getLogger("birdwatch.pseudo_raters")
logger.setLevel(logging.INFO)
@dataclass
class Constants:
extraRaterInterceptKey = "extraRaterIntercept"
extraRaterFactor1Key = "extraRaterFactor1"
extraRatingHelpfulNumKey = "extraRatingHelpfulNum"
originalKey = "original"
refitOriginalKey = "refit_orig"
allKey = "all"
negFacKey = "neg_fac"
posFacKey = "pos_fac"
class PseudoRatersRunner:
def __init__(
self,
ratings: pd.DataFrame,
noteParams: pd.DataFrame,
raterParams: pd.DataFrame,
globalBias: float,
mfRanker: MatrixFactorization,
log=True,
checkParamsSame=True,
):
self._log = log
self._mfRanker = mfRanker
self._checkParamsSame = checkParamsSame
self.ratings = ratings
(
self.noteIdMap,
self.raterIdMap,
self.ratingFeaturesAndLabels,
) = self._mfRanker.get_note_and_rater_id_maps(ratings)
self.noteParams = noteParams
self.raterParams = raterParams
self.globalBias = globalBias
def compute_note_parameter_confidence_bounds_with_pseudo_raters(self):
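    """Refit the model once per extreme pseudo-rating and return self.noteParams augmented
    with min/median/max confidence-bound columns for each note."""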
with c.time_block("Pseudoraters: prepare data"):
self._make_extreme_raters(self.raterParams, self.raterIdMap)
self._add_extreme_raters_to_id_maps_and_params()
self._create_extreme_ratings()
with c.time_block("Pseudoraters: fit models"):
noteParamsList = self._fit_note_params_for_each_dataset_with_extreme_ratings()
with c.time_block("Pseudoraters: aggregate"):
notesWithConfidenceBounds = self._aggregate_note_params(noteParamsList)
noteParams = self.noteParams.merge(
notesWithConfidenceBounds.reset_index(), on=c.noteIdKey, how="left"
)
return noteParams
def _check_rater_parameters_same(self, newMatrixFactorization: MatrixFactorization):
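    """Assert that rater parameters in the new model match the originals (they are frozen)."""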
(
noteParamsFromNewModel,
raterParamsFromNewModel,
) = newMatrixFactorization._get_parameters_from_trained_model()
raterParamsCols = [
c.raterParticipantIdKey,
c.internalRaterInterceptKey,
c.internalRaterFactor1Key,
]
newParams = raterParamsFromNewModel.loc[:, raterParamsCols].set_index(c.raterParticipantIdKey)
oldParams = self.raterParams.loc[:, raterParamsCols].set_index(c.raterParticipantIdKey)
overlapParticipantIds = newParams.index.intersection(oldParams.index)
    assert len(overlapParticipantIds) == len(oldParams)
    # Matching IDs alone doesn't prove the frozen values are unchanged; also compare the
    # intercepts and factors for every original rater.
    assert (newParams.loc[oldParams.index] == oldParams).all().all()
def _check_note_parameters_same(self, newMatrixFactorization: MatrixFactorization):
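    """Assert that the newly initialized model exactly reproduces the original note parameters."""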
(
noteParamsFromNewModel,
raterParamsFromNewModel,
) = newMatrixFactorization._get_parameters_from_trained_model(
flipFactorsForIdentification=False
)
assert (noteParamsFromNewModel == self.noteParams).all().all()
def _make_extreme_raters(self, raterParams: pd.DataFrame, raterIdMap: pd.DataFrame):
"""Populates self.extremeRaters, which is a list of dicts with rater id info"""
# Because LCB is turned off and we therefore don't use not-helpful pseudoratings anymore,
# we only include the min rater intercept (which produces the highest possible note intercept)
# If we were to use not-helpful pseudoratings, we would also include the max rater intercept.
raterInterceptValues = [
raterParams[c.internalRaterInterceptKey].min(),
]
raterFactorValues = [
raterParams[c.internalRaterFactor1Key].min(),
0.0,
raterParams[c.internalRaterFactor1Key].max(),
]
self.extremeRaters = []
i = 0
for raterIntercept in raterInterceptValues:
for raterFactor in raterFactorValues:
# These pseudo-raters need to have IDs that don't conflict with real raterParticipantIds
raterParticipantId = -1 - i
raterIndex = raterIdMap[mf_c.raterIndexKey].max() + 1 + i
self.extremeRaters.append(
{
c.raterParticipantIdKey: raterParticipantId,
mf_c.raterIndexKey: raterIndex,
c.internalRaterInterceptKey: raterIntercept,
c.internalRaterFactor1Key: raterFactor,
}
)
i += 1
def _add_extreme_raters_to_id_maps_and_params(self):
"""Adds extreme raters to a new copy of raterIdMap and raterParams called
self.raterIdMapWithExtreme and self.raterParamsWithExtreme
"""
# TODO: should concat all at once for efficiency instead of iterative appends
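    # `unsafeAllowed` below is an argument of this codebase's patched pd.concat (see
    # pandas_utils), not stock pandas; it whitelists columns allowed to change dtype.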
self.raterIdMapWithExtreme = self.raterIdMap.copy(deep=True)
self.raterParamsWithExtreme = self.raterParams.copy(deep=True)
    for raterDict in self.extremeRaters:
if not (self.raterIdMap[c.raterParticipantIdKey] == raterDict[c.raterParticipantIdKey]).any():
self.raterIdMapWithExtreme = pd.concat(
[
self.raterIdMapWithExtreme,
pd.DataFrame(
{
c.raterParticipantIdKey: [raterDict[c.raterParticipantIdKey]],
mf_c.raterIndexKey: [raterDict[mf_c.raterIndexKey]],
}
),
],
unsafeAllowed=c.raterParticipantIdKey,
)
if not (
self.raterParams[c.raterParticipantIdKey] == raterDict[c.raterParticipantIdKey]
).any():
self.raterParamsWithExtreme = pd.concat(
[
self.raterParamsWithExtreme,
pd.DataFrame(
{
c.raterParticipantIdKey: [raterDict[c.raterParticipantIdKey]],
c.internalRaterInterceptKey: [raterDict[c.internalRaterInterceptKey]],
c.internalRaterFactor1Key: [raterDict[c.internalRaterFactor1Key]],
}
),
],
unsafeAllowed={
c.raterParticipantIdKey,
c.internalRaterInterceptKey,
c.internalRaterFactor1Key,
},
)
def _create_new_model_with_extreme_raters_from_original_params(
self, ratingFeaturesAndLabelsWithExtremeRatings: pd.DataFrame
):
"""Create a new instance of BiasedMatrixFactorization, initialized with the original
model parameters, but with extreme raters added to the raterIdMap and raterParams
"""
newExtremeMF = self._mfRanker.get_new_mf_with_same_args()
newExtremeMF.noteIdMap = self.noteIdMap
newExtremeMF.raterIdMap = self.raterIdMapWithExtreme
newExtremeMF.ratingFeaturesAndLabels = ratingFeaturesAndLabelsWithExtremeRatings
newExtremeMF._create_mf_model(
self.noteParams.copy(deep=True), self.raterParamsWithExtreme.copy(deep=True), self.globalBias
)
newExtremeMF.mf_model.freeze_rater_and_global_parameters()
newExtremeMF.optimizer = torch.optim.Adam(
list(newExtremeMF.mf_model.note_intercepts.parameters())
+ list(newExtremeMF.mf_model.note_factors.parameters()),
lr=newExtremeMF._initLearningRate,
)
if self._checkParamsSame:
self._check_note_parameters_same(newExtremeMF)
self._check_rater_parameters_same(newExtremeMF)
return newExtremeMF
def _fit_all_notes_with_raters_constant(self, ratingFeaturesAndLabelsWithExtremeRatings):
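    """Refit note parameters on the augmented ratings (rater parameters stay frozen) and
    return the resulting note parameters."""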
newExtremeMF = self._create_new_model_with_extreme_raters_from_original_params(
ratingFeaturesAndLabelsWithExtremeRatings
)
newExtremeMF.prepare_features_and_labels()
newExtremeMF._fit_model()
# Double check that we kept rater parameters fixed during re-training of note parameters.
if self._checkParamsSame:
self._check_rater_parameters_same(newExtremeMF)
fitNoteParams, fitRaterParams = newExtremeMF._get_parameters_from_trained_model(
flipFactorsForIdentification=False
)
return fitNoteParams
def _create_extreme_ratings(self):
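    """Build the list of pseudo-ratings to add, one helpful rating per extreme rater.

    The first entry has every field set to None: it is the sentinel for "add no extra
    rating", which produces the refit_orig baseline fit on the unmodified data.
    """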
self.extremeRatingsToAddWithoutNotes = []
self.extremeRatingsToAddWithoutNotes.append(
{
c.internalRaterInterceptKey: None,
c.internalRaterFactor1Key: None,
c.helpfulNumKey: None,
}
)
for extremeRater in self.extremeRaters:
extremeRater[c.raterParticipantIdKey] = str(extremeRater[c.raterParticipantIdKey])
# Since LCB is turned off, don't waste compute on not-helpful pseudoratings.
for helpfulNum in [1.0]: # Only helpful ratings
extremeRater[c.helpfulNumKey] = helpfulNum
self.extremeRatingsToAddWithoutNotes.append(extremeRater.copy())
def _create_dataset_with_extreme_rating_on_each_note(self, ratingToAddWithoutNoteId):
    """Return a copy of the training data with the given pseudo-rating added to every note.

    A helpfulNum of None is the sentinel for "no extra rating"; in that case the original
    data is returned unchanged (this produces the refit_orig baseline).
    """
if ratingToAddWithoutNoteId[c.helpfulNumKey] is not None:
ratingsWithNoteIds = []
for noteRow in (
self.ratingFeaturesAndLabels[[c.noteIdKey, mf_c.noteIndexKey]]
.drop_duplicates()
.itertuples()
):
ratingToAdd = ratingToAddWithoutNoteId.copy()
ratingToAdd[c.noteIdKey] = getattr(noteRow, c.noteIdKey)
ratingToAdd[mf_c.noteIndexKey] = getattr(noteRow, mf_c.noteIndexKey)
ratingsWithNoteIds.append(ratingToAdd)
extremeRatingsToAdd = pd.DataFrame(ratingsWithNoteIds).drop(
[c.internalRaterInterceptKey, c.internalRaterFactor1Key], axis=1
)
extremeRatingsToAdd[c.noteIdKey] = extremeRatingsToAdd[c.noteIdKey].astype(np.int64)
if isinstance(self.ratingFeaturesAndLabels[c.raterParticipantIdKey].dtype, pd.Int64Dtype):
      # Only convert the ID type from string to Int64 if necessary to match the existing
      # IDs (expected when running in prod, but not always in unit tests or public data).
extremeRatingsToAdd[c.raterParticipantIdKey] = extremeRatingsToAdd[
c.raterParticipantIdKey
].astype(pd.Int64Dtype())
ratingFeaturesAndLabelsWithExtremeRatings = pd.concat(
[self.ratingFeaturesAndLabels, extremeRatingsToAdd]
)
else:
ratingFeaturesAndLabelsWithExtremeRatings = self.ratingFeaturesAndLabels
return ratingFeaturesAndLabelsWithExtremeRatings
def _fit_note_params_for_each_dataset_with_extreme_ratings(self):
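    """Refit note parameters once per pseudo-rating dataset, tagging each result with the
    pseudo-rater's intercept, factor, and helpfulness value."""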
noteParamsList = []
for ratingToAddWithoutNoteId in self.extremeRatingsToAddWithoutNotes:
ratingFeaturesAndLabelsWithExtremeRatings = (
self._create_dataset_with_extreme_rating_on_each_note(ratingToAddWithoutNoteId)
)
if self._log:
logger.info("------------------")
logger.info(f"Re-scoring all notes with extra rating added: {ratingToAddWithoutNoteId}")
with c.time_block("Pseudo: fit all notes with raters constant"):
fitNoteParams = self._fit_all_notes_with_raters_constant(
ratingFeaturesAndLabelsWithExtremeRatings
)
fitNoteParams[Constants.extraRaterInterceptKey] = ratingToAddWithoutNoteId[
c.internalRaterInterceptKey
]
fitNoteParams[Constants.extraRaterFactor1Key] = ratingToAddWithoutNoteId[
c.internalRaterFactor1Key
]
fitNoteParams[Constants.extraRatingHelpfulNumKey] = ratingToAddWithoutNoteId[c.helpfulNumKey]
noteParamsList.append(fitNoteParams)
return noteParamsList
def _aggregate_note_params(self, noteParamsList, joinOrig=False):
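    """Combine the per-dataset refits into per-note min/median/max bounds, attach the
    refit_orig (and optionally the original) parameters plus rating counts, and flatten
    the resulting MultiIndex columns."""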
rawRescoredNotesWithEachExtraRater = pd.concat(
noteParamsList,
unsafeAllowed={
Constants.extraRaterInterceptKey,
Constants.extraRaterFactor1Key,
Constants.extraRatingHelpfulNumKey,
},
)
rawRescoredNotesWithEachExtraRater.drop(mf_c.noteIndexKey, axis=1, inplace=True)
rawRescoredNotesWithEachExtraRater = rawRescoredNotesWithEachExtraRater.sort_values(
by=[c.noteIdKey, Constants.extraRaterInterceptKey]
)
rawRescoredNotesWithEachExtraRaterAgg = (
rawRescoredNotesWithEachExtraRater[
[c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key]
]
.groupby(c.noteIdKey)
.agg({"min", "median", "max"})
)
refitSameRatings = rawRescoredNotesWithEachExtraRater[
pd.isna(rawRescoredNotesWithEachExtraRater[Constants.extraRaterInterceptKey])
][[c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key]].set_index(c.noteIdKey)
refitSameRatings.columns = pd.MultiIndex.from_product(
[refitSameRatings.columns, [Constants.refitOriginalKey]]
)
notesWithConfidenceBounds = refitSameRatings.join(rawRescoredNotesWithEachExtraRaterAgg)
if joinOrig:
orig = self.noteParams[
[c.noteIdKey, c.internalNoteInterceptKey, c.internalNoteFactor1Key]
].set_index(c.noteIdKey)
orig.columns = pd.MultiIndex.from_product([orig.columns, [Constants.originalKey]])
notesWithConfidenceBounds = notesWithConfidenceBounds.join(orig)
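    # Count each note's ratings overall and split by rater factor sign; the negative- vs
    # positive-factor counts show how balanced each note's raters are.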
raterFacs = self.ratingFeaturesAndLabels.merge(self.raterParams, on=c.raterParticipantIdKey)
raterFacs[Constants.allKey] = 1
raterFacs[Constants.negFacKey] = raterFacs[c.internalRaterFactor1Key] < 0
raterFacs[Constants.posFacKey] = raterFacs[c.internalRaterFactor1Key] > 0
r = raterFacs.groupby(c.noteIdKey)[
[Constants.allKey, Constants.negFacKey, Constants.posFacKey]
].sum()
r.columns = pd.MultiIndex.from_product([[c.ratingCountKey], r.columns])
notesWithConfidenceBounds = notesWithConfidenceBounds.join(r)
    def flatten_column_names(col):
      # Joined MultiIndex columns become "name_suffix"; plain column names pass through.
      # (Named `col` rather than `c` to avoid shadowing the constants module imported as `c`.)
      if isinstance(col, tuple):
        return f"{col[0]}_{col[1]}"
      else:
        return col

    notesWithConfidenceBounds.columns = [
      flatten_column_names(col) for col in notesWithConfidenceBounds.columns
    ]
notesWithConfidenceBounds = notesWithConfidenceBounds[
notesWithConfidenceBounds.columns.sort_values()
]
return notesWithConfidenceBounds
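# Example usage (a sketch; assumes `ratings`, `noteParams`, `raterParams`, `globalBias`,
# and a trained `mfRanker` produced by the main MatrixFactorization scoring run):
#
#   runner = PseudoRatersRunner(ratings, noteParams, raterParams, globalBias, mfRanker)
#   boundedNoteParams = runner.compute_note_parameter_confidence_bounds_with_pseudo_raters()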