def generate[P <: Platform[P]]()

in src/scala/com/twitter/simclusters_v2/summingbird/storm/TweetJob.scala [32:215]


  def generate[P <: Platform[P]](
    profile: SimClustersTweetProfile,
    timelineEventSource: Producer[P, Event],
    userInterestedInService: P#Service[Long, ClustersUserIsInterestedIn],
    tweetClusterScoreStore: P#Store[(SimClusterEntity, FullClusterIdBucket), ClustersWithScores],
    tweetTopKClustersStore: P#Store[EntityWithVersion, TopKClustersWithScores],
    clusterTopKTweetsStore: P#Store[FullClusterId, TopKTweetsWithScores],
    clusterTopKTweetsLightStore: Option[P#Store[FullClusterId, TopKTweetsWithScores]]
  )(
    implicit jobId: JobId
  ): TailProducer[P, Any] = {

    val userInterestNonEmptyCount = Counter(Group(jobId.get), Name("num_user_interests_non_empty"))
    val userInterestEmptyCount = Counter(Group(jobId.get), Name("num_user_interests_empty"))

    val numClustersCount = Counter(Group(jobId.get), Name("num_clusters"))

    val entityClusterPairCount = Counter(Group(jobId.get), Name("num_entity_cluster_pairs_emitted"))

    // Fav QPS is around 6K
    val qualifiedFavEvents = timelineEventSource
      .collect {
        case Event.Favorite(favEvent)
            if favEvent.userId != favEvent.tweetUserId && !isTweetTooOld(favEvent) =>
          (favEvent.userId, favEvent)
      }
      .observe("num_qualified_favorite_events")

    val entityWithSimClustersProducer = qualifiedFavEvents
      .leftJoin(userInterestedInService)
      .map {
        case (_, (favEvent, userInterestOpt)) =>
          (favEvent.tweetId, (favEvent, userInterestOpt))
      }
      .flatMap {
        case (_, (favEvent, Some(userInterests))) =>
          userInterestNonEmptyCount.incr()

          val timestamp = favEvent.eventTimeMs

          val clustersWithScores = SimClustersInterestedInUtil.topClustersWithScores(userInterests)

          // clusters.size is around 25 in average
          numClustersCount.incrBy(clustersWithScores.size)

          val simClusterScoresByHashBucket = clustersWithScores.groupBy {
            case (clusterId, _) => SimClustersHashUtil.clusterIdToBucket(clusterId)
          }

          for {
            (hashBucket, scores) <- simClusterScoresByHashBucket
          } yield {
            entityClusterPairCount.incr()

            val clusterBucket = FullClusterIdBucket(userInterests.knownForModelVersion, hashBucket)

            val tweetId: SimClusterEntity = SimClusterEntity.TweetId(favEvent.tweetId)

            (tweetId, clusterBucket) -> SimClustersInterestedInUtil
              .buildClusterWithScores(
                scores,
                timestamp,
                profile.favScoreThresholdForUserInterest
              )
          }
        case _ =>
          userInterestEmptyCount.incr()
          None
      }
      .observe("entity_cluster_delta_scores")
      .name(NodeName.TweetClusterScoreFlatMapNodeName)
      .sumByKey(tweetClusterScoreStore)(clustersWithScoreMonoid)
      .name(NodeName.TweetClusterScoreSummerNodeName)
      .map {
        case ((simClusterEntity, clusterBucket), (oldValueOpt, deltaValue)) =>
          val updatedClusterIds = deltaValue.clustersToScore.map(_.keySet).getOrElse(Set.empty[Int])

          (simClusterEntity, clusterBucket) -> clustersWithScoreMonoid.plus(
            oldValueOpt
              .map { oldValue =>
                oldValue.copy(
                  clustersToScore =
                    oldValue.clustersToScore.map(_.filterKeys(updatedClusterIds.contains))
                )
              }.getOrElse(clustersWithScoreMonoid.zero),
            deltaValue
          )
      }
      .observe("entity_cluster_updated_scores")
      .name(NodeName.TweetClusterUpdatedScoresFlatMapNodeName)

    val tweetTopK = entityWithSimClustersProducer
      .flatMap {
        case ((simClusterEntity, FullClusterIdBucket(modelVersion, _)), clusterWithScores)
            if simClusterEntity.isInstanceOf[SimClusterEntity.TweetId] =>
          clusterWithScores.clustersToScore
            .map { clustersToScores =>
              val topClustersWithFavScores = clustersToScores.mapValues { scores: Scores =>
                Scores(
                  favClusterNormalized8HrHalfLifeScore =
                    scores.favClusterNormalized8HrHalfLifeScore.filter(
                      _.value >= Configs.scoreThresholdForTweetTopKClustersCache
                    )
                )
              }

              (
                EntityWithVersion(simClusterEntity, modelVersion),
                TopKClustersWithScores(Some(topClustersWithFavScores), None)
              )
            }
        case _ =>
          None

      }
      .observe("tweet_topk_updates")
      .sumByKey(tweetTopKClustersStore)(topKClustersWithScoresMonoid)
      .name(NodeName.TweetTopKNodeName)

    val clusterTopKTweets = entityWithSimClustersProducer
      .flatMap {
        case ((simClusterEntity, FullClusterIdBucket(modelVersion, _)), clusterWithScores) =>
          simClusterEntity match {
            case SimClusterEntity.TweetId(tweetId) =>
              clusterWithScores.clustersToScore
                .map { clustersToScores =>
                  clustersToScores.toSeq.map {
                    case (clusterId, scores) =>
                      val topTweetsByFavScore = Map(
                        tweetId -> Scores(favClusterNormalized8HrHalfLifeScore =
                          scores.favClusterNormalized8HrHalfLifeScore.filter(_.value >=
                            Configs.scoreThresholdForClusterTopKTweetsCache)))

                      (
                        FullClusterId(modelVersion, clusterId),
                        TopKTweetsWithScores(Some(topTweetsByFavScore), None)
                      )
                  }
                }.getOrElse(Nil)
            case _ =>
              Nil
          }
      }
      .observe("cluster_topk_tweets_updates")
      .sumByKey(clusterTopKTweetsStore)(topKTweetsWithScoresMonoid)
      .name(NodeName.ClusterTopKTweetsNodeName)

    val clusterTopKTweetsLight = clusterTopKTweetsLightStore.map { lightStore =>
      entityWithSimClustersProducer
        .flatMap {
          case ((simClusterEntity, FullClusterIdBucket(modelVersion, _)), clusterWithScores) =>
            simClusterEntity match {
              case SimClusterEntity.TweetId(tweetId) if isTweetTooOldForLight(tweetId) =>
                clusterWithScores.clustersToScore
                  .map { clustersToScores =>
                    clustersToScores.toSeq.map {
                      case (clusterId, scores) =>
                        val topTweetsByFavScore = Map(
                          tweetId -> Scores(favClusterNormalized8HrHalfLifeScore =
                            scores.favClusterNormalized8HrHalfLifeScore.filter(_.value >=
                              Configs.scoreThresholdForClusterTopKTweetsCache)))

                        (
                          FullClusterId(modelVersion, clusterId),
                          TopKTweetsWithScores(Some(topTweetsByFavScore), None)
                        )
                    }
                  }.getOrElse(Nil)
              case _ =>
                Nil
            }
        }
        .observe("cluster_topk_tweets_updates")
        .sumByKey(lightStore)(topKTweetsWithScoresLightMonoid)
        .name(NodeName.ClusterTopKTweetsLightNodeName)
    }

    clusterTopKTweetsLight match {
      case Some(lightNode) =>
        tweetTopK.also(clusterTopKTweets).also(lightNode)
      case None =>
        tweetTopK.also(clusterTopKTweets)
    }
  }