in topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/TopicSocialProofHandler.scala [57:225]
def getTopicSocialProofResponse(
request: TopicSocialProofRequest
): Future[TopicSocialProofResponse] = {
val scopedStats = statsReceiver.scope(request.displayLocation.toString)
scopedStats.counter("fanoutRequests").incr(request.tweetIds.size)
scopedStats.stat("numTweetsPerRequest").add(request.tweetIds.size)
StatsUtil.trackBlockStats(scopedStats) {
recTargetFactory
.buildRecTopicSocialProofTarget(request).flatMap { target =>
val enableCosineSimilarityScoreCalculation =
decider.isAvailable(DeciderConstants.enableTopicSocialProofScore)
val semanticCoreVersionId =
target.params(TopicSocialProofParams.TopicTweetsSemanticCoreVersionId)
val semanticCoreVersionIdsSet =
target.params(TopicSocialProofParams.TopicTweetsSemanticCoreVersionIdsSet)
val allowListWithTopicFollowTypeFut = uttTopicFilterStore
.getAllowListTopicsForUser(
request.userId,
request.topicListingSetting,
TopicListingViewerContext
.fromThrift(request.context).copy(languageCode =
LocaleUtil.getStandardLanguageCode(request.context.languageCode)),
request.bypassModes.map(_.toSet)
).rescue {
case _ =>
scopedStats.counter("uttTopicFilterStoreFailure").incr()
Future.value(Map.empty[SemanticCoreEntityId, Option[TopicFollowType]])
}
val tweetInfoMapFut: Future[Map[TweetId, Option[TspTweetInfo]]] = Future
.collect(
tweetInfoStore.multiGet(request.tweetIds.toSet)
).raiseWithin(TweetInfoStoreTimeout)(timer).rescue {
case _: TimeoutException =>
scopedStats.counter("tweetInfoStoreTimeout").incr()
Future.value(Map.empty[TweetId, Option[TspTweetInfo]])
case _ =>
scopedStats.counter("tweetInfoStoreFailure").incr()
Future.value(Map.empty[TweetId, Option[TspTweetInfo]])
}
val definedTweetInfoMapFut =
keepTweetsWithTweetInfoAndLanguage(tweetInfoMapFut, request.displayLocation.toString)
Future
.join(definedTweetInfoMapFut, allowListWithTopicFollowTypeFut).map {
case (tweetInfoMap, allowListWithTopicFollowType) =>
val tweetIdsToQuery = tweetInfoMap.keys.toSet
val topicProofQueries =
tweetIdsToQuery.map { tweetId =>
TopicSocialProofStore.Query(
TopicSocialProofStore.CacheableQuery(
tweetId = tweetId,
tweetLanguage = LocaleUtil.getSupportedStandardLanguageCodeWithDefault(
tweetInfoMap.getOrElse(tweetId, None).flatMap {
_.language
}),
enableCosineSimilarityScoreCalculation =
enableCosineSimilarityScoreCalculation
),
allowedSemanticCoreVersionIds = semanticCoreVersionIdsSet
)
}
val topicSocialProofsFut: Future[Map[TweetId, Seq[TopicSocialProof]]] = {
Future
.collect(topicSocialProofStore.multiGet(topicProofQueries)).map(_.map {
case (query, results) =>
query.cacheableQuery.tweetId -> results.toSeq.flatten.filter(
_.semanticCoreVersionId == semanticCoreVersionId)
})
}.raiseWithin(TopicSocialProofStoreTimeout)(timer).rescue {
case _: TimeoutException =>
scopedStats.counter("topicSocialProofStoreTimeout").incr()
Future(Map.empty[TweetId, Seq[TopicSocialProof]])
case _ =>
scopedStats.counter("topicSocialProofStoreFailure").incr()
Future(Map.empty[TweetId, Seq[TopicSocialProof]])
}
val random = new Random(seed = request.userId.toInt)
topicSocialProofsFut.map { topicSocialProofs =>
val filteredTopicSocialProofs = filterByAllowedList(
topicSocialProofs,
request.topicListingSetting,
allowListWithTopicFollowType.keySet
)
val filteredTopicSocialProofsEmptyCount: Int =
filteredTopicSocialProofs.count {
case (_, topicSocialProofs: Seq[TopicSocialProof]) =>
topicSocialProofs.isEmpty
}
scopedStats
.counter("filteredTopicSocialProofsCount").incr(filteredTopicSocialProofs.size)
scopedStats
.counter("filteredTopicSocialProofsEmptyCount").incr(
filteredTopicSocialProofsEmptyCount)
if (isCrTopicTweets(request)) {
val socialProofs = filteredTopicSocialProofs.mapValues(_.flatMap { topicProof =>
val topicWithScores = buildTopicWithRandomScore(
topicProof,
allowListWithTopicFollowType,
random
)
topicWithScores
})
TopicSocialProofResponse(socialProofs)
} else {
val socialProofs = filteredTopicSocialProofs.mapValues(_.flatMap { topicProof =>
getTopicProofScore(
topicProof = topicProof,
allowListWithTopicFollowType = allowListWithTopicFollowType,
params = target.params,
random = random,
statsReceiver = statsReceiver
)
}.sortBy(-_.score).take(MaxCandidates))
val personalizedContextSocialProofs =
if (target.params(TopicSocialProofParams.EnablePersonalizedContextTopics)) {
val personalizedContextEligibility =
checkPersonalizedContextsEligibility(
target.params,
allowListWithTopicFollowType)
val filteredTweets =
filterPersonalizedContexts(socialProofs, tweetInfoMap, target.params)
backfillPersonalizedContexts(
allowListWithTopicFollowType,
filteredTweets,
request.tags.getOrElse(Map.empty),
personalizedContextEligibility)
} else {
Map.empty[TweetId, Seq[TopicWithScore]]
}
val mergedSocialProofs = socialProofs.map {
case (tweetId, proofs) =>
(
tweetId,
proofs
++ personalizedContextSocialProofs.getOrElse(tweetId, Seq.empty))
}
// Note that we will NOT filter out tweets with no TSP in either case
TopicSocialProofResponse(mergedSocialProofs)
}
}
}
}.flatten.raiseWithin(Timeout)(timer).rescue {
case _: ClientDiscardedRequestException =>
scopedStats.counter("ClientDiscardedRequestException").incr()
Future.value(DefaultResponse)
case err: Err if err.code == Err.Cancelled =>
scopedStats.counter("CancelledErr").incr()
Future.value(DefaultResponse)
case _ =>
scopedStats.counter("FailedRequests").incr()
Future.value(DefaultResponse)
}
}
}