def createStagingGroup()

in src/scala/com/twitter/timelines/prediction/common/aggregates/real_time/TimelinesOnlineAggregationConfigBase.scala [105:688]


  /**
   * Derives a staging variant of a production aggregate group.
   *
   * @param prodGroup the group to clone
   * @return a copy of `prodGroup` that is identical in every field except `outputStore`,
   *         which is replaced by `StagingStore`
   */
  def createStagingGroup(prodGroup: AggregateGroup): AggregateGroup =
    prodGroup.copy(outputStore = StagingStore)

  /**
   * Engagement counts keyed by `SOURCE_TWEET_ID` with a 30-minute half-life.
   *
   * Labels are `TweetLabels ++ TweetNegativeEngagementDontLikeLabels`; `features` is empty.
   * Shares its `aggregatePrefix` with `tweetEngagementTotalCountsProd`, which uses the same
   * keys and labels with a `Duration.Top` half-life instead.
   */
  val tweetEngagement30MinuteCountsProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v1",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = Set.empty,
      labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * "Don't like" negative-engagement counts (`TweetNegativeEngagementDontLikeLabels`) keyed by
   * `SOURCE_TWEET_ID`.
   *
   * Records pass through `RichRemoveUnverifiedUserTransform` first. This presumably restricts
   * the counts to engagements from verified users; confirm against the transform.
   * Half-lives: 30 minutes and `Duration.Top`; `features` is empty.
   */
  val tweetVerifiedDontLikeEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      inputSource = inputSource,
      aggregatePrefix = "real_time_tweet_aggregates_v6",
      preTransforms = Seq(RichRemoveUnverifiedUserTransform),
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = Set.empty,
      labels = TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      outputStore = ProductionStore,
      includeAnyLabel = false,
      includeTimestampFeature = false,
    )

  /**
   * Negative-engagement counts (`TweetNegativeEngagementLabels`) keyed by `SOURCE_TWEET_ID`.
   *
   * NOTE(review): despite the `6Hour` in its name, this group's only half-life is 30 minutes.
   * The name is kept for caller compatibility. The half-life is left unchanged because altering
   * it would change what is published under "real_time_tweet_aggregates_v2". Confirm which of
   * the two is intended. `tweetNegativeEngagementTotalCounts` shares this prefix with a
   * `Duration.Top` half-life.
   */
  val tweetNegativeEngagement6HourCounts: AggregateGroup =
    AggregateGroup(
      inputSource = inputSource,
      aggregatePrefix = "real_time_tweet_aggregates_v2",
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = Set.empty,
      labels = TweetNegativeEngagementLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes),
      outputStore = ProductionStore,
      includeAnyLabel = false,
      includeTimestampFeature = false,
    )

  /**
   * Negative-engagement counts (`TweetNegativeEngagementLabels`) keyed by `SOURCE_TWEET_ID`.
   *
   * Records first pass through `RichRemoveUnverifiedUserTransform`, presumably keeping only
   * verified users' engagements (confirm). Half-lives: 30 minutes and `Duration.Top`.
   */
  val tweetVerifiedNegativeEngagementCounts: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v7",
      inputSource = inputSource,
      preTransforms = Seq(RichRemoveUnverifiedUserTransform),
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = Set.empty,
      labels = TweetNegativeEngagementLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Core and dwell engagement counts (`TweetCoreAndDwellLabels`) keyed by `SOURCE_TWEET_ID`,
   * intended for promoted tweets (per the name and the `is_promoted` prefix).
   *
   * The `DownsampleTransform` uses a negative sampling rate of 0.0 while keeping
   * `ClientLogEventDataRecordFeatures.IsPromoted`. Presumably this drops every record not
   * labelled as promoted; confirm against the transform. Half-lives: 2 hours and 24 hours.
   */
  val promotedTweetEngagementRealTimeCounts: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v3.is_promoted",
      inputSource = inputSource,
      preTransforms = Seq(
        DownsampleTransform(
          keepLabels = Set(ClientLogEventDataRecordFeatures.IsPromoted),
          negativeSamplingRate = 0.0
        )
      ),
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = Set.empty,
      labels = TweetCoreAndDwellLabels,
      metrics = Set(CountMetric),
      halfLives = Set(2.hours, 24.hours),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Total engagement counts keyed by tweet id for non-public engagements, analogous to EB's
   * public engagement counts.
   *
   * Uses the `Duration.Top` half-life. It shares its prefix, keys and labels with
   * `tweetEngagement30MinuteCountsProd`, which is the 30-minute counterpart.
   */
  val tweetEngagementTotalCountsProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v1",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = Set.empty,
      labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(Duration.Top),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Total negative-engagement counts (`TweetNegativeEngagementLabels`) keyed by
   * `SOURCE_TWEET_ID`, using the `Duration.Top` half-life.
   *
   * Shares its prefix with `tweetNegativeEngagement6HourCounts`.
   */
  val tweetNegativeEngagementTotalCounts: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v2",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = Set.empty,
      labels = TweetNegativeEngagementLabels,
      metrics = Set(CountMetric),
      halfLives = Set(Duration.Top),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates `TweetFeatures` against engagement labels, keyed by the viewer's user id
   * (`SharedFeatures.USER_ID`), with a 30-minute half-life.
   */
  val userEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_aggregates_v1",
      inputSource = inputSource,
      keys = Set(SharedFeatures.USER_ID),
      features = TweetFeatures,
      labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates `ClientLogEventDataRecordFeatures.TweetFeaturesV2` against core and dwell
   * labels, keyed by the viewer's user id.
   *
   * Half-lives: 30 minutes and `Duration.Top`. The any-feature aggregate is disabled.
   */
  val userEngagementRealTimeAggregatesV2: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_aggregates_v2",
      inputSource = inputSource,
      keys = Set(SharedFeatures.USER_ID),
      features = ClientLogEventDataRecordFeatures.TweetFeaturesV2,
      labels = TweetCoreAndDwellLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the author's user-state boolean features
   * (`AuthorFeaturesAdapter.UserStateBooleanFeatures`) against core and dwell labels, keyed by
   * the viewer's user id.
   *
   * Half-lives: 30 minutes and `Duration.Top`. No pre-transforms are applied.
   */
  val userEngagementAuthorUserStateRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_aggregates_v3",
      inputSource = inputSource,
      preTransforms = Seq.empty,
      keys = Set(SharedFeatures.USER_ID),
      features = AuthorFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetCoreAndDwellLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the author's user-state boolean features against negative-engagement and
   * "don't like" labels, keyed by the viewer's user id.
   *
   * Half-lives: 30 minutes and `Duration.Top`. No pre-transforms are applied.
   */
  val userNegativeEngagementAuthorUserStateRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_aggregates_v4",
      inputSource = inputSource,
      preTransforms = Seq.empty,
      keys = Set(SharedFeatures.USER_ID),
      features = AuthorFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Same features, labels and key as `userEngagementRealTimeAggregatesProd`, but with a
   * 48-hour half-life.
   */
  val userEngagement48HourRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_aggregates_v5",
      inputSource = inputSource,
      keys = Set(SharedFeatures.USER_ID),
      features = TweetFeatures,
      labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(48.hours),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the author's user-state boolean features against negative-engagement and
   * "don't like" labels, keyed by the viewer's user id, with a 72-hour half-life.
   */
  val userNegativeEngagementAuthorUserState72HourRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_aggregates_v6",
      inputSource = inputSource,
      preTransforms = Seq.empty,
      keys = Set(SharedFeatures.USER_ID),
      features = AuthorFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(72.hours),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Per-author engagement counts keyed by `SOURCE_AUTHOR_ID`.
   *
   * These quantify the engagements (fav, reply, etc.) that an author's tweets have received.
   * Half-lives: 30 minutes and `Duration.Top`.
   */
  val authorEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_author_aggregates_v1",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
      features = Set.empty,
      labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Per-author negative-engagement counts keyed by `SOURCE_AUTHOR_ID`.
   *
   * These quantify the negative engagements (mute, block, etc.) that an author's tweets have
   * received. Half-lives: 30 minutes and `Duration.Top`.
   *
   * This group is not used in Home, but Follow Recommendation Service depends on it, so it
   * must be kept for now.
   */
  val authorNegativeEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_author_aggregates_v2",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
      features = Set.empty,
      labels = TweetNegativeEngagementLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Per-author "don't like" counts keyed by `SOURCE_AUTHOR_ID`.
   *
   * Only engagements from verified users are counted, via `RichRemoveUnverifiedUserTransform`.
   * Single 30-minute half-life.
   */
  val authorVerifiedNegativeEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_author_aggregates_v3",
      inputSource = inputSource,
      preTransforms = Seq(RichRemoveUnverifiedUserTransform),
      keys = Set(TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
      features = Set.empty,
      labels = TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Engagement counts keyed by `TOPIC_ID`.
   *
   * Labels are `TweetLabels ++ AllTweetNegativeEngagementLabels`. Half-lives: 30 minutes and
   * `Duration.Top`.
   */
  val topicEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_topic_aggregates_v1",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.TOPIC_ID),
      features = Set.empty,
      labels = TweetLabels ++ AllTweetNegativeEngagementLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the engaging user's user-state boolean features
   * (`UserFeaturesAdapter.UserStateBooleanFeatures`) against core and dwell labels, keyed by
   * `TOPIC_ID`.
   *
   * Half-lives: 30 minutes and `Duration.Top`.
   */
  val topicEngagementUserStateRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_topic_aggregates_v2",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.TOPIC_ID),
      features = UserFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetCoreAndDwellLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the engaging user's user-state boolean features against negative-engagement
   * and "don't like" labels, keyed by `TOPIC_ID`.
   *
   * Half-lives: 30 minutes and `Duration.Top`.
   */
  val topicNegativeEngagementUserStateRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_topic_aggregates_v3",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.TOPIC_ID),
      features = UserFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Same key and labels as "real_time_topic_aggregates_v1" (`TOPIC_ID`,
   * `TweetLabels ++ AllTweetNegativeEngagementLabels`), but with a single 24-hour half-life.
   */
  val topicEngagement24HourRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_topic_aggregates_v4",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.TOPIC_ID),
      features = Set.empty,
      labels = TweetLabels ++ AllTweetNegativeEngagementLabels,
      metrics = Set(CountMetric),
      halfLives = Set(24.hours),
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the engaging user's user-state boolean features against core and dwell labels,
   * keyed by `SOURCE_TWEET_ID`.
   *
   * Half-lives: 30 minutes and `Duration.Top`.
   */
  val tweetEngagementUserStateRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v3",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = UserFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetCoreAndDwellLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the engaging user's gender boolean features
   * (`UserFeaturesAdapter.GenderBooleanFeatures`) against core/dwell, negative-engagement and
   * "don't like" labels, keyed by `SOURCE_TWEET_ID`.
   *
   * Half-lives: 30 minutes and `Duration.Top`.
   */
  val tweetEngagementGenderRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v4",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = UserFeaturesAdapter.GenderBooleanFeatures,
      labels = TweetCoreAndDwellLabels ++
        TweetNegativeEngagementLabels ++
        TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates the engaging user's user-state boolean features against negative-engagement
   * and "don't like" labels, keyed by `SOURCE_TWEET_ID`.
   *
   * Half-lives: 30 minutes and `Duration.Top`.
   */
  val tweetNegativeEngagementUserStateRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v5",
      inputSource = inputSource,
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = UserFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Same as "real_time_tweet_aggregates_v5", but records first pass through
   * `RichRemoveUnverifiedUserTransform`. Presumably only verified users' negative engagements
   * are counted; confirm against the transform.
   *
   * User-state features against negative and "don't like" labels, keyed by
   * `SOURCE_TWEET_ID`, with 30-minute and `Duration.Top` half-lives.
   */
  val tweetVerifiedNegativeEngagementUserStateRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_tweet_aggregates_v8",
      inputSource = inputSource,
      preTransforms = Seq(RichRemoveUnverifiedUserTransform),
      keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
      features = UserFeaturesAdapter.UserStateBooleanFeatures,
      labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates candidate tweet source features (`CandidateTweetSourceFeatures`) against core,
   * dwell and real-time "don't like" labels, keyed by user id.
   *
   * Half-lives: 30 minutes and `Duration.Top`.
   */
  val userCandidateTweetSourceEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_candidate_tweet_source_aggregates_v1",
      inputSource = inputSource,
      keys = Set(SharedFeatures.USER_ID),
      features = CandidateTweetSourceFeatures,
      labels = TweetCoreAndDwellLabels ++ NegativeEngagementsRealTimeDontLike,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Same features, labels and key as
   * "real_time_user_candidate_tweet_source_aggregates_v1", but with a single 48-hour
   * half-life.
   */
  val userCandidateTweetSourceEngagement48HourRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_user_candidate_tweet_source_aggregates_v2",
      inputSource = inputSource,
      keys = Set(SharedFeatures.USER_ID),
      features = CandidateTweetSourceFeatures,
      labels = TweetCoreAndDwellLabels ++ NegativeEngagementsRealTimeDontLike,
      metrics = Set(CountMetric),
      halfLives = Set(48.hours),
      includeAnyFeature = false,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Aggregates `TweetFeatures` against Profile engagement labels (`ProfileCoreLabels`), keyed
   * by the viewer's user id.
   *
   * Records first pass through `IsNewUserTransform`. Half-lives: 30 minutes and
   * `Duration.Top`. The any-feature aggregate is enabled.
   */
  val userProfileEngagementRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "profile_real_time_user_aggregates_v1",
      inputSource = inputSource,
      preTransforms = Seq(IsNewUserTransform),
      keys = Set(SharedFeatures.USER_ID),
      features = TweetFeatures,
      labels = ProfileCoreLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = true,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Pre-transform that applies a `BinaryUnion` over `ProfileNegativeEngagementLabels`.
   *
   * The result is written to `ProfileLabelFeatures.IS_NEGATIVE_FEEDBACK_UNION`, wrapped as a
   * `RichITransform`.
   */
  val NegativeEngagementsUnionTransform = {
    val profileNegativeFeedbackUnion = BinaryUnion(
      featuresToUnify = ProfileNegativeEngagementLabels,
      outputFeature = ProfileLabelFeatures.IS_NEGATIVE_FEEDBACK_UNION
    )
    RichITransform(profileNegativeFeedbackUnion)
  }

  /**
   * Profile negative-engagement counts keyed by the viewer's user id.
   *
   * Covers the individual `ProfileNegativeEngagementLabels` plus
   * `ProfileNegativeEngagementUnionLabels`. The union label is produced by the
   * `NegativeEngagementsUnionTransform` pre-transform. Half-lives: 30 minutes, 72 hours and
   * 14 days.
   */
  val userProfileNegativeEngagementRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "profile_negative_engagement_real_time_user_aggregates_v1",
      inputSource = inputSource,
      preTransforms = Seq(NegativeEngagementsUnionTransform),
      keys = Set(SharedFeatures.USER_ID),
      features = Set.empty,
      labels = ProfileNegativeEngagementLabels ++ ProfileNegativeEngagementUnionLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, 72.hours, 14.day),
      includeAnyFeature = true,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Profile engagement counts (`ProfileCoreLabels`) keyed by the (viewer user id, source
   * author id) pair.
   *
   * Half-lives: 30 minutes, 24 hours and 72 hours.
   */
  val userAuthorProfileEngagementRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "user_author_profile_real_time_aggregates_v1",
      inputSource = inputSource,
      keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
      features = Set.empty,
      labels = ProfileCoreLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, 24.hours, 72.hours),
      includeAnyFeature = true,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * Profile negative-engagement union counts keyed by the (viewer user id, source author id)
   * pair.
   *
   * Only `ProfileNegativeEngagementUnionLabels` are counted, after the
   * `NegativeEngagementsUnionTransform` pre-transform. Half-lives: 30 minutes, 72 hours and
   * 14 days.
   */
  val userAuthorProfileNegativeEngagementRealTimeAggregates: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "user_author_profile_negative_engagement_real_time_aggregates_v1",
      inputSource = inputSource,
      preTransforms = Seq(NegativeEngagementsUnionTransform),
      keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
      features = Set.empty,
      labels = ProfileNegativeEngagementUnionLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, 72.hours, 14.day),
      includeAnyFeature = true,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * User-author engagement counts keyed by (`USER_ID`, `SOURCE_AUTHOR_ID`).
   *
   * Records first pass through `IsNewUserTransform`. Labels are the core and dwell labels plus
   * click, profile-click and photo-expand. Half-lives: 30 minutes and `Duration.Top`.
   */
  val newUserAuthorEngagementRealTimeAggregatesProd: AggregateGroup =
    AggregateGroup(
      aggregatePrefix = "real_time_new_user_author_aggregates_v1",
      inputSource = inputSource,
      preTransforms = Seq(IsNewUserTransform),
      keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
      features = Set.empty,
      labels =
        TweetCoreAndDwellLabels ++ Set(IS_CLICKED, IS_PROFILE_CLICKED, IS_PHOTO_EXPANDED),
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, Duration.Top),
      includeAnyFeature = true,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )

  /**
   * User-author engagement counts keyed by (`USER_ID`, `SOURCE_AUTHOR_ID`), using
   * `UserAuthorEngagementLabels`, with 30-minute and 1-day half-lives.
   */
  val userAuthorEngagementRealTimeAggregatesProd: AggregateGroup = {
    // Computing user-author real-time aggregates is very expensive, so all major negative
    // feedback engagements are unioned into a single negative label
    // (IS_NEGATIVE_FEEDBACK_UNION) for aggregation. A number of core positive engagements
    // are included as well.
    val negativeFeedbackUnionTransform = RichITransform(
      BinaryUnion(
        featuresToUnify = AllTweetNegativeEngagementLabels,
        outputFeature = IS_NEGATIVE_FEEDBACK_UNION
      )
    )

    AggregateGroup(
      aggregatePrefix = "real_time_user_author_aggregates_v1",
      inputSource = inputSource,
      preTransforms = Seq(negativeFeedbackUnionTransform),
      keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
      features = Set.empty,
      labels = UserAuthorEngagementLabels,
      metrics = Set(CountMetric),
      halfLives = Set(30.minutes, 1.day),
      includeAnyFeature = true,
      includeAnyLabel = false,
      includeTimestampFeature = false,
      outputStore = ProductionStore
    )
  }