in src/scala/com/twitter/timelines/prediction/common/aggregates/real_time/TimelinesOnlineAggregationConfigBase.scala [105:688]
/**
 * Derives the staging counterpart of a production aggregate group.
 *
 * @param prodGroup the group to copy
 * @return a copy of `prodGroup` whose `outputStore` is `StagingStore`; every other field is
 *         carried over unchanged by `copy`
 */
def createStagingGroup(prodGroup: AggregateGroup): AggregateGroup = {
  prodGroup.copy(outputStore = StagingStore)
}
/**
 * Per-tweet engagement counts with a 30-minute half-life.
 *
 * Keyed by SOURCE_TWEET_ID. Counts (CountMetric) the labels in TweetLabels and
 * TweetNegativeEngagementDontLikeLabels; `features` is empty. Output goes to ProductionStore.
 * Shares its aggregate prefix with `tweetEngagementTotalCountsProd`, which differs only in
 * its half-life.
 */
val tweetEngagement30MinuteCountsProd =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = Set.empty,
    labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Per-tweet "don't like" counts computed after RichRemoveUnverifiedUserTransform is applied.
 *
 * Keyed by SOURCE_TWEET_ID. Counts TweetNegativeEngagementDontLikeLabels with half-lives of
 * 30 minutes and `Duration.Top`; `features` is empty. Output goes to ProductionStore.
 * NOTE(review): the transform presumably drops records from unverified users -- confirm
 * against its definition.
 */
val tweetVerifiedDontLikeEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v6",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(RichRemoveUnverifiedUserTransform),
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = Set.empty,
    labels = TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Per-tweet negative-engagement counts.
 *
 * Keyed by SOURCE_TWEET_ID. Counts TweetNegativeEngagementLabels; `features` is empty.
 * Output goes to ProductionStore.
 *
 * NOTE(review): the val name says "6Hour" but the configured half-life is 30 minutes.
 * The value is deliberately left as-is because changing it would alter the produced
 * aggregates; confirm which of the two is intended.
 */
val tweetNegativeEngagement6HourCounts =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v2",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = Set.empty,
    labels = TweetNegativeEngagementLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Per-tweet negative-engagement counts computed after RichRemoveUnverifiedUserTransform
 * is applied.
 *
 * Keyed by SOURCE_TWEET_ID. Counts TweetNegativeEngagementLabels with half-lives of
 * 30 minutes and `Duration.Top`; `features` is empty. Output goes to ProductionStore.
 */
val tweetVerifiedNegativeEngagementCounts =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v7",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(RichRemoveUnverifiedUserTransform),
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = Set.empty,
    labels = TweetNegativeEngagementLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Per-tweet core and dwell engagement counts for promoted tweets.
 *
 * Records first pass through a DownsampleTransform configured with
 * `negativeSamplingRate = 0.0` and `keepLabels = {IsPromoted}`.
 * NOTE(review): presumably this keeps only records carrying IsPromoted -- confirm against
 * DownsampleTransform.
 *
 * Keyed by SOURCE_TWEET_ID. Counts TweetCoreAndDwellLabels with half-lives of 2 hours and
 * 24 hours; `features` is empty and the any-feature aggregate is disabled. Output goes to
 * ProductionStore.
 */
val promotedTweetEngagementRealTimeCounts =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v3.is_promoted",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(
      DownsampleTransform(
        negativeSamplingRate = 0.0,
        keepLabels = Set(ClientLogEventDataRecordFeatures.IsPromoted)
      )
    ),
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = Set.empty,
    labels = TweetCoreAndDwellLabels,
    metrics = Set(CountMetric),
    halfLives = Set(2.hours, 24.hours),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Total per-tweet engagement counts for non-public engagements, analogous to EB's public
 * engagement counts.
 *
 * Keyed by SOURCE_TWEET_ID. Counts TweetLabels and TweetNegativeEngagementDontLikeLabels
 * with a single `Duration.Top` half-life; `features` is empty. Output goes to
 * ProductionStore. Shares its aggregate prefix with `tweetEngagement30MinuteCountsProd`.
 */
val tweetEngagementTotalCountsProd =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = Set.empty,
    labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(Duration.Top),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Per-tweet negative-engagement counts with a single `Duration.Top` half-life.
 *
 * Keyed by SOURCE_TWEET_ID. Counts TweetNegativeEngagementLabels; `features` is empty.
 * Output goes to ProductionStore. Shares its aggregate prefix with
 * `tweetNegativeEngagement6HourCounts`.
 */
