sourcecode/scoring/post_selection_similarity_old.py
import gc
import logging
import multiprocessing as mp
from collections import defaultdict
from typing import Dict

from . import constants as c
import numpy as np
import pandas as pd
logger = logging.getLogger("birdwatch.post_selection_similarity")
logger.setLevel(logging.INFO)
class PostSelectionSimilarity:
def __init__(self):
pass
def initialize(
self,
notes: pd.DataFrame,
ratings: pd.DataFrame,
pmiRegularization: int = 500,
smoothedNpmiThreshold: float = 0.45,
minimumRatingProportionThreshold: float = 0.4,
minUniquePosts: int = 10,
minSimPseudocounts: int = 10,
windowMillis: int = 1000 * 60 * 20,
):
self.ratings = _preprocess_ratings(notes, ratings)
self.pairCounts = _get_pair_tuples(self.ratings, windowMillis=windowMillis)
self.pairStatsDf = _tuples_to_df(self.pairCounts)
self.uniqueRatingsOnTweets = self.ratings[
[c.tweetIdKey, c.raterParticipantIdKey]
].drop_duplicates()
self.pairStatsDf = _join_rater_totals(self.pairStatsDf, self.uniqueRatingsOnTweets)
self.pmiDf = _compute_pmi(
self.pairStatsDf, len(self.uniqueRatingsOnTweets), pmiRegularization, minSimPseudocounts
)
self.filter_edges_below_threshold(
smoothedNpmiThreshold, minimumRatingProportionThreshold, minUniquePosts
)
def filter_edges_below_threshold(
self, smoothedNpmiThreshold, minimumRatingProportionThreshold, minUniquePosts
):
self.graphDf = self.pmiDf[
(self.pmiDf["smoothedNpmi"] >= smoothedNpmiThreshold)
| (
(self.pmiDf["minSimRatingProp"] >= minimumRatingProportionThreshold)
& (self.pmiDf["minTotal"] >= minUniquePosts)
)
]
def get_high_post_selection_similarity_raters(self):
highPostSelectionSimilarityRaters = pd.concat(
[
self.graphDf[["leftRaterId"]].rename(columns={"leftRaterId": c.raterParticipantIdKey}),
self.graphDf[["rightRaterId"]].rename(columns={"rightRaterId": c.raterParticipantIdKey}),
]
).drop_duplicates()
highPostSelectionSimilarityRaters[c.postSelectionValueKey] = 1
return highPostSelectionSimilarityRaters
def get_post_selection_similarity_values(self):
"""
Returns dataframe with [raterParticipantId, postSelectionSimilarityValue] columns.
postSelectionSimilarityValue is None by default.
"""
cliqueToUserMap, userToCliqueMap = aggregate_into_cliques(self.graphDf)
# Convert dict to pandas dataframe
cliquesDfList = []
for cliqueId in cliqueToUserMap.keys():
for userId in cliqueToUserMap[cliqueId]:
cliquesDfList.append({c.raterParticipantIdKey: userId, c.postSelectionValueKey: cliqueId})
cliquesDf = pd.DataFrame(
cliquesDfList, columns=[c.raterParticipantIdKey, c.postSelectionValueKey]
)
return cliquesDf
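# Minimal usage sketch (hypothetical inputs; assumes the notes and ratings frames carry the
# column constants referenced above). The class is initialized via initialize() rather than
# __init__, so a caller would do roughly:
#
#   pss = PostSelectionSimilarity()
#   pss.initialize(notes, ratings)  # builds pair stats, PMI scores, and the filtered graph
#   cliquesDf = pss.get_post_selection_similarity_values()
#   flaggedRaters = pss.get_high_post_selection_similarity_raters()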
def filter_ratings_by_post_selection_similarity(notes, ratings, postSelectionSimilarityValues):
"""
Filters out ratings after the first on each note from raters who have high post selection similarity,
or filters all if the note is authored by a user with the same post selection similarity value.
"""
ratingsWithPostSelectionSimilarity = (
ratings.merge(
postSelectionSimilarityValues,
on=c.raterParticipantIdKey,
how="left",
unsafeAllowed=c.postSelectionValueKey,
)
.merge(notes[[c.noteIdKey, c.noteAuthorParticipantIdKey]], on=c.noteIdKey, how="left")
.merge(
postSelectionSimilarityValues,
left_on=c.noteAuthorParticipantIdKey,
right_on=c.raterParticipantIdKey,
how="left",
suffixes=("", "_note_author"),
unsafeAllowed={c.postSelectionValueKey, c.postSelectionValueKey + "_note_author"},
)
)
ratingsWithNoPostSelectionSimilarityValue = ratingsWithPostSelectionSimilarity[
pd.isna(ratingsWithPostSelectionSimilarity[c.postSelectionValueKey])
]
  ratingsWithPostSelectionSimilarityValue = ratingsWithPostSelectionSimilarity[
    (~pd.isna(ratingsWithPostSelectionSimilarity[c.postSelectionValueKey]))
    & (
      ratingsWithPostSelectionSimilarity[c.postSelectionValueKey]
      != ratingsWithPostSelectionSimilarity[c.postSelectionValueKey + "_note_author"]
    )
  ].copy()  # copy so the in-place sort/dedupe below doesn't operate on a slice
ratingsWithPostSelectionSimilarityValue.sort_values(
by=[c.noteIdKey, c.createdAtMillisKey], ascending=True, inplace=True
)
ratingsWithPostSelectionSimilarityValue.drop_duplicates(
subset=[c.noteIdKey, c.postSelectionValueKey], keep="first", inplace=True
)
ratings = pd.concat(
[ratingsWithPostSelectionSimilarityValue, ratingsWithNoPostSelectionSimilarityValue], axis=0
)
  ratings.drop(
    columns=[c.noteAuthorParticipantIdKey, c.raterParticipantIdKey + "_note_author"],
    errors="ignore",
    inplace=True,
  )
return ratings
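# Illustrative sketch (not part of the production pipeline) of the intended semantics of
# filter_ratings_by_post_selection_similarity. All ids and values below are hypothetical:
# raters r1 and r2 share clique 1, so only r1's earlier rating on note 100 survives, while
# r3 has no clique value and is kept as-is.
#
#   notes = pd.DataFrame({c.noteIdKey: [100], c.noteAuthorParticipantIdKey: ["a1"]})
#   ratings = pd.DataFrame({
#     c.raterParticipantIdKey: ["r1", "r2", "r3"],
#     c.noteIdKey: [100, 100, 100],
#     c.createdAtMillisKey: [1, 2, 3],
#   })
#   values = pd.DataFrame({c.raterParticipantIdKey: ["r1", "r2"], c.postSelectionValueKey: [1, 1]})
#   filtered = filter_ratings_by_post_selection_similarity(notes, ratings, values)
#   # -> r1's and r3's rows remain; r2's duplicate clique rating is dropped.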
def filter_all_ratings_by_post_selection_similarity(ratings, highPostSelectionSimilarityRaters):
"""
Deprecated.
Filters out all ratings from raters who have high post selection similarity.
"""
ratings = ratings.merge(
highPostSelectionSimilarityRaters, on=c.raterParticipantIdKey, how="left", indicator=True
)
ratings = ratings[ratings["_merge"] == "left_only"]
ratings = ratings.drop(columns=["_merge"])
return ratings
def _compute_pmi(
pairStatsDf: pd.DataFrame, N: int, pmiPseudocounts: int = 500, minSimPseudocounts: int = 10
) -> pd.DataFrame:
"""
Compute PMI between raters.
"""
numerator = pairStatsDf["pairRatings"] * N
denominator = (pairStatsDf["leftTotal"] + pmiPseudocounts) * (
pairStatsDf["rightTotal"] + pmiPseudocounts
)
pairStatsDf["smoothedPmi"] = np.log(numerator / denominator)
pairStatsDf["smoothedNpmi"] = pairStatsDf["smoothedPmi"] / -np.log(pairStatsDf["pairRatings"] / N)
pairStatsDf["minTotal"] = np.minimum(pairStatsDf["leftTotal"], pairStatsDf["rightTotal"])
pairStatsDf["minSimRatingProp"] = pairStatsDf["pairRatings"] / (
pairStatsDf["minTotal"] + minSimPseudocounts
)
return pairStatsDf
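# Worked toy example (hypothetical numbers) of the smoothing above. With N = 10_000 total
# unique (tweet, rater) ratings, a pair that co-rated 50 posts, and each rater having 100
# ratings, using pmiPseudocounts = 500:
#
#   smoothedPmi  = ln(50 * 10_000 / ((100 + 500) * (100 + 500)))  ~= ln(1.39) ~= 0.33
#   smoothedNpmi = 0.33 / -ln(50 / 10_000)                        ~= 0.33 / 5.30 ~= 0.06
#
# Without the pseudocounts the same pair would score ln(50 * 10_000 / (100 * 100)) ~= 3.9,
# so the regularization mostly suppresses pairs of low-volume raters.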
def _preprocess_ratings(notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame:
"""
Preprocess ratings dataframe.
"""
ratings = notes[[c.noteIdKey, c.tweetIdKey]].merge(
ratings[[c.raterParticipantIdKey, c.noteIdKey, c.createdAtMillisKey]],
on=c.noteIdKey,
how="inner",
)
ratings = ratings[(ratings[c.tweetIdKey] != -1) & (ratings[c.tweetIdKey] != "-1")]
return ratings
def _join_rater_totals(
pairStatsDf: pd.DataFrame, uniqueRatingsOnTweets: pd.DataFrame, minRatings: int = 10
):
raterTotals = uniqueRatingsOnTweets[c.raterParticipantIdKey].value_counts().reset_index()
raterTotals.columns = [c.raterParticipantIdKey, "count"]
raterTotals = raterTotals[raterTotals["count"] >= minRatings]
pairStatsDf = pairStatsDf.merge(
raterTotals.rename(columns={c.raterParticipantIdKey: "leftRaterId", "count": "leftTotal"})
)
pairStatsDf = pairStatsDf.merge(
raterTotals.rename(columns={c.raterParticipantIdKey: "rightRaterId", "count": "rightTotal"})
)
return pairStatsDf
def aggregate_into_cliques(graphDf):
with c.time_block("Aggregate into cliques by post selection similarity"):
userToCliqueMap = dict()
cliqueToUserMap = dict()
    nextNewCliqueId = 1  # start clique ids from 1
for i, row in graphDf.iterrows():
sid = row["leftRaterId"]
tid = row["rightRaterId"]
if sid in userToCliqueMap:
if tid in userToCliqueMap:
# both in map. merge if not same clique
if userToCliqueMap[sid] != userToCliqueMap[tid]:
            # merge: reassign all members of the target clique to the source clique.
# slow way: iterate over all values here.
# fast way: maintain a reverse map of cliqueToUserMap.
sourceDestClique = userToCliqueMap[sid]
oldTargetCliqueToDel = userToCliqueMap[tid]
for userId in cliqueToUserMap[oldTargetCliqueToDel]:
cliqueToUserMap[sourceDestClique].append(userId)
userToCliqueMap[userId] = sourceDestClique
del cliqueToUserMap[oldTargetCliqueToDel]
gc.collect()
else:
# source in map; target not. add target to source's clique
sourceClique = userToCliqueMap[sid]
userToCliqueMap[tid] = sourceClique
cliqueToUserMap[sourceClique].append(tid)
elif tid in userToCliqueMap:
# target in map; source not. add source to target's clique
targetClique = userToCliqueMap[tid]
userToCliqueMap[sid] = targetClique
cliqueToUserMap[targetClique].append(sid)
else:
# new clique
userToCliqueMap[sid] = nextNewCliqueId
userToCliqueMap[tid] = nextNewCliqueId
cliqueToUserMap[nextNewCliqueId] = [sid, tid]
nextNewCliqueId += 1
return cliqueToUserMap, userToCliqueMap
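# Toy example (hypothetical ids) of the incremental merge above: edges (A, B) and (C, D)
# first create cliques {A, B} and {C, D}, then the (B, C) edge merges them into a single
# clique under the first clique's id:
#
#   graphDf = pd.DataFrame({"leftRaterId": ["A", "C", "B"], "rightRaterId": ["B", "D", "C"]})
#   cliqueToUserMap, userToCliqueMap = aggregate_into_cliques(graphDf)
#   # cliqueToUserMap -> {1: ["A", "B", "C", "D"]}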
def _make_rater_stats_df(pairCounts):
with c.time_block("Making rater stats dataframe from pair counts dict"):
leftRater, rightRater, pairRatings = [], [], []
    for (left, right), count in pairCounts.items():
      leftRater.append(left)
      rightRater.append(right)
      pairRatings.append(count)
return pd.DataFrame(
{
"leftRaterId": np.array(leftRater),
"rightRaterId": np.array(rightRater),
"pairRatings": np.array(pairRatings),
}
)
def _get_pair_counts_df_dict(ratings, windowMillis):
# Assign column keys to local variables for faster access
noteIdKey = c.noteIdKey
createdAtMillisKey = c.createdAtMillisKey
raterParticipantIdKey = c.raterParticipantIdKey
# Sort ratings by noteIdKey and createdAtMillisKey
ratings_sorted = ratings.sort_values([noteIdKey, createdAtMillisKey])
# Initialize a defaultdict to store counts of pairs
pair_counts = defaultdict(int)
# Group by noteIdKey to process each note individually
grouped = ratings_sorted.groupby(noteIdKey, sort=False)
for noteId, group in grouped:
# Extract relevant columns as numpy arrays for efficient computation
times = group[createdAtMillisKey].values
raters = group[raterParticipantIdKey].values
n = len(group)
window_start = 0
for i in range(n):
# Move the window start forward if the time difference exceeds windowMillis
while times[i] - times[window_start] > windowMillis:
window_start += 1
# For all indices within the sliding window (excluding the current index)
for j in range(window_start, i):
if raters[i] != raters[j]:
left_rater, right_rater = tuple(sorted((raters[i], raters[j])))
# Update the count for this pair
pair_counts[(left_rater, right_rater)] += 1
# Convert the pair_counts dictionary to a DataFrame
if pair_counts:
pairs = np.array(list(pair_counts.keys()))
counts = np.array(list(pair_counts.values()))
df = pd.DataFrame({
'leftRaterId': pairs[:, 0],
'rightRaterId': pairs[:, 1],
'pairRatings': counts
})
else:
# Return an empty DataFrame with appropriate columns
df = pd.DataFrame(columns=['leftRaterId', 'rightRaterId', 'pairRatings'])
return df
def _get_pair_ratings_df_optimized(ratings, windowMillis):
# Assign column keys to local variables for faster access
noteIdKey = c.noteIdKey
createdAtMillisKey = c.createdAtMillisKey
raterParticipantIdKey = c.raterParticipantIdKey
tweetIdKey = c.tweetIdKey
# Sort ratings by noteIdKey and createdAtMillisKey
ratings_sorted = ratings.sort_values([noteIdKey, createdAtMillisKey])
# Initialize lists to collect data
left_raters = []
right_raters = []
tweet_ids = []
# Group by noteIdKey to process each note individually
grouped = ratings_sorted.groupby(noteIdKey, sort=False)
for noteId, group in grouped:
# Extract relevant columns as numpy arrays for efficient computation
times = group[createdAtMillisKey].values
raters = group[raterParticipantIdKey].values
tweetId = group[tweetIdKey].iloc[0] # Assuming tweetIdKey is constant within a note
n = len(group)
window_start = 0
for i in range(n):
# Move the window start forward if the time difference exceeds windowMillis
while times[i] - times[window_start] > windowMillis:
window_start += 1
# For all indices within the sliding window (excluding the current index)
for j in range(window_start, i):
if raters[i] != raters[j]:
left_rater, right_rater = tuple(sorted((raters[i], raters[j])))
left_raters.append(left_rater)
right_raters.append(right_rater)
tweet_ids.append(tweetId)
# Convert lists to numpy arrays for efficient DataFrame creation
left_raters = np.array(left_raters)
right_raters = np.array(right_raters)
tweet_ids = np.array(tweet_ids)
# Create the DataFrame from the collected data
df = pd.DataFrame({
'leftRaterId': left_raters,
'rightRaterId': right_raters,
'tweetId': tweet_ids,
})
# Drop duplicates
df = df.drop_duplicates()
# Group by leftRaterId and rightRaterId and count the number of occurrences
df = (
df.groupby(['leftRaterId', 'rightRaterId'], as_index=False)
.agg(pairRatings=('tweetId', 'count'))
)
return df
# Get the number of co-ratings per rater pair within the same time window.
def _get_pair_tuples(ratings, windowMillis):
tuples = []
ratings = ratings.sort_values([c.noteIdKey, c.createdAtMillisKey])
values = ratings[
[c.noteIdKey, c.createdAtMillisKey, c.raterParticipantIdKey, c.tweetIdKey]
].values
  logger.info(f"_get_pair_tuples: processing {len(values)} ratings")
for i in range(len(values)):
priorNote, priorTs, priorRater, priorTweet = values[i]
if i == 0 or i == 1000 or i == 100000 or i % 5000000 == 0:
print(f"i={i} len(tuples)={len(tuples)}")
j = i + 1
while j < len(values):
nextNote, nextTs, nextRater, nextTweet = values[j]
assert priorNote <= nextNote, (priorNote, nextNote)
if nextNote != priorNote:
break # break if we're onto a new note
assert priorTweet == nextTweet, (priorTweet, nextTweet) # tweet should be same
assert priorRater != nextRater, (priorRater, nextRater) # rater should be different
assert priorTs <= nextTs, (priorTs, nextTs)
if nextTs > (priorTs + windowMillis):
break # break if we're beyond the overlap window
      leftRater, rightRater = tuple(sorted((priorRater, nextRater)))
      tuples.append((leftRater, rightRater, priorTweet))
j += 1
return tuples
def _get_pair_tuples_optimized(ratings, windowMillis):
# Sort ratings by noteIdKey and createdAtMillisKey
ratings_sorted = ratings.sort_values([c.noteIdKey, c.createdAtMillisKey])
# Initialize an empty list to store the result
tuples = []
# Group by noteIdKey to process each note individually
grouped = ratings_sorted.groupby(c.noteIdKey, sort=False)
for noteId, group in grouped:
# Extract relevant columns as numpy arrays for efficient computation
times = group[c.createdAtMillisKey].values
raters = group[c.raterParticipantIdKey].values
priorTweet = group[c.tweetIdKey].iloc[0]
n = len(group)
window_start = 0 # Start index of the sliding window
for i in range(n):
# Move the window start forward if the time difference exceeds windowMillis
while times[i] - times[window_start] > windowMillis:
window_start += 1
# For all indices within the sliding window (excluding the current index)
for j in range(window_start, i):
# Check if raters are different
if raters[i] != raters[j]:
# Sort raters to maintain consistency
leftRater, rightRater = tuple(sorted((raters[i], raters[j])))
tuples.append((leftRater, rightRater, priorTweet))
return tuples
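# Sliding-window intuition (hypothetical timestamps, windowMillis = 10): for one note with
# ratings at times [0, 4, 12, 13] by raters [a, b, c, d], the pairs emitted are
# (a, b), (b, c), (b, d), and (c, d) -- (a, c) and (a, d) are skipped because their time
# gaps exceed the window.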
def _get_pair_tuples_parallel(ratings, windowMillis):
# Sort and group ratings
ratings_sorted = ratings.sort_values([c.noteIdKey, c.createdAtMillisKey])
grouped = ratings_sorted.groupby(c.noteIdKey, sort=False)
# Prepare arguments for parallel processing
args = [(group, windowMillis) for _, group in grouped]
# Use multiprocessing Pool
with mp.Pool(mp.cpu_count()) as pool:
results = pool.starmap(_get_pair_tuples_process_group, args)
# Flatten the list of results
tuples = [tup for sublist in results for tup in sublist]
return tuples
def _get_pair_tuples_process_group(group, windowMillis):
# Same logic as before, applied to a single group
times = group[c.createdAtMillisKey].values
raters = group[c.raterParticipantIdKey].values
priorTweet = group[c.tweetIdKey].iloc[0]
n = len(group)
window_start = 0
tuples = []
for i in range(n):
while times[i] - times[window_start] > windowMillis:
window_start += 1
for j in range(window_start, i):
if raters[i] != raters[j]:
leftRater, rightRater = tuple(sorted((raters[i], raters[j])))
tuples.append((leftRater, rightRater, priorTweet))
return tuples
def _tuples_to_df(tuples, name="pairRatings"):
leftRater, rightRater, tweetId = zip(*tuples)
df = pd.DataFrame(
{
"leftRaterId": np.array(leftRater),
"rightRaterId": np.array(rightRater),
"tweetId": np.array(tweetId),
}
)
  logger.info(f"_tuples_to_df: {len(df)} pair tuples")
df = df.drop_duplicates()
  logger.info(f"_tuples_to_df: {len(df)} after dropping duplicate (pair, tweet) rows")
df = (
df.groupby(["leftRaterId", "rightRaterId"])
.count()
.reset_index(drop=False)
.rename(columns={"tweetId": name})
)
  logger.info(f"_tuples_to_df: {len(df)} unique rater pairs")
return df
def _get_pair_counts(ratings: pd.DataFrame, windowMillis: int = 1000 * 60 * 30) -> Dict:
"""
  Compute counts of unique posts that were co-rated within windowMillis milliseconds of each
  other by different users.
Returns dict: (raterId1, raterId2) => count.
"""
with c.time_block("Computing rating pair counts"):
counts = dict()
seen = set()
ratings = ratings.sort_values([c.noteIdKey, c.createdAtMillisKey])
values = ratings[
[c.noteIdKey, c.createdAtMillisKey, c.raterParticipantIdKey, c.tweetIdKey]
].values
logger.info(len(values))
for i in range(len(values)):
priorNote, priorTs, priorRater, priorTweet = values[i]
if i == 0 or i == 1000 or i == 100000 or i % 5000000 == 0:
logger.info(f"get_pair_counts i={i}")
j = i + 1
while j < len(values):
nextNote, nextTs, nextRater, nextTweet = values[j]
assert priorNote <= nextNote, (priorNote, nextNote)
if nextNote != priorNote:
break # break if we're onto a new note
assert priorTweet == nextTweet, (priorTweet, nextTweet) # tweet should be same
assert priorRater != nextRater, (priorRater, nextRater) # rater should be different
assert priorTs <= nextTs, (priorTs, nextTs)
if nextTs > (priorTs + windowMillis):
break # break if we're beyond windowMillis
raterPairKey = tuple(sorted((priorRater, nextRater)))
raterTweetPairKey = (raterPairKey, priorTweet)
if raterTweetPairKey in seen:
break # break if we already counted a match on this tweet
seen.add(raterTweetPairKey)
if raterPairKey not in counts:
counts[raterPairKey] = 0
counts[raterPairKey] += 1
j += 1
return counts
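# Example of the resulting counts (hypothetical data): two raters who both rate two notes on
# the same tweet within the window are counted once for that tweet, because the seen set
# dedupes (raterPair, tweetId) matches:
#
#   ratings = pd.DataFrame({
#     c.noteIdKey: [1, 1, 2, 2],
#     c.createdAtMillisKey: [0, 1000, 5000, 6000],
#     c.raterParticipantIdKey: ["r1", "r2", "r1", "r2"],
#     c.tweetIdKey: [7, 7, 7, 7],
#   })
#   _get_pair_counts(ratings)  # -> {("r1", "r2"): 1}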