def getTopicSocialProofResponse()

in topic-social-proof/server/src/main/scala/com/twitter/tsp/handlers/TopicSocialProofHandler.scala [57:225]


  /**
   * Computes topic social proofs for the tweets in `request`.
   *
   * Pipeline, as implemented below:
   *  1. Build the recommendation target for the request; its `params` drive the
   *     semantic core version filtering and the personalized-context behaviour.
   *  2. Start two lookups before joining them: the allow-listed topics for the user and
   *     the tweet info for the requested tweet ids. Each lookup falls back to an empty
   *     map (and bumps a failure/timeout counter) instead of failing the request.
   *  3. Query the topic social proof store for the tweets returned by
   *     `keepTweetsWithTweetInfoAndLanguage`, keeping only proofs whose
   *     `semanticCoreVersionId` equals the configured one.
   *  4. Filter the proofs with `filterByAllowedList`, score them, and (outside the
   *     CrTopicTweets path) optionally append personalized-context proofs.
   *
   * The whole pipeline is bounded by `Timeout`; a timeout or any other failure of the
   * pipeline is rescued to `DefaultResponse` after incrementing a counter.
   *
   * @param request the incoming request (user, tweet ids, display location, context, ...)
   * @return a future of the response mapping tweet ids to their scored topics
   */
  def getTopicSocialProofResponse(
    request: TopicSocialProofRequest
  ): Future[TopicSocialProofResponse] = {
    // All request-level metrics are scoped by display location.
    val scopedStats = statsReceiver.scope(request.displayLocation.toString)
    scopedStats.counter("fanoutRequests").incr(request.tweetIds.size)
    scopedStats.stat("numTweetsPerRequest").add(request.tweetIds.size)
    StatsUtil.trackBlockStats(scopedStats) {
      recTargetFactory
        .buildRecTopicSocialProofTarget(request).flatMap { target =>
          val enableCosineSimilarityScoreCalculation =
            decider.isAvailable(DeciderConstants.enableTopicSocialProofScore)

          // Single version id used to filter the proofs returned by the store.
          val semanticCoreVersionId =
            target.params(TopicSocialProofParams.TopicTweetsSemanticCoreVersionId)

          // Set of version ids forwarded to the store as part of each query.
          val semanticCoreVersionIdsSet =
            target.params(TopicSocialProofParams.TopicTweetsSemanticCoreVersionIdsSet)

          // Best-effort: a failing allow-list lookup degrades to an empty allow list.
          val allowListWithTopicFollowTypeFut = uttTopicFilterStore
            .getAllowListTopicsForUser(
              request.userId,
              request.topicListingSetting,
              TopicListingViewerContext
                .fromThrift(request.context).copy(languageCode =
                  LocaleUtil.getStandardLanguageCode(request.context.languageCode)),
              request.bypassModes.map(_.toSet)
            ).rescue {
              case _ =>
                scopedStats.counter("uttTopicFilterStoreFailure").incr()
                Future.value(Map.empty[SemanticCoreEntityId, Option[TopicFollowType]])
            }

          // Best-effort: a slow or failing tweet info lookup degrades to an empty map,
          // with timeouts and other failures counted separately.
          val tweetInfoMapFut: Future[Map[TweetId, Option[TspTweetInfo]]] = Future
            .collect(
              tweetInfoStore.multiGet(request.tweetIds.toSet)
            ).raiseWithin(TweetInfoStoreTimeout)(timer).rescue {
              case _: TimeoutException =>
                scopedStats.counter("tweetInfoStoreTimeout").incr()
                Future.value(Map.empty[TweetId, Option[TspTweetInfo]])
              case _ =>
                scopedStats.counter("tweetInfoStoreFailure").incr()
                Future.value(Map.empty[TweetId, Option[TspTweetInfo]])
            }

          val definedTweetInfoMapFut =
            keepTweetsWithTweetInfoAndLanguage(tweetInfoMapFut, request.displayLocation.toString)

          // flatMap (rather than map followed by flatten) because the continuation
          // itself produces a Future of the response.
          Future
            .join(definedTweetInfoMapFut, allowListWithTopicFollowTypeFut).flatMap {
              case (tweetInfoMap, allowListWithTopicFollowType) =>
                val tweetIdsToQuery = tweetInfoMap.keys.toSet
                val topicProofQueries =
                  tweetIdsToQuery.map { tweetId =>
                    TopicSocialProofStore.Query(
                      TopicSocialProofStore.CacheableQuery(
                        tweetId = tweetId,
                        tweetLanguage = LocaleUtil.getSupportedStandardLanguageCodeWithDefault(
                          tweetInfoMap.getOrElse(tweetId, None).flatMap {
                            _.language
                          }),
                        enableCosineSimilarityScoreCalculation =
                          enableCosineSimilarityScoreCalculation
                      ),
                      allowedSemanticCoreVersionIds = semanticCoreVersionIdsSet
                    )
                  }

                // Best-effort: a slow or failing proof store degrades to "no proofs".
                val topicSocialProofsFut: Future[Map[TweetId, Seq[TopicSocialProof]]] = {
                  Future
                    .collect(topicSocialProofStore.multiGet(topicProofQueries)).map(_.map {
                      case (query, results) =>
                        query.cacheableQuery.tweetId -> results.toSeq.flatten.filter(
                          _.semanticCoreVersionId == semanticCoreVersionId)
                    })
                }.raiseWithin(TopicSocialProofStoreTimeout)(timer).rescue {
                  case _: TimeoutException =>
                    scopedStats.counter("topicSocialProofStoreTimeout").incr()
                    Future.value(Map.empty[TweetId, Seq[TopicSocialProof]])
                  case _ =>
                    scopedStats.counter("topicSocialProofStoreFailure").incr()
                    Future.value(Map.empty[TweetId, Seq[TopicSocialProof]])
                }

                // Seeded from the user id so the generated sequence is repeatable for a
                // given seed.
                // NOTE(review): `.toInt` narrows the id before seeding (presumably a
                // Long) - confirm the narrowing is intended.
                val random = new Random(seed = request.userId.toInt)

                topicSocialProofsFut.map { topicSocialProofs =>
                  val filteredTopicSocialProofs = filterByAllowedList(
                    topicSocialProofs,
                    request.topicListingSetting,
                    allowListWithTopicFollowType.keySet
                  )

                  val filteredTopicSocialProofsEmptyCount: Int =
                    filteredTopicSocialProofs.count {
                      case (_, proofs) => proofs.isEmpty
                    }

                  scopedStats
                    .counter("filteredTopicSocialProofsCount").incr(filteredTopicSocialProofs.size)
                  scopedStats
                    .counter("filteredTopicSocialProofsEmptyCount").incr(
                      filteredTopicSocialProofsEmptyCount)

                  if (isCrTopicTweets(request)) {
                    // Built strictly (map, not mapValues): mapValues yields a lazy view that
                    // would defer the random scoring until the response is traversed later,
                    // outside this method's timeout and rescue handling.
                    val socialProofs = filteredTopicSocialProofs.map {
                      case (tweetId, proofs) =>
                        tweetId -> proofs.flatMap { topicProof =>
                          buildTopicWithRandomScore(
                            topicProof,
                            allowListWithTopicFollowType,
                            random
                          )
                        }
                    }
                    TopicSocialProofResponse(socialProofs)
                  } else {
                    // Built strictly (map, not mapValues): this map is read more than once
                    // below, and a lazy mapValues view would re-run the scoring (which draws
                    // from `random`) on every traversal, so the scores used for
                    // personalized-context filtering could differ from the scores returned.
                    val socialProofs = filteredTopicSocialProofs.map {
                      case (tweetId, proofs) =>
                        tweetId -> proofs
                          .flatMap { topicProof =>
                            getTopicProofScore(
                              topicProof = topicProof,
                              allowListWithTopicFollowType = allowListWithTopicFollowType,
                              params = target.params,
                              random = random,
                              // NOTE(review): passes the unscoped receiver rather than
                              // `scopedStats` - confirm this is intentional.
                              statsReceiver = statsReceiver
                            )
                          }
                          // Highest score first, capped at MaxCandidates per tweet.
                          .sortBy(-_.score).take(MaxCandidates)
                    }

                    val personalizedContextSocialProofs =
                      if (target.params(TopicSocialProofParams.EnablePersonalizedContextTopics)) {
                        val personalizedContextEligibility =
                          checkPersonalizedContextsEligibility(
                            target.params,
                            allowListWithTopicFollowType)
                        val filteredTweets =
                          filterPersonalizedContexts(socialProofs, tweetInfoMap, target.params)
                        backfillPersonalizedContexts(
                          allowListWithTopicFollowType,
                          filteredTweets,
                          request.tags.getOrElse(Map.empty),
                          personalizedContextEligibility)
                      } else {
                        Map.empty[TweetId, Seq[TopicWithScore]]
                      }

                    // Append any personalized-context proofs after the scored proofs.
                    val mergedSocialProofs = socialProofs.map {
                      case (tweetId, proofs) =>
                        (
                          tweetId,
                          proofs
                            ++ personalizedContextSocialProofs.getOrElse(tweetId, Seq.empty))
                    }

                    // Note that we will NOT filter out tweets with no TSP in either case
                    TopicSocialProofResponse(mergedSocialProofs)
                  }
                }
            }
        }.raiseWithin(Timeout)(timer).rescue {
          // Every failure mode maps to the default response; the cases differ only in
          // which counter is incremented.
          case _: ClientDiscardedRequestException =>
            scopedStats.counter("ClientDiscardedRequestException").incr()
            Future.value(DefaultResponse)
          case err: Err if err.code == Err.Cancelled =>
            scopedStats.counter("CancelledErr").incr()
            Future.value(DefaultResponse)
          case _ =>
            scopedStats.counter("FailedRequests").incr()
            Future.value(DefaultResponse)
        }
    }
  }