val tweetNegativeEngagementTotalCounts =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v2",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = Set.empty,
    labels = TweetNegativeEngagementLabels,
    metrics = Set(CountMetric),
    halfLives = Set(Duration.Top),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Tweet-feature engagement counts grouped by the viewer's user id, 30-minute half-life.
 *
 * Keyed by USER_ID. Counts TweetFeatures against TweetLabels and
 * TweetNegativeEngagementDontLikeLabels. Output goes to ProductionStore.
 */
val userEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_user_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(SharedFeatures.USER_ID),
    features = TweetFeatures,
    labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * V2 tweet-feature engagement counts grouped by the viewer's user id.
 *
 * Keyed by USER_ID. Counts ClientLogEventDataRecordFeatures.TweetFeaturesV2 against
 * TweetCoreAndDwellLabels with half-lives of 30 minutes and `Duration.Top`. The any-feature
 * aggregate is disabled. Output goes to ProductionStore.
 */
val userEngagementRealTimeAggregatesV2 =
  AggregateGroup(
    aggregatePrefix = "real_time_user_aggregates_v2",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(SharedFeatures.USER_ID),
    features = ClientLogEventDataRecordFeatures.TweetFeaturesV2,
    labels = TweetCoreAndDwellLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Author user-state engagement counts grouped by the viewer's user id.
 *
 * Keyed by USER_ID. Counts AuthorFeaturesAdapter.UserStateBooleanFeatures against
 * TweetCoreAndDwellLabels with half-lives of 30 minutes and `Duration.Top`. No
 * pre-transforms are applied and the any-feature aggregate is disabled. Output goes to
 * ProductionStore.
 */
val userEngagementAuthorUserStateRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "real_time_user_aggregates_v3",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq.empty,
    keys = Set(SharedFeatures.USER_ID),
    features = AuthorFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetCoreAndDwellLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Author user-state negative-engagement counts grouped by the viewer's user id.
 *
 * Keyed by USER_ID. Counts AuthorFeaturesAdapter.UserStateBooleanFeatures against
 * TweetNegativeEngagementLabels and TweetNegativeEngagementDontLikeLabels with half-lives
 * of 30 minutes and `Duration.Top`. No pre-transforms are applied and the any-feature
 * aggregate is disabled. Output goes to ProductionStore.
 */
val userNegativeEngagementAuthorUserStateRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "real_time_user_aggregates_v4",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq.empty,
    keys = Set(SharedFeatures.USER_ID),
    features = AuthorFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Tweet-feature engagement counts grouped by the viewer's user id, 48-hour half-life.
 *
 * Keyed by USER_ID. Counts TweetFeatures against TweetLabels and
 * TweetNegativeEngagementDontLikeLabels. Output goes to ProductionStore.
 */
val userEngagement48HourRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_user_aggregates_v5",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(SharedFeatures.USER_ID),
    features = TweetFeatures,
    labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(48.hours),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Author user-state negative-engagement counts grouped by the viewer's user id,
 * 72-hour half-life.
 *
 * Keyed by USER_ID. Counts AuthorFeaturesAdapter.UserStateBooleanFeatures against
 * TweetNegativeEngagementLabels and TweetNegativeEngagementDontLikeLabels. No
 * pre-transforms are applied and the any-feature aggregate is disabled. Output goes to
 * ProductionStore.
 */
val userNegativeEngagementAuthorUserState72HourRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "real_time_user_aggregates_v6",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq.empty,
    keys = Set(SharedFeatures.USER_ID),
    features = AuthorFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(72.hours),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Engagement counts grouped by source author id: for each author, quantifies the
 * engagements (fav, reply, etc.) that the author's tweets have received.
 *
 * Keyed by SOURCE_AUTHOR_ID. Counts TweetLabels and TweetNegativeEngagementDontLikeLabels
 * with half-lives of 30 minutes and `Duration.Top`; `features` is empty. Output goes to
 * ProductionStore.
 */
val authorEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_author_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
    features = Set.empty,
    labels = TweetLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Negative-engagement counts grouped by source author id: for each author, quantifies the
 * negative engagements (mute, block, etc.) that the author's tweets have received.
 *
 * Not used by Home, but still consumed by the Follow Recommendation Service, so it must be
 * kept for now.
 *
 * Keyed by SOURCE_AUTHOR_ID. Counts TweetNegativeEngagementLabels with half-lives of
 * 30 minutes and `Duration.Top`; `features` is empty. Output goes to ProductionStore.
 */
val authorNegativeEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_author_aggregates_v2",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
    features = Set.empty,
    labels = TweetNegativeEngagementLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * "Don't like" counts grouped by source author id, restricted to engagements from verified
 * users: for each author, quantifies the don't-like feedback the author's tweets received.
 *
 * RichRemoveUnverifiedUserTransform is applied first. Keyed by SOURCE_AUTHOR_ID. Counts
 * TweetNegativeEngagementDontLikeLabels with a 30-minute half-life; `features` is empty.
 * Output goes to ProductionStore.
 */
val authorVerifiedNegativeEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_author_aggregates_v3",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(RichRemoveUnverifiedUserTransform),
    keys = Set(TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
    features = Set.empty,
    labels = TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Engagement counts grouped by topic id.
 *
 * Keyed by TOPIC_ID. Counts TweetLabels and AllTweetNegativeEngagementLabels with
 * half-lives of 30 minutes and `Duration.Top`; `features` is empty. Output goes to
 * ProductionStore.
 */
val topicEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_topic_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.TOPIC_ID),
    features = Set.empty,
    labels = TweetLabels ++ AllTweetNegativeEngagementLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * User-state engagement counts grouped by topic id.
 *
 * Keyed by TOPIC_ID. Counts UserFeaturesAdapter.UserStateBooleanFeatures against
 * TweetCoreAndDwellLabels with half-lives of 30 minutes and `Duration.Top`. The any-feature
 * aggregate is disabled. Output goes to ProductionStore.
 */
val topicEngagementUserStateRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_topic_aggregates_v2",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.TOPIC_ID),
    features = UserFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetCoreAndDwellLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * User-state negative-engagement counts grouped by topic id.
 *
 * Keyed by TOPIC_ID. Counts UserFeaturesAdapter.UserStateBooleanFeatures against
 * TweetNegativeEngagementLabels and TweetNegativeEngagementDontLikeLabels with half-lives
 * of 30 minutes and `Duration.Top`. The any-feature aggregate is disabled. Output goes to
 * ProductionStore.
 */
val topicNegativeEngagementUserStateRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_topic_aggregates_v3",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.TOPIC_ID),
    features = UserFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Engagement counts grouped by topic id with a 24-hour half-life; otherwise configured
 * like the real_time_topic_aggregates_v1 group.
 *
 * Keyed by TOPIC_ID. Counts TweetLabels and AllTweetNegativeEngagementLabels; `features`
 * is empty. Output goes to ProductionStore.
 */
val topicEngagement24HourRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_topic_aggregates_v4",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.TOPIC_ID),
    features = Set.empty,
    labels = TweetLabels ++ AllTweetNegativeEngagementLabels,
    metrics = Set(CountMetric),
    halfLives = Set(24.hours),
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * User-state engagement counts grouped by tweet id.
 *
 * Keyed by SOURCE_TWEET_ID. Counts UserFeaturesAdapter.UserStateBooleanFeatures against
 * TweetCoreAndDwellLabels with half-lives of 30 minutes and `Duration.Top`. The any-feature
 * aggregate is disabled. Output goes to ProductionStore.
 */
val tweetEngagementUserStateRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v3",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = UserFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetCoreAndDwellLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * User-gender engagement counts grouped by tweet id.
 *
 * Keyed by SOURCE_TWEET_ID. Counts UserFeaturesAdapter.GenderBooleanFeatures against
 * TweetCoreAndDwellLabels, TweetNegativeEngagementLabels and
 * TweetNegativeEngagementDontLikeLabels with half-lives of 30 minutes and `Duration.Top`.
 * The any-feature aggregate is disabled. Output goes to ProductionStore.
 */
val tweetEngagementGenderRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v4",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = UserFeaturesAdapter.GenderBooleanFeatures,
    labels = TweetCoreAndDwellLabels ++
      TweetNegativeEngagementLabels ++
      TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * User-state negative-engagement counts grouped by tweet id.
 *
 * Keyed by SOURCE_TWEET_ID. Counts UserFeaturesAdapter.UserStateBooleanFeatures against
 * TweetNegativeEngagementLabels and TweetNegativeEngagementDontLikeLabels with half-lives
 * of 30 minutes and `Duration.Top`. The any-feature aggregate is disabled. Output goes to
 * ProductionStore.
 */
val tweetNegativeEngagementUserStateRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v5",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = UserFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * User-state negative-engagement counts grouped by tweet id, computed after
 * RichRemoveUnverifiedUserTransform is applied.
 *
 * Keyed by SOURCE_TWEET_ID. Counts UserFeaturesAdapter.UserStateBooleanFeatures against
 * TweetNegativeEngagementLabels and TweetNegativeEngagementDontLikeLabels with half-lives
 * of 30 minutes and `Duration.Top`. The any-feature aggregate is disabled. Output goes to
 * ProductionStore.
 */
val tweetVerifiedNegativeEngagementUserStateRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "real_time_tweet_aggregates_v8",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(RichRemoveUnverifiedUserTransform),
    keys = Set(TimelinesSharedFeatures.SOURCE_TWEET_ID),
    features = UserFeaturesAdapter.UserStateBooleanFeatures,
    labels = TweetNegativeEngagementLabels ++ TweetNegativeEngagementDontLikeLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Candidate-tweet-source engagement counts grouped by user id.
 *
 * Keyed by USER_ID. Counts CandidateTweetSourceFeatures against TweetCoreAndDwellLabels and
 * NegativeEngagementsRealTimeDontLike with half-lives of 30 minutes and `Duration.Top`.
 * The any-feature aggregate is disabled. Output goes to ProductionStore.
 */
val userCandidateTweetSourceEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_user_candidate_tweet_source_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(SharedFeatures.USER_ID),
    features = CandidateTweetSourceFeatures,
    labels = TweetCoreAndDwellLabels ++ NegativeEngagementsRealTimeDontLike,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Candidate-tweet-source engagement counts grouped by user id, 48-hour half-life.
 *
 * Keyed by USER_ID. Counts CandidateTweetSourceFeatures against TweetCoreAndDwellLabels and
 * NegativeEngagementsRealTimeDontLike. The any-feature aggregate is disabled. Output goes
 * to ProductionStore.
 */
val userCandidateTweetSourceEngagement48HourRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_user_candidate_tweet_source_aggregates_v2",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(SharedFeatures.USER_ID),
    features = CandidateTweetSourceFeatures,
    labels = TweetCoreAndDwellLabels ++ NegativeEngagementsRealTimeDontLike,
    metrics = Set(CountMetric),
    halfLives = Set(48.hours),
    includeAnyFeature = false,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Tweet-feature counts for Profile engagements, grouped by the viewer's user id.
 *
 * IsNewUserTransform is applied first. Keyed by USER_ID. Counts TweetFeatures against
 * ProfileCoreLabels with half-lives of 30 minutes and `Duration.Top`. The any-feature
 * aggregate is enabled. Output goes to ProductionStore.
 * NOTE(review): what IsNewUserTransform adds or filters is not visible here -- confirm
 * against its definition.
 */
val userProfileEngagementRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "profile_real_time_user_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(IsNewUserTransform),
    keys = Set(SharedFeatures.USER_ID),
    features = TweetFeatures,
    labels = ProfileCoreLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = true,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Pre-transform that wraps a BinaryUnion over ProfileNegativeEngagementLabels, writing its
 * result to ProfileLabelFeatures.IS_NEGATIVE_FEEDBACK_UNION.
 * NOTE(review): presumably the output feature is set when any of the unified labels is
 * set -- confirm against BinaryUnion.
 */
val NegativeEngagementsUnionTransform = {
  val profileNegativeFeedbackUnion =
    BinaryUnion(
      featuresToUnify = ProfileNegativeEngagementLabels,
      outputFeature = ProfileLabelFeatures.IS_NEGATIVE_FEEDBACK_UNION
    )
  RichITransform(profileNegativeFeedbackUnion)
}
/**
 * Profile negative-engagement counts grouped by the viewer's user id.
 *
 * NegativeEngagementsUnionTransform is applied first. Keyed by USER_ID. Counts
 * ProfileNegativeEngagementLabels and ProfileNegativeEngagementUnionLabels with half-lives
 * of 30 minutes, 72 hours and 14 days. `features` is empty, with the any-feature aggregate
 * enabled. Output goes to ProductionStore.
 */
val userProfileNegativeEngagementRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "profile_negative_engagement_real_time_user_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(NegativeEngagementsUnionTransform),
    keys = Set(SharedFeatures.USER_ID),
    features = Set.empty,
    labels = ProfileNegativeEngagementLabels ++ ProfileNegativeEngagementUnionLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, 72.hours, 14.day),
    includeAnyFeature = true,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Profile engagement counts grouped by the (viewer user id, source author id) pair.
 *
 * Keyed by USER_ID and SOURCE_AUTHOR_ID. Counts ProfileCoreLabels with half-lives of
 * 30 minutes, 24 hours and 72 hours. `features` is empty, with the any-feature aggregate
 * enabled. Output goes to ProductionStore.
 */
val userAuthorProfileEngagementRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "user_author_profile_real_time_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
    features = Set.empty,
    labels = ProfileCoreLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, 24.hours, 72.hours),
    includeAnyFeature = true,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Profile negative-engagement counts grouped by the (viewer user id, source author id)
 * pair.
 *
 * NegativeEngagementsUnionTransform is applied first. Keyed by USER_ID and
 * SOURCE_AUTHOR_ID. Counts only ProfileNegativeEngagementUnionLabels, with half-lives of
 * 30 minutes, 72 hours and 14 days. `features` is empty, with the any-feature aggregate
 * enabled. Output goes to ProductionStore.
 */
val userAuthorProfileNegativeEngagementRealTimeAggregates =
  AggregateGroup(
    aggregatePrefix = "user_author_profile_negative_engagement_real_time_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(NegativeEngagementsUnionTransform),
    keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
    features = Set.empty,
    labels = ProfileNegativeEngagementUnionLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, 72.hours, 14.day),
    includeAnyFeature = true,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Engagement counts grouped by the (viewer user id, source author id) pair, computed after
 * IsNewUserTransform is applied.
 *
 * Keyed by USER_ID and SOURCE_AUTHOR_ID. Counts TweetCoreAndDwellLabels plus IS_CLICKED,
 * IS_PROFILE_CLICKED and IS_PHOTO_EXPANDED with half-lives of 30 minutes and
 * `Duration.Top`. `features` is empty, with the any-feature aggregate enabled. Output goes
 * to ProductionStore.
 */
