sourcecode/scoring/post_selection_similarity.py (215 lines of code) (raw):

import gc import logging import sys from typing import Dict from . import constants as c import numpy as np import pandas as pd logger = logging.getLogger("birdwatch.post_selection_similarity") logger.setLevel(logging.INFO) class PostSelectionSimilarity: def __init__( self, notes: pd.DataFrame, ratings: pd.DataFrame, pmiRegularization: int = 500, smoothedNpmiThreshold: float = 0.55, minimumRatingProportionThreshold: float = 0.4, minUniquePosts: int = 10, minSimPseudocounts: int = 10, windowMillis: int = 1000 * 60 * 20, ): self.ratings = _preprocess_ratings(notes, ratings) with c.time_block("Compute pair counts dict"): self.pairCountsDict = _get_pair_counts_dict(self.ratings, windowMillis=windowMillis) self.uniqueRatingsOnTweets = self.ratings[ [c.tweetIdKey, c.raterParticipantIdKey] ].drop_duplicates() raterTotals = self.uniqueRatingsOnTweets[c.raterParticipantIdKey].value_counts() raterTotalsDict = { index: value for index, value in raterTotals.items() if value >= minUniquePosts } self.pairCountsDict = _join_rater_totals_compute_pmi_and_filter_edges_below_threshold( pairCountsDict=self.pairCountsDict, raterTotalsDict=raterTotalsDict, N=len(self.uniqueRatingsOnTweets), pmiPseudocounts=pmiRegularization, minSimPseudocounts=minSimPseudocounts, smoothedNpmiThreshold=smoothedNpmiThreshold, minimumRatingProportionThreshold=minimumRatingProportionThreshold, ) def get_high_post_selection_similarity_raters(self): uniqueRaters = set() for r1, r2 in self.pairCountsDict.keys(): uniqueRaters.add(r1) uniqueRaters.add(r2) highPostSelectionSimilarityRaters = pd.DataFrame( list(uniqueRaters), columns=[c.raterParticipantIdKey] ) highPostSelectionSimilarityRaters[c.postSelectionValueKey] = 1 return highPostSelectionSimilarityRaters def get_post_selection_similarity_values(self): """ Returns dataframe with [raterParticipantId, postSelectionSimilarityValue] columns. postSelectionSimilarityValue is None by default. """ cliqueToUserMap, userToCliqueMap = aggregate_into_cliques(self.pairCountsDict) # Convert dict to pandas dataframe cliquesDfList = [] for cliqueId in cliqueToUserMap.keys(): for userId in cliqueToUserMap[cliqueId]: cliquesDfList.append({c.raterParticipantIdKey: userId, c.postSelectionValueKey: cliqueId}) cliquesDf = pd.DataFrame( cliquesDfList, columns=[c.raterParticipantIdKey, c.postSelectionValueKey] ) return cliquesDf def filter_ratings_by_post_selection_similarity(notes, ratings, postSelectionSimilarityValues): """ Filters out ratings after the first on each note from raters who have high post selection similarity, or filters all if the note is authored by a user with the same post selection similarity value. """ ratingsWithPostSelectionSimilarity = ( ratings.merge( postSelectionSimilarityValues, on=c.raterParticipantIdKey, how="left", unsafeAllowed=c.postSelectionValueKey, ) .merge(notes[[c.noteIdKey, c.noteAuthorParticipantIdKey]], on=c.noteIdKey, how="left") .merge( postSelectionSimilarityValues, left_on=c.noteAuthorParticipantIdKey, right_on=c.raterParticipantIdKey, how="left", suffixes=("", "_note_author"), unsafeAllowed={c.postSelectionValueKey, c.postSelectionValueKey + "_note_author"}, ) ) ratingsWithNoPostSelectionSimilarityValue = ratingsWithPostSelectionSimilarity[ pd.isna(ratingsWithPostSelectionSimilarity[c.postSelectionValueKey]) ] ratingsWithPostSelectionSimilarityValue = ratingsWithPostSelectionSimilarity[ (~pd.isna(ratingsWithPostSelectionSimilarity[c.postSelectionValueKey])) & ( ratingsWithPostSelectionSimilarity[c.postSelectionValueKey] != ratingsWithPostSelectionSimilarity[c.postSelectionValueKey + "_note_author"] ) ] ratingsWithPostSelectionSimilarityValue.sort_values( by=[c.noteIdKey, c.createdAtMillisKey], ascending=True, inplace=True ) ratingsWithPostSelectionSimilarityValue.drop_duplicates( subset=[c.noteIdKey, c.postSelectionValueKey], keep="first", inplace=True ) if len(notes) < c.minNumNotesForProdData: return ratings ratings = pd.concat( [ratingsWithPostSelectionSimilarityValue, ratingsWithNoPostSelectionSimilarityValue], axis=0 ) ratings.drop( columns={c.noteAuthorParticipantIdKey, c.raterParticipantIdKey + "_note_author"}, errors="ignore", inplace=True, ) return ratings def filter_all_ratings_by_post_selection_similarity(ratings, highPostSelectionSimilarityRaters): """ Deprecated. Filters out all ratings from raters who have high post selection similarity. """ ratings = ratings.merge( highPostSelectionSimilarityRaters, on=c.raterParticipantIdKey, how="left", indicator=True ) ratings = ratings[ratings["_merge"] == "left_only"] ratings = ratings.drop(columns=["_merge"]) return ratings def _preprocess_ratings(notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame: """ Preprocess ratings dataframe. """ ratings = notes[[c.noteIdKey, c.tweetIdKey]].merge( ratings[[c.raterParticipantIdKey, c.noteIdKey, c.createdAtMillisKey]], on=c.noteIdKey, how="inner", ) ratings = ratings[(ratings[c.tweetIdKey] != -1) & (ratings[c.tweetIdKey] != "-1")] return ratings def _join_rater_totals_compute_pmi_and_filter_edges_below_threshold( pairCountsDict: Dict, raterTotalsDict: Dict, N: int, pmiPseudocounts: int, minSimPseudocounts: int, smoothedNpmiThreshold: float, minimumRatingProportionThreshold: float, ): keys_to_delete = [] with c.time_block("Compute PMI and minSim"): for leftRaterId, rightRaterId in pairCountsDict: if leftRaterId not in raterTotalsDict or rightRaterId not in raterTotalsDict: keys_to_delete.append((leftRaterId, rightRaterId)) continue leftTotal = raterTotalsDict[leftRaterId] rightTotal = raterTotalsDict[rightRaterId] coRatings = pairCountsDict[(leftRaterId, rightRaterId)] if type(coRatings) != int: # already processed (should only occur when re-running...) continue # PMI pmiNumerator = coRatings * N pmiDenominator = (leftTotal + pmiPseudocounts) * (rightTotal + pmiPseudocounts) smoothedPmi = np.log(pmiNumerator / pmiDenominator) smoothedNpmi = smoothedPmi / -np.log(coRatings / N) # minSim minTotal = min(leftTotal, rightTotal) minSimRatingProp = coRatings / (minTotal + minSimPseudocounts) if (smoothedNpmi >= smoothedNpmiThreshold) or ( minSimRatingProp >= minimumRatingProportionThreshold ): pairCountsDict[(leftRaterId, rightRaterId)] = (smoothedNpmi, minSimRatingProp) else: keys_to_delete.append((leftRaterId, rightRaterId)) print(f"Pairs dict used {sys.getsizeof(pairCountsDict) * 1e-9}GB RAM at max") with c.time_block("Delete unneeded pairs from pairCountsDict"): for key in keys_to_delete: del pairCountsDict[key] print( f"Pairs dict used {sys.getsizeof(pairCountsDict) * 1e-9}GB RAM after deleted unneeded pairs" ) return pairCountsDict def aggregate_into_cliques(pairCountsDict): with c.time_block("Aggregate into cliques by post selection similarity"): userToCliqueMap = dict() cliqueToUserMap = dict() nextNewCliqueId = 1 # start cliqueIdxs from 1 for sid, tid in pairCountsDict.keys(): if sid in userToCliqueMap: if tid in userToCliqueMap: # both in map. merge if not same clique if userToCliqueMap[sid] != userToCliqueMap[tid]: # merge. assign all member's of target clique to source clique. # slow way: iterate over all values here. # fast way: maintain a reverse map of cliqueToUserMap. sourceDestClique = userToCliqueMap[sid] oldTargetCliqueToDel = userToCliqueMap[tid] for userId in cliqueToUserMap[oldTargetCliqueToDel]: cliqueToUserMap[sourceDestClique].append(userId) userToCliqueMap[userId] = sourceDestClique del cliqueToUserMap[oldTargetCliqueToDel] gc.collect() else: # source in map; target not. add target to source's clique sourceClique = userToCliqueMap[sid] userToCliqueMap[tid] = sourceClique cliqueToUserMap[sourceClique].append(tid) elif tid in userToCliqueMap: # target in map; source not. add source to target's clique targetClique = userToCliqueMap[tid] userToCliqueMap[sid] = targetClique cliqueToUserMap[targetClique].append(sid) else: # new clique userToCliqueMap[sid] = nextNewCliqueId userToCliqueMap[tid] = nextNewCliqueId cliqueToUserMap[nextNewCliqueId] = [sid, tid] nextNewCliqueId += 1 return cliqueToUserMap, userToCliqueMap def _get_pair_counts_dict(ratings, windowMillis): pair_counts = dict() # Group by tweetIdKey to process each tweet individually grouped_by_tweet = ratings.groupby(c.tweetIdKey, sort=False) for _, tweet_group in grouped_by_tweet: # Keep track of pairs we've already counted for this tweetId pairs_counted_in_tweet = set() # Group by noteIdKey within the tweet grouped_by_note = tweet_group.groupby(c.noteIdKey, sort=False) for _, note_group in grouped_by_note: note_group.sort_values(c.createdAtMillisKey, inplace=True) # Extract relevant columns as numpy arrays for efficient computation times = note_group[c.createdAtMillisKey].values raters = note_group[c.raterParticipantIdKey].values n = len(note_group) window_start = 0 for i in range(n): # Move the window start forward if the time difference exceeds windowMillis while times[i] - times[window_start] > windowMillis: window_start += 1 # For all indices within the sliding window (excluding the current index) for j in range(window_start, i): if raters[i] != raters[j]: left_rater, right_rater = tuple(sorted((raters[i], raters[j]))) pair = (left_rater, right_rater) # Only count this pair once per tweetId if pair not in pairs_counted_in_tweet: pairs_counted_in_tweet.add(pair) # Update the count for this pair if pair not in pair_counts: pair_counts[pair] = 0 pair_counts[pair] += 1 return pair_counts