in tweetypie/server/src/main/scala/com/twitter/tweetypie/config/LogicalRepositories.scala [286:805]
/**
 * Creates a [[RepoConfig]] for a single-key repository function.
 *
 * @param name used both as the config name and as the stats scope under `repoStats`
 * @param repo the repository function to wrap
 */
def repoConfig[K, V](name: String, repo: K => Stitch[V]): RepoConfig[K, V] = {
  val scopedStats = repoStats.scope(name)
  new RepoConfig[K, V](
    name = name,
    toRepo = repo,
    stats = scopedStats,
    memcachedClientWithInProcessCaching = caches.memcachedClientWithInProcessCaching
  )
}
/**
 * Two-argument counterpart of `repoConfig`: the function is tupled so that the pair
 * `(K, C)` becomes the key type of the resulting config.
 */
def repo2Config[K, C, V](name: String, repo: (K, C) => Stitch[V]): RepoConfig[(K, C), V] = {
  val tupledRepo: ((K, C)) => Stitch[V] = repo.tupled
  repoConfig[(K, C), V](name, tupledRepo)
}
new LogicalRepositories {
// Circular dependency: the final tweet result repo depends on hydrators, and those
// hydrators depend on the tweet result repo. To break the cycle, `tweetResultRepo` is a
// thin proxy that forwards every call to `finalTweetResultRepo`, a var that starts out
// null and is assigned at the end of this block.
var finalTweetResultRepo: TweetResultRepository.Type = null
val tweetResultRepo: TweetResultRepository.Type = { (id, options) =>
  finalTweetResultRepo(id, options)
}
// Tweet repository derived from the `tweetResultRepo` proxy, plus its optional variant.
val tweetRepo: TweetRepository.Type =
  TweetRepository.fromTweetResult(tweetResultRepo)
val optionalTweetRepo: TweetRepository.Optional =
  TweetRepository.optional(tweetRepo)
// Observed wrapper around `external.userRepo` (config name "user"), plus its optional variant.
val userRepo: UserRepository.Type =
  repo2Config("user", external.userRepo).observe().toRepo2
val optionalUserRepo: UserRepository.Optional =
  UserRepository.optional(userRepo)
// Stats scopes under `repoStats` used when wiring the visibility libraries below.
private[this] val tweetVisibilityStatsReceiver: StatsReceiver = repoStats
  .scope("tweet_visibility_library")
private[this] val userUnavailableVisibilityStatsReceiver: StatsReceiver = repoStats
  .scope("user_unavailable_visibility_library")
private[this] val quotedTweetVisibilityStatsReceiver: StatsReceiver = repoStats
  .scope("quoted_tweet_visibility_library")
private[this] val deletedTweetVisibilityStatsReceiver: StatsReceiver = repoStats
  .scope("deleted_tweet_visibility_library")
// A c.t.logging.Logger is created here because TweetVisibilityLibrary still uses that
// older logging API.
private[this] val tweetVisibilityLogger = com.twitter.logging.Logger(
  "com.twitter.tweetypie.TweetVisibility"
)
// Decider for visibility filtering, built from the overlay file named in settings with
// local decider overrides enabled, and the gates derived from it.
private[this] val visibilityDecider: Decider =
  DeciderUtil.mkDecider(
    deciderOverlayPath = settings.vfDeciderOverlayFilename,
    useLocalDeciderOverrides = true
  )
private[this] val visibilityDeciderGates =
  VisibilityDeciderGates(visibilityDecider)
// Builds a new VisibilityLibrary that reports to the given stats receiver. This is a
// `def`, so each call runs the builder again and returns a fresh instance.
// Debug-stat capture, composable actions and fail-closed are passed Gate.True;
// short-circuiting, special logging and safety-level-param memoization are taken from
// `visibilityDeciderGates`.
private[this] def visibilityLibrary(statsReceiver: StatsReceiver) = {
  val builder = VisibilityLibrary.Builder(
    log = tweetVisibilityLogger,
    statsReceiver = statsReceiver,
    memoizeSafetyLevelParams = visibilityDeciderGates.enableMemoizeSafetyLevelParams
  )
  builder
    .withDecider(visibilityDecider)
    .withDefaultABDecider(isLocal = false)
    .withCaptureDebugStats(Gate.True)
    .withEnableComposableActions(Gate.True)
    .withEnableFailClosed(Gate.True)
    .withEnableShortCircuiting(visibilityDeciderGates.enableShortCircuitingTVL)
    .withSpecialLogging(visibilityDeciderGates.enableSpecialLogging)
    .build()
}
// Country name generator backed by an empty custom map. That is sufficient here because
// TweetVisibilityLibrary, DeletedTweetVisibilityLibrary, and
// UserUnavailableVisibilityLibrary do not evaluate any Rules that require the display
// of country names in copy.
def countryNameGenerator(statsReceiver: StatsReceiver) =
  CountryNameGenerator.providesWithCustomMap(Map.empty, statsReceiver)
// Creates a TombstoneGenerator from the vis params of a visibility library, the supplied
// country name generator, and the stats receiver.
// NOTE(review): `visibilityLibrary(statsReceiver)` builds a fresh library on every call, so
// this constructs one solely to read `visParams` — confirm that is intended.
def tombstoneGenerator(
  countryNameGenerator: CountryNameGenerator,
  statsReceiver: StatsReceiver
) = {
  val visParams = visibilityLibrary(statsReceiver).visParams
  TombstoneGenerator(visParams, countryNameGenerator, statsReceiver)
}
// Visibility library for unavailable-user states. Every collaborator reports to the
// user-unavailable stats scope.
private[this] val userUnavailableVisibilityLibrary = {
  val statsReceiver = userUnavailableVisibilityStatsReceiver
  val library = visibilityLibrary(statsReceiver)
  val tombstones = tombstoneGenerator(countryNameGenerator(statsReceiver), statsReceiver)
  val interstitials = LocalizedInterstitialGenerator(visibilityDecider, statsReceiver)
  UserUnavailableStateVisibilityLibrary(library, visibilityDecider, tombstones, interstitials)
}
// Observed user-identity repository built on top of `userRepo`.
val userIdentityRepo: UserIdentityRepository.Type =
  repoConfig("user_identity", UserIdentityRepository(userRepo)).observe().toRepo
// Observed user-protection repository built on top of `userRepo`.
val userProtectionRepo: UserProtectionRepository.Type =
  repoConfig("user_protection", UserProtectionRepository(userRepo)).observe().toRepo
// Observed user-view repository built on top of `userRepo`.
val userViewRepo: UserViewRepository.Type =
  repoConfig("user_view", UserViewRepository(userRepo)).observe().toRepo
// Observed user-visibility repository combining `userRepo` with the user-unavailable
// visibility library.
val userVisibilityRepo: UserVisibilityRepository.Type = {
  val underlying = UserVisibilityRepository(userRepo, userUnavailableVisibilityLibrary)
  repoConfig("user_visibility", underlying).observe().toRepo
}
// Observed wrapper around `external.urlRepo`.
val urlRepo: UrlRepository.Type =
  repoConfig("url", external.urlRepo).observe().toRepo
// Observed wrapper around `external.profileGeoRepo`.
val profileGeoRepo: ProfileGeoRepository.Type =
  repoConfig("profile_geo", external.profileGeoRepo).observe().toRepo
// Observed wrapper around the two-argument `external.quoterHasAlreadyQuotedRepo`.
val quoterHasAlreadyQuotedRepo: QuoterHasAlreadyQuotedRepository.Type =
  repo2Config("quoter_has_already_quoted", external.quoterHasAlreadyQuotedRepo)
    .observe()
    .toRepo2
// Observed wrapper around the two-argument `external.lastQuoteOfQuoterRepo`.
val lastQuoteOfQuoterRepo: LastQuoteOfQuoterRepository.Type =
  repo2Config("last_quote_of_quoter", external.lastQuoteOfQuoterRepo).observe().toRepo2
// Observed wrapper around `external.mediaMetadataRepo`.
val mediaMetadataRepo: MediaMetadataRepository.Type =
  repoConfig("media_metadata", external.mediaMetadataRepo).observe().toRepo
// Observed wrapper around `external.perspectiveRepo`.
val perspectiveRepo: PerspectiveRepository.Type =
  repoConfig("perspective", external.perspectiveRepo).observe().toRepo
// Conversation-muted lookup derived from `perspectiveRepo`.
val conversationMutedRepo: ConversationMutedRepository.Type = TimelineService.GetPerspectives
  .getConversationMuted(perspectiveRepo)
// `observe` is applied before either caching layer, so only cache misses (i.e. calls
// that reach the underlying repo) are observed.
// `newCaching` reports cache hit/miss stats; `caching` does not.
val deviceSourceRepo: DeviceSourceRepository.Type = {
  val observed = repoConfig("device_source", external.deviceSourceRepo).observe()
  observed
    .newCaching(
      keySerializer = appId => DeviceSourceKey(appId).toString,
      valueSerializer = ServoCachedValueSerializer(
        codec = DeviceSource,
        expiry = Expiry.byAge(settings.deviceSourceMemcacheTtl),
        softTtl = settings.deviceSourceMemcacheSoftTtl
      )
    )
    .caching(
      cache = caches.deviceSourceInProcessCache,
      partialHandler = softTtlPartialHandler(_ => settings.deviceSourceInProcessSoftTtl)
    )
    .toRepo
}
// `observe` is applied before caching, so only cache misses (i.e. calls that reach the
// underlying repo) are observed.
// `newCaching` reports cache hit/miss stats; `caching` does not.
val placeRepo: PlaceRepository.Type = {
  val observed = repoConfig("place", external.placeRepo).observe()
  observed
    .newCaching(
      keySerializer = _.toString,
      valueSerializer = ServoCachedValueSerializer(
        codec = Place,
        expiry = Expiry.byAge(settings.placeMemcacheTtl),
        softTtl = settings.placeMemcacheSoftTtl
      )
    )
    .toRepo
}
// Observed wrapper around `external.cardRepo` (config name "cards").
val cardRepo: CardRepository.Type =
  repoConfig("cards", external.cardRepo).observe().toRepo
// Observed wrapper around the two-argument `external.card2Repo`.
val card2Repo: Card2Repository.Type =
  repo2Config("card2", external.card2Repo).observe().toRepo2
// Observed wrapper around the two-argument `external.cardUsersRepo`.
val cardUsersRepo: CardUsersRepository.Type =
  repo2Config("card_users", external.cardUsersRepo).observe().toRepo2
// Observed wrapper around `external.relationshipRepo`.
val relationshipRepo: RelationshipRepository.Type =
  repoConfig("relationship", external.relationshipRepo).observe().toRepo
// Observed wrapper around `external.conversationIdRepo`.
val conversationIdRepo: ConversationIdRepository.Type =
  repoConfig("conversation_id", external.conversationIdRepo).observe().toRepo
// Observed conversation-control repository built on `tweetRepo`. The underlying repository
// receives its own stats scope ("conversation_control" under `stats`).
val conversationControlRepo: ConversationControlRepository.Type = {
  val underlying =
    ConversationControlRepository(tweetRepo, stats.scope("conversation_control"))
  repo2Config("conversation_control", underlying).observe().toRepo2
}
// Observed wrapper around the two-argument `external.containerAsTweetRepo`.
val containerAsGetTweetResultRepo: CreativesContainerMaterializationRepository.GetTweetType =
  repo2Config("container_as_tweet", external.containerAsTweetRepo).observe().toRepo2
// Observed wrapper around the two-argument `external.containerAsTweetFieldsRepo`.
val containerAsGetTweetFieldsResultRepo: CreativesContainerMaterializationRepository.GetTweetFieldsType =
  repo2Config("container_as_tweet_fields", external.containerAsTweetFieldsRepo)
    .observe()
    .toRepo2
// Observed language repository backed by PenguinLanguageRepository, which runs on a
// dedicated fixed-size thread pool sized by `settings.numPenguinThreads`.
// NOTE(review): the executor is neither retained nor shut down within this block —
// confirm that it is meant to live for the lifetime of the process.
val languageRepo: LanguageRepository.Type = {
  val executor = Executors.newFixedThreadPool(settings.numPenguinThreads)
  val penguinPool = FuturePool(executor)
  repoConfig("language", PenguinLanguageRepository(penguinPool)).observe().toRepo
}
// `observe` is applied before caching, so only cache misses (i.e. calls that reach the
// underlying repo) are observed.
// `newCaching` reports cache hit/miss stats; `caching` (used here) does not.
val tweetCountsRepo: TweetCountsRepository.Type = {
  val observed = repoConfig("counts", external.tweetCountsRepo).observe()
  observed
    .caching(
      cache = caches.tweetCountsCache,
      // A cached zero count gets its own soft TTL; every other value uses the non-zero TTL.
      partialHandler = softTtlPartialHandler {
        case Some(0) => settings.tweetCountsMemcacheZeroSoftTtl
        case _ => settings.tweetCountsMemcacheNonZeroSoftTtl
      },
      maxCacheRequestSize = settings.tweetCountsCacheChunkSize
    )
    .toRepo
}
// Observed pasted-media repository built on `tweetRepo`.
val pastedMediaRepo: PastedMediaRepository.Type =
  repo2Config("pasted_media", PastedMediaRepository(tweetRepo)).observe().toRepo2
// Observed wrapper around `external.escherbirdAnnotationRepo`.
val escherbirdAnnotationRepo: EscherbirdAnnotationRepository.Type =
  repoConfig("escherbird_annotations", external.escherbirdAnnotationRepo).observe().toRepo
// Observed wrapper around the two-argument `external.stratoSafetyLabelsRepo`.
val stratoSafetyLabelsRepo: StratoSafetyLabelsRepository.Type =
  repo2Config("strato_safety_labels", external.stratoSafetyLabelsRepo).observe().toRepo2
// Observed wrapper around `external.stratoCommunityMembershipRepo`.
val stratoCommunityMembershipRepo: StratoCommunityMembershipRepository.Type =
  repoConfig("strato_community_memberships", external.stratoCommunityMembershipRepo)
    .observe()
    .toRepo
// Observed wrapper around `external.stratoCommunityAccessRepo`.
val stratoCommunityAccessRepo: StratoCommunityAccessRepository.Type =
  repoConfig("strato_community_access", external.stratoCommunityAccessRepo)
    .observe()
    .toRepo
// Observed wrapper around `external.stratoSuperFollowEligibleRepo`.
val stratoSuperFollowEligibleRepo: StratoSuperFollowEligibleRepository.Type =
  repoConfig("strato_super_follow_eligible", external.stratoSuperFollowEligibleRepo)
    .observe()
    .toRepo
// Observed wrapper around the two-argument `external.stratoSuperFollowRelationsRepo`.
val stratoSuperFollowRelationsRepo: StratoSuperFollowRelationsRepository.Type =
  repo2Config("strato_super_follow_relations", external.stratoSuperFollowRelationsRepo)
    .observe()
    .toRepo2
// Observed wrapper around `external.stratoPromotedTweetRepo`.
val stratoPromotedTweetRepo: StratoPromotedTweetRepository.Type =
  repoConfig("strato_promoted_tweet", external.stratoPromotedTweetRepo).observe().toRepo
// Observed wrapper around the two-argument `external.stratoSubscriptionVerificationRepo`.
val stratoSubscriptionVerificationRepo: StratoSubscriptionVerificationRepository.Type =
  repo2Config(
    "strato_subscription_verification",
    external.stratoSubscriptionVerificationRepo
  ).observe().toRepo2
// Observed wrapper around the two-argument `external.unmentionedEntitiesRepo`.
val unmentionedEntitiesRepo: UnmentionedEntitiesRepository.Type =
  repo2Config("unmentioned_entities", external.unmentionedEntitiesRepo).observe().toRepo2
// Adapts `userRepo` to a UserSource: each key is looked up by its id, requesting the key's
// fields with `UserVisibility.All`. The second lambda argument is ignored.
private[this] val userSource =
  UserSource.fromRepo(
    Repo { (key, _) =>
      userRepo(UserKey(key.id), UserQueryOptions(key.fields, UserVisibility.All))
    }
  )
// Adapts `relationshipRepo` to a UserRelationshipSource by translating each source key
// into a RelationshipKey. The second lambda argument is ignored.
private[this] val userRelationshipSource =
  UserRelationshipSource.fromRepo(
    Repo[UserRelationshipSource.Key, Unit, Boolean] { (relKey, _) =>
      val lookup = RelationshipKey(relKey.subjectId, relKey.objectId, relKey.relationship)
      relationshipRepo(lookup)
    }
  )
// Sources handed to TweetVisibilityLibrary, backed by `perspectiveRepo` and
// `mediaMetadataRepo` respectively.
private[this] val tweetPerspectiveSource = TweetPerspectiveSource
  .fromGetPerspectives(perspectiveRepo)
private[this] val tweetMediaMetadataSource = TweetMediaMetadataSource
  .fromFunction(mediaMetadataRepo)
// Observed wrapper around the two-argument `external.userIsInvitedToConversationRepo`.
val userIsInvitedToConversationRepo: UserIsInvitedToConversationRepository.Type =
  repo2Config("user_is_invited_to_conversation", external.userIsInvitedToConversationRepo)
    .observe()
    .toRepo2
// String center client covering the projects listed in the `stringCenterProjects` flag.
// Languages come from the config named by the `languagesConfig` flag. The logging AB
// decider is built from a fixed config path with the "production" environment. Bundle
// paths and the refresh interval use the MultiProjectStringCenter standard values. Stats
// go to the tweet-visibility scope.
private[this] val stringCenterClient: MultiProjectStringCenter = {
  val projectNames = settings.flags.stringCenterProjects().toList
  val languagesYaml = new YamlConfig(settings.flags.languagesConfig())
  val supportedLanguages: Languages = new YamlConfigLanguages(languagesYaml)
  val abDecider =
    ABDeciderFactory("/usr/local/config/abdecider/abdecider.yml")
      .withEnvironment("production")
      .buildWithLogging()
  MultiProjectStringCenter(
    projects = projectNames,
    defaultBundlePath = MultiProjectStringCenter.StandardDefaultBundlePath,
    refreshingBundlePath = MultiProjectStringCenter.StandardRefreshingBundlePath,
    refreshingInterval = MultiProjectStringCenter.StandardRefreshingInterval,
    requireDefaultBundleExists = true,
    languages = supportedLanguages,
    statsReceiver = tweetVisibilityStatsReceiver,
    loggingABDecider = abDecider
  )
}
// Localization source for the tweet visibility library, built from the string center
// client and a fresh external string registry.
private[this] val stringRegistry: ExternalStringRegistry =
  new ExternalStringRegistry()
private[this] val localizationSource: LocalizationSource = LocalizationSource
  .fromMultiProjectStringCenterClient(stringCenterClient, stringRegistry)
// Observed tweet-visibility repository (config name "tweet_visibility").
// Wiring happens in three steps: build the TweetVisibilityLibrary, wrap it in a
// TweetVisibilityRepository, then wrap that with `repoConfig(...).observe()`.
val tweetVisibilityRepo: TweetVisibilityRepository.Type = {
// All stats-reporting collaborators below use `tweetVisibilityStatsReceiver`, except
// the keyword matcher, which is built from the top-level `stats`.
val tweetVisibilityLibrary: TweetVisibilityLibrary.Type =
TweetVisibilityLibrary(
visibilityLibrary(tweetVisibilityStatsReceiver),
userSource = userSource,
userRelationshipSource = userRelationshipSource,
keywordMatcher = KeywordMatcher.defaultMatcher(stats),
stratoClient = stratoClient,
localizationSource = localizationSource,
decider = visibilityDecider,
invitedToConversationRepo = userIsInvitedToConversationRepo,
tweetPerspectiveSource = tweetPerspectiveSource,
tweetMediaMetadataSource = tweetMediaMetadataSource,
tombstoneGenerator = tombstoneGenerator(
countryNameGenerator(tweetVisibilityStatsReceiver),
tweetVisibilityStatsReceiver
),
interstitialGenerator =
LocalizedInterstitialGenerator(visibilityDecider, tweetVisibilityStatsReceiver),
limitedActionsFeatureSwitches =
FeatureSwitchUtil.mkLimitedActionsFeatureSwitches(tweetVisibilityStatsReceiver),
// Taken from the service-level `deciderGates`, not from `visibilityDeciderGates`.
enableParityTest = deciderGates.tweetVisibilityLibraryEnableParityTest
)
// The repository gets its own "tweet_visibility_repo" stats scope, separate from the
// scope that `repoConfig` creates for the observed wrapper.
val underlying =
TweetVisibilityRepository(
tweetVisibilityLibrary,
visibilityDeciderGates,
tweetVisibilityLogger,
repoStats.scope("tweet_visibility_repo")
)
repoConfig(repo = underlying, name = "tweet_visibility")
.observe()
.toRepo
}
// Observed quoted-tweet visibility repository. Its library reuses the user-unavailable
// visibility library for user state and reports to the quoted-tweet stats scope.
val quotedTweetVisibilityRepo: QuotedTweetVisibilityRepository.Type = {
  val library: QuotedTweetVisibilityLibrary.Type =
    QuotedTweetVisibilityLibrary(
      visibilityLibrary(quotedTweetVisibilityStatsReceiver),
      userSource = userSource,
      userRelationshipSource = userRelationshipSource,
      visibilityDecider,
      userStateVisibilityLibrary = userUnavailableVisibilityLibrary,
      enableVfFeatureHydration = deciderGates.enableVfFeatureHydrationInQuotedTweetVLShim
    )
  val visibilityRepo = QuotedTweetVisibilityRepository(library, visibilityDeciderGates)
  repoConfig("quoted_tweet_visibility", visibilityRepo).observe().toRepo
}
// Observed deleted-tweet visibility repository. Its library and tombstone generator report
// to the deleted-tweet stats scope.
val deletedTweetVisibilityRepo: DeletedTweetVisibilityRepository.Type = {
  val statsReceiver = deletedTweetVisibilityStatsReceiver
  val library: DeletedTweetVisibilityLibrary.Type =
    DeletedTweetVisibilityLibrary(
      visibilityLibrary(statsReceiver),
      visibilityDecider,
      tombstoneGenerator(countryNameGenerator(statsReceiver), statsReceiver)
    )
  val visibilityRepo = DeletedTweetVisibilityRepository(library)
  repoConfig("deleted_tweet_visibility", visibilityRepo).observe().toRepo
}
// Observed user-takedown repository built on `userRepo` (config name "takedowns").
val takedownRepo: UserTakedownRepository.Type =
  repoConfig("takedowns", UserTakedownRepository(userRepo)).observe().toRepo
// Observed wrapper around the two-argument `external.tweetSpamCheckRepo`.
val tweetSpamCheckRepo: TweetSpamCheckRepository.Type =
  repo2Config("tweet_spam_check", external.tweetSpamCheckRepo).observe().toRepo2
// Observed wrapper around `external.retweetSpamCheckRepo`.
val retweetSpamCheckRepo: RetweetSpamCheckRepository.Type =
  repoConfig("retweet_spam_check", external.retweetSpamCheckRepo).observe().toRepo
// `observe` is applied before caching, so only cache misses (i.e. calls that reach the
// underlying repo) are observed.
// `newCaching` reports cache hit/miss stats; `caching` (used here) does not.
val geoScrubTimestampRepo: GeoScrubTimestampRepository.Type = {
  val observed = repoConfig("geo_scrub", external.geoScrubTimestampRepo).observe()
  observed
    .caching(
      cache = caches.geoScrubCache,
      // The partial handler returns None for every input.
      partialHandler = (_ => None)
    )
    .toRepo
}
// Hydrators used by the final tweet result repo assigned at the end of this block.
// `repos = this` passes the enclosing LogicalRepositories instance while it is still
// being constructed, so this val is order-sensitive: members defined after it are not
// yet initialized at this point.
// NOTE(review): presumably the hydrators reach `tweetResultRepo` through the proxy and
// only invoke it at request time, after `finalTweetResultRepo` is set — confirm.
val tweetHydrators: TweetHydrators =
TweetHydrators(
stats = stats,
deciderGates = deciderGates,
repos = this,
tweetDataCache = caches.tweetDataCache,
hasMedia = hasMedia,
featureSwitchesWithoutExperiments = featureSwitchesWithoutExperiments,
clientIdHelper = clientIdHelper,
)
// Query-options expander: `expandDependencies` wrapped with `threadLocalMemoize`.
val queryOptionsExpander: TweetQueryOptionsExpander.Type = TweetQueryOptionsExpander
  .threadLocalMemoize(TweetQueryOptionsExpander.expandDependencies)
// Mutations that only need to be applied to tweets read from the external repository,
// not to tweets read from cache. The combined mutation only runs for tweets whose
// `coreData` is defined.
val tweetMutation: Mutation[Tweet] = {
  val readPathMutations = Mutation.all(
    Seq(
      EntityExtractor.mutationAll,
      TextRepairer.BlankLineCollapser,
      TextRepairer.CoreTextBugPatcher
    )
  )
  readPathMutations.onlyIf(tweet => tweet.coreData.isDefined)
}
// Tweet result repository over `external.tweetResultRepo` (config name "saved_tweet").
// Wrappers are applied in this order: observe, then the `tweetMutation` middleware, then
// the CachingTweetRepository middleware. The order is significant; do not reorder.
// NOTE(review): by the layering described for the other caching repos in this file
// (wrappers applied later sit further out), the cache presumably sits outside the
// mutation step, so mutations run only for results fetched from the external repo —
// confirm against RepoConfig.withMiddleware.
val cachingTweetRepo: TweetResultRepository.Type =
repo2Config(repo = external.tweetResultRepo, name = "saved_tweet")
.observe()
.withMiddleware { repo =>
// applies tweetMutation to the results of TweetResultRepository
val mutateResult = TweetResult.mutate(tweetMutation)
repo.andThen(stitchResult => stitchResult.map(mutateResult))
}
.withMiddleware(
// `tupledMiddleware` adapts the two-argument caching middleware to the tupled-key
// config created by `repo2Config`.
tupledMiddleware(
CachingTweetRepository(
caches.tweetResultCache,
settings.tweetTombstoneTtl,
stats.scope("saved_tweet", "cache"),
clientIdHelper,
deciderGates.logCacheExceptions,
)
)
)
.toRepo2
// Closes the circular dependency: assigns the var that the `tweetResultRepo` proxy
// forwards to. Until this assignment runs, the var is null.
// Wrappers over `cachingTweetRepo` (config name "tweet") are applied in this order:
// hydration middleware, then observe, then the `shortCircuitInvalidIds` middleware.
// The order is significant; do not reorder.
// NOTE(review): by the layering described for the caching repos in this file,
// `shortCircuitInvalidIds` is presumably outermost, so short-circuited ids would not
// reach the observed layer — confirm against RepoConfig.withMiddleware.
finalTweetResultRepo = repo2Config(repo = cachingTweetRepo, name = "tweet")
.withMiddleware(
tupledMiddleware(
TweetHydration.hydrateRepo(
tweetHydrators.hydrator,
tweetHydrators.cacheChangesEffect,
queryOptionsExpander
)
)
)
.observe()
.withMiddleware(tupledMiddleware(TweetResultRepository.shortCircuitInvalidIds))
.toRepo2
}