in cr-mixer/server/src/main/scala/com/twitter/cr_mixer/module/TweetInfoStoreModule.scala [48:204]
/**
 * Modules listed by this module's `modules` override: only `UnifiedCacheClient`.
 * NOTE(review): presumably this is what makes the `@Named(ModuleNames.UnifiedCache)` memcached
 * client available to the provider in this file; confirm against `UnifiedCacheClient`.
 */
override def modules: Seq[Module] = List(UnifiedCacheClient)
/**
 * Provides the singleton store that resolves a `TweetId` to its `TweetInfo`.
 *
 * The returned store is layered, outermost first:
 *  1. an in-process cache (`ObservedCachedReadableStore`, 15 minute TTL, cache name
 *     `tweet_info_cache`);
 *  2. a memcached-backed cache on the unified cache client (keys `tIS/<tweetId>`, 15 minute TTL);
 *  3. a `TweetInfoStore` built from the TweetyPie fields store, the user media-representation
 *     health store, the MagicRecs real-time aggregates store, the UserTweetGraphPlus engagement
 *     score store and the Blue Verified annotation store.
 *
 * The engagement-score, media-representation and MagicRecs stores are each wrapped in a
 * `DeciderableReadableStore` with an id gate built from their respective `DeciderKey`.
 *
 * @param statsReceiver               root receiver; every store gets its own named scope
 * @param serviceIdentifier           passed to the MagicRecs store builder; its `environment`
 *                                    also decides whether memcached `asyncUpdate` is enabled
 * @param stratoClient                currently not read by this provider (the only stores that
 *                                    used it were never wired into `TweetInfoStore`); kept so the
 *                                    provider signature stays unchanged
 * @param crMixerUnifiedCacheClient   memcached client backing the `uMRHS/` and `tIS/` caches
 * @param manhattanKVClientMtlsParams passed to the user media-representation health store builder
 * @param tweetyPieService            TweetyPie client used to build the tweet fields store
 * @param userTweetGraphPlusService   client queried for per-tweet engagement scores
 * @param blueVerifiedAnnotationStore injected store handed through to `TweetInfoStore`
 * @param decider                     source of the decider gates guarding the optional stores
 * @return the cached `ReadableStore[TweetId, TweetInfo]`
 */
@Provides
@Singleton
def providesTweetInfoStore(
  statsReceiver: StatsReceiver,
  serviceIdentifier: ServiceIdentifier,
  stratoClient: StratoClient,
  @Named(ModuleNames.UnifiedCache) crMixerUnifiedCacheClient: MemcachedClient,
  manhattanKVClientMtlsParams: ManhattanKVClientMtlsParams,
  tweetyPieService: TweetService.MethodPerEndpoint,
  userTweetGraphPlusService: UserTweetGraphPlus.MethodPerEndpoint,
  @Named(ModuleNames.BlueVerifiedAnnotationStore) blueVerifiedAnnotationStore: ReadableStore[
    String,
    BlueVerifiedAnnotationsV2
  ],
  decider: CrMixerDecider
): ReadableStore[TweetId, TweetInfo] = {

  // Engagement scores straight from UserTweetGraphPlus (no caching layer), decider-gated.
  val tweetEngagementScoreStore: ReadableStore[TweetId, TweetEngagementScores] = {
    val underlyingStore =
      ObservedReadableStore(new ReadableStore[TweetId, TweetEngagementScores] {
        // Every successful service response is reported as a present value.
        override def get(k: TweetId): Future[Option[TweetEngagementScores]] =
          userTweetGraphPlusService.tweetEngagementScore(k).map(Some(_))
      })(statsReceiver.scope("UserTweetGraphTweetEngagementScoreStore"))

    DeciderableReadableStore(
      underlyingStore,
      decider.deciderGateBuilder.idGate(
        DeciderKey.enableUtgRealTimeTweetEngagementScoreDeciderKey),
      statsReceiver.scope("UserTweetGraphTweetEngagementScoreStore")
    )
  }

  // User media-representation health scores, memcached for 12 hours under `uMRHS/<userId>`.
  val userMediaRepresentationHealthStore: ReadableStore[UserId, UserMediaRepresentationScores] = {
    val underlyingStore =
      UserMediaRepresentationHealthStore.buildReadableStore(
        manhattanKVClientMtlsParams,
        statsReceiver.scope("UnderlyingUserMediaRepresentationHealthStore")
      )

    DeciderableReadableStore(
      ObservedMemcachedReadableStore.fromCacheClient(
        backingStore = underlyingStore,
        cacheClient = crMixerUnifiedCacheClient,
        ttl = 12.hours
      )(
        valueInjection = BinaryScalaCodec(UserMediaRepresentationScores),
        statsReceiver = statsReceiver.scope("memCacheUserMediaRepresentationHealthStore"),
        keyToString = { k: UserId => s"uMRHS/$k" }
      ),
      decider.deciderGateBuilder.idGate(DeciderKey.enableUserMediaRepresentationStoreDeciderKey),
      statsReceiver.scope("UserMediaRepresentationHealthStore")
    )
  }

  // MagicRecs real-time aggregates (no caching layer here), decider-gated.
  val magicRecsRealTimeAggregatesStore: ReadableStore[
    TweetId,
    MagicRecsRealTimeAggregatesScores
  ] = {
    val underlyingStore =
      MagicRecsRealTimeAggregatesStore.buildReadableStore(
        serviceIdentifier,
        statsReceiver.scope("UnderlyingMagicRecsRealTimeAggregatesScores")
      )

    DeciderableReadableStore(
      underlyingStore,
      decider.deciderGateBuilder.idGate(DeciderKey.enableMagicRecsRealTimeAggregatesStore),
      statsReceiver.scope("MagicRecsRealTimeAggregatesStore")
    )
  }

  // Combined TweetInfo hydration from all signal stores.
  val underlyingTweetInfoStore = TweetInfoStore(
    TweetyPieFieldsStore.getStoreFromTweetyPie(tweetyPieService),
    userMediaRepresentationHealthStore,
    magicRecsRealTimeAggregatesStore,
    tweetEngagementScoreStore,
    blueVerifiedAnnotationStore
  )(statsReceiver.scope("tweetInfoStore"))

  val memcachedTweetInfoStore = ObservedMemcachedReadableStore.fromCacheClient(
    backingStore = underlyingTweetInfoStore,
    cacheClient = crMixerUnifiedCacheClient,
    ttl = 15.minutes,
    // Hydrating tweetInfo is now a required step for all candidates,
    // hence we needed to tune these thresholds.
    // asyncUpdate is switched on only when the service environment is exactly "prod".
    asyncUpdate = serviceIdentifier.environment == "prod"
  )(
    valueInjection = BinaryScalaCodec(TweetInfo),
    statsReceiver = statsReceiver.scope("memCachedTweetInfoStore"),
    keyToString = { k: TweetId => s"tIS/$k" }
  )

  // In-process cache in front of memcached. Both layers use a 15 minute TTL.
  // NOTE(review): if the in-memory TTL starts at insertion, an entry may be served for longer
  // than 15 minutes after it was first hydrated; confirm that is acceptable.
  ObservedCachedReadableStore.from(
    memcachedTweetInfoStore,
    ttl = 15.minutes,
    maxKeys = 8388607, // Check TweetInfo definition. size~92b. Around 736 MB
    windowSize = 10000L,
    cacheName = "tweet_info_cache",
    maxMultiGetSize = 20
  )(statsReceiver.scope("inMemoryCachedTweetInfoStore"))
}