val newUserAuthorEngagementRealTimeAggregatesProd =
  AggregateGroup(
    aggregatePrefix = "real_time_new_user_author_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(IsNewUserTransform),
    keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
    features = Set.empty,
    labels = TweetCoreAndDwellLabels ++ Set(IS_CLICKED, IS_PROFILE_CLICKED, IS_PHOTO_EXPANDED),
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, Duration.Top),
    includeAnyFeature = true,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
/**
 * Engagement counts grouped by the (viewer user id, source author id) pair.
 *
 * Keyed by USER_ID and SOURCE_AUTHOR_ID. Counts UserAuthorEngagementLabels with half-lives
 * of 30 minutes and 1 day. `features` is empty, with the any-feature aggregate enabled.
 * Output goes to ProductionStore.
 */
val userAuthorEngagementRealTimeAggregatesProd = {
  // User-author real-time aggregates are very expensive to compute, so all major
  // negative-feedback engagements are collapsed into a single negative label
  // (IS_NEGATIVE_FEEDBACK_UNION) before aggregation. A number of core positive
  // engagements are aggregated alongside it.
  val negativeFeedbackUnionTransform =
    RichITransform(
      BinaryUnion(
        featuresToUnify = AllTweetNegativeEngagementLabels,
        outputFeature = IS_NEGATIVE_FEEDBACK_UNION
      )
    )

  AggregateGroup(
    aggregatePrefix = "real_time_user_author_aggregates_v1",
    outputStore = ProductionStore,
    inputSource = inputSource,
    preTransforms = Seq(negativeFeedbackUnionTransform),
    keys = Set(SharedFeatures.USER_ID, TimelinesSharedFeatures.SOURCE_AUTHOR_ID),
    features = Set.empty,
    labels = UserAuthorEngagementLabels,
    metrics = Set(CountMetric),
    halfLives = Set(30.minutes, 1.day),
    includeAnyFeature = true,
    includeAnyLabel = false,
    includeTimestampFeature = false
  )
}