in tweetypie/server/src/main/scala/com/twitter/tweetypie/config/BackendClients.scala [198:795]
/**
 * Builds the [[BackendClients]] instance. Shared client-building helpers are defined
 * first; the result is an anonymous BackendClients whose members are the individual
 * backend clients.
 *
 * @param settings          source of the thrift client id, service identifier, destinations
 *                          and per-backend config functions read throughout this method
 * @param deciderGates      supplies the decider gates used for replicated reads
 * @param statsReceiver     root stats receiver; also scoped as "backends" for backend contexts
 * @param hostStatsReceiver receiver passed to ClientBuilder.reportHostStats
 * @param timer             timer handed to each Backend.Context created by backendContext
 * @param clientIdHelper    passed to the Manhattan tweet storage client
 */
def apply(
settings: TweetServiceSettings,
deciderGates: TweetypieDeciderGates,
statsReceiver: StatsReceiver,
hostStatsReceiver: StatsReceiver,
timer: Timer,
clientIdHelper: ClientIdHelper,
): BackendClients = {
val thriftClientId = settings.thriftClientId
val tracer = DefaultTracer
// NOTE(review): env, zone and log do not appear to be referenced anywhere in the
// visible body of apply (the zone checks below read settings.zone directly) — confirm
// before removing.
val env = settings.env.toString
val zone = settings.zone
val log = Logger(getClass)
// Stats scope used by backendContext, the event bus publishers, the limiter and config bus.
val backendsScope = statsReceiver.scope("backends")
/**
 * A Seq builder collecting every finagle.Name produced by `eval` below; its contents
 * are exposed through BackendClients.referencedNames.
 */
val referencedNamesBuilder = Seq.newBuilder[Name]
/**
 * The default set of exceptions we believe are safe for Tweetypie to retry:
 * RetryOnChannelClosed, falling back to RetryOnTimeout.
 */
val defaultResponseClassifier: ResponseClassifier =
ResponseClassifier.RetryOnChannelClosed.orElse(ResponseClassifier.RetryOnTimeout)
/**
 * Turn `address` into a Finagle Name via Resolver.eval, append that Name to
 * referencedNamesBuilder, and hand it back to the caller.
 */
def eval(address: String): Name = {
  val resolved = Resolver.eval(address)
  referencedNamesBuilder += resolved
  resolved
}
// Builds the Backend.Context for one backend: the shared timer plus a stats
// receiver scoped by `name` underneath backendsScope.
def backendContext(name: String) = {
  val scopedStats = backendsScope.scope(name)
  Backend.Context(timer, scopedStats)
}
// Base ClientBuilder shared by the builder-style clients: labels the client with `name`,
// reports stats to statsReceiver and per-host stats to hostStatsReceiver, installs the
// tracer, marks the client as a daemon and applies the default connect timeouts.
// By default, retries on most exceptions (see defaultRetryExceptions). If an rpc is not
// idempotent, it should use a different retry policy.
def clientBuilder(name: String) = {
ClientBuilder()
.name(name)
.reportTo(statsReceiver)
.reportHostStats(hostStatsReceiver)
.tracer(tracer)
.daemon(true)
.tcpConnectTimeout(defaultTcpConnectTimeout)
.connectTimeout(defaultConnectTimeout)
.retryPolicy(retry())
}
// ClientBuilder for a ThriftMux client: starts from clientBuilder(name), installs a
// ThriftMux stack carrying our thrift client id, required opportunistic TLS and the
// service class `clazz`, then sets the default balancer, the destination (recorded via
// eval) and mutual TLS. Callers chain further ClientBuilder settings (e.g. another
// loadBalancer) before calling build().
def thriftMuxClientBuilder(name: String, address: String, clazz: Class[_]) = {
clientBuilder(name)
.stack(
ThriftMux.client
.withClientId(thriftClientId)
.withOpportunisticTls(OpportunisticTls.Required)
.withServiceClass(clazz))
.loadBalancer(balancer())
.dest(eval(address))
.mutualTls(settings.serviceIdentifier)
}
// Our base ThriftMux.Client, labelled `name` and configured with our client id, stats,
// tracer, default connect/acquisition timeouts, mutual TLS and required opportunistic TLS.
// Prefer using thriftMuxMethodBuilder below, but this
// can be used to build custom clients (re: darkTrafficClient).
// `propagateDeadlines` is forwarded to the PropagateDeadlines stack param.
def thriftMuxClient(name: String, propagateDeadlines: Boolean = true): ThriftMux.Client = {
ThriftMux.client
.withClientId(thriftClientId)
.withLabel(name)
.withStatsReceiver(statsReceiver)
.withTracer(tracer)
.withTransport.connectTimeout(defaultTcpConnectTimeout)
.withSession.acquisitionTimeout(defaultConnectTimeout)
.withMutualTls(settings.serviceIdentifier)
.withOpportunisticTls(OpportunisticTls.Required)
.configured(PropagateDeadlines(enabled = propagateDeadlines))
}
// Default MethodBuilder for ThriftMux backends: the base client for `name` with an
// aperture balancer, pointed at `dest`, retrying per defaultResponseClassifier and
// bounded by a 2 second total timeout.
// If an endpoint is non-idempotent you should add .nonIdempotent and
// leave off any ResponseClassifiers (it will remove any placed before but not after).
// If it is unequivocally idempotent you should add .idempotent and
// leave off any ResponseClassifiers (it will retry on all Throws). This will also
// enable backup requests.
def thriftMuxMethodBuilder(
name: String,
dest: String,
): MethodBuilder = {
thriftMuxClient(name)
.withLoadBalancer(balancer(minAperture = 2))
.methodBuilder(dest)
.withRetryForClassifier(defaultResponseClassifier)
.withTimeoutTotal(2.seconds) // total timeout including 1st attempt and up to 2 retries
}
// Aperture load balancer with the given minimum aperture (2 unless overridden).
def balancer(minAperture: Int = 2) = Balancers.aperture(minAperture = minAperture)
// Shared starting point for every event bus publisher built below: destination,
// client id, stats scope and service identifier are set once here.
val eventBusPublisherBuilder =
EventBusPublisherBuilder()
.dest(eval("/s/eventbus/provisioning"))
.clientId(settings.thriftClientId)
// eventbus stats are further scoped by stream, so put all
// publishers under the same stats namespace
.statsReceiver(backendsScope.scope("event_bus"))
// This makes the underlying kps-client resolve over WilyNs rather than DNS
.serviceIdentifier(settings.serviceIdentifier)
new BackendClients {
// Every Name recorded by eval so far; the builder's result() is taken on each call.
def referencedNames: Seq[Name] = referencedNamesBuilder.result()
// Memcached rich client labelled "memcache" for settings.twemcacheDest (recorded via eval),
// using mutual TLS, two connections per endpoint and FNV1_32 key hashing.
val memcacheClient: memcached.Client =
Memcached.client
.withMutualTls(settings.serviceIdentifier)
.connectionsPerEndpoint(2)
.configured(param.KeyHasher(KeyHasher.FNV1_32))
// 100ms connect timeout; 200ms for both the TimeoutFilter and TimeoutFactory params
.configured(Transporter.ConnectTimeout(100.milliseconds))
.configured(TimeoutFilter.Param(200.milliseconds))
.configured(TimeoutFactory.Param(200.milliseconds))
.configured(param.EjectFailedHost(false))
.configured(FailureAccrualFactory.Param(numFailures = 20, markDeadFor = 30.second))
// pending requests are limited by settings.cacheClientPendingRequestLimit
.configured(
PendingRequestFilter.Param(limit = Some(settings.cacheClientPendingRequestLimit))
)
.filtered(new MemcacheExceptionLoggingFilter)
.newRichClient(dest = eval(settings.twemcacheDest), label = "memcache")
/* clients */
// Tweet storage client: a ManhattanTweetStorageClient configured from
// settings.tweetStorageConfig, adapted through Manhattan.fromClient.
val tweetStorageClient: TweetStorageClient = {
  val manhattanStorage =
    new ManhattanTweetStorageClient(
      settings.tweetStorageConfig,
      statsReceiver = backendsScope.scope("tweet_storage"),
      clientIdHelper = clientIdHelper,
    )
  Manhattan.fromClient(manhattanStorage)
}
// Social graph backend: a ThriftMux service with a peak-EWMA aperture balancer
// (minAperture = 16), wrapped by settings.socialGraphSeviceConfig.
val socialGraphService: SocialGraphService = {
  val sgsTransport =
    thriftMuxClientBuilder(
      "socialgraph",
      "/s/socialgraph/socialgraph",
      classOf[SocialGraphScroogeIface.MethodPerEndpoint]
    ).loadBalancer(Balancers.aperturePeakEwma(minAperture = 16))
      .build()
  val sgsThriftClient = new SocialGraphService$FinagleClient(sgsTransport)
  settings.socialGraphSeviceConfig(
    SocialGraphService.fromClient(sgsThriftClient),
    backendContext("socialgraph")
  )
}
// Thrift client for tflock, shared by the read and write TFlockClient wrappers below.
// Overrides the default balancer with minAperture = 5 and classifies responses with
// FlockResponse.classifier.
val tflockClient =
new FlockDB.FinagledClient(
thriftMuxClientBuilder("tflock", "/s/tflock/tflock", classOf[FlockDB.MethodPerEndpoint])
.loadBalancer(balancer(minAperture = 5))
.responseClassifier(FlockResponse.classifier)
.build(),
serviceName = "tflock",
stats = statsReceiver
)
// Read-side wrapper around tflockClient, produced by settings.tflockReadConfig.
val tflockReadClient: TFlockClient =
settings.tflockReadConfig(tflockClient, backendContext("tflock"))
// Write-side wrapper around the same tflockClient, produced by settings.tflockWriteConfig.
// Both wrappers use the "tflock" backend context name.
val tflockWriteClient: TFlockClient =
settings.tflockWriteConfig(tflockClient, backendContext("tflock"))
// Gizmoduck backend: builder-style ThriftMux client (minAperture = 63) turned into a
// MethodBuilder with idempotent endpoints, then wrapped by settings.gizmoduckConfig.
val gizmoduck: Gizmoduck = {
  // Named distinctly so the local does not shadow the outer clientBuilder helper.
  val gizmoduckBuilder =
    thriftMuxClientBuilder(
      "gizmoduck",
      "/s/gizmoduck/gizmoduck",
      classOf[UserService.MethodPerEndpoint]
    ).loadBalancer(balancer(minAperture = 63))
  val userServiceEndpoints =
    MethodBuilder
      .from(gizmoduckBuilder)
      .idempotent(maxExtraLoad = 1.percent)
      .servicePerEndpoint[UserService.ServicePerEndpoint]
  val userServiceClient = ThriftMux.Client.methodPerEndpoint(userServiceEndpoints)
  settings.gizmoduckConfig(Gizmoduck.fromClient(userServiceClient), backendContext("gizmoduck"))
}
// Merlin (user roles) client: 100ms per request, 400ms in total, idempotent endpoints
// with maxExtraLoad of 1 percent (the same value as the literal 0.01).
val merlin: UserRolesService.MethodPerEndpoint = {
  val merlinEndpoints =
    thriftMuxMethodBuilder("merlin", "/s/merlin/merlin")
      .withTimeoutPerRequest(100.milliseconds)
      .withTimeoutTotal(400.milliseconds)
      .idempotent(maxExtraLoad = 1.percent)
      .servicePerEndpoint[UserRolesService.ServicePerEndpoint]
  ThriftMux.Client.methodPerEndpoint(merlinEndpoints)
}
// Talon backend: default builder-style ThriftMux client wrapped by settings.talonConfig.
val talon: Talon = {
  val talonTransport =
    thriftMuxClientBuilder(
      "talon",
      "/s/talon/backend",
      classOf[TalonScroogeIface.MethodPerEndpoint]
    ).build()
  val talonThriftClient = new Talon$FinagleClient(talonTransport)
  settings.talonConfig(Talon.fromClient(talonThriftClient), backendContext("talon"))
}
// Guano instance created via Guano() with no explicit arguments.
val guano = Guano()
// Media info backend: ThriftMux client with minAperture = 75, wrapped by
// settings.mediaInfoServiceConfig.
val mediaInfoService: MediaInfoService = {
  val mediaInfoTransport =
    thriftMuxClientBuilder(
      "mediainfo",
      "/s/photurkey/mediainfo",
      classOf[MediaInfoScroogeIface.MethodPerEndpoint]
    ).loadBalancer(balancer(minAperture = 75))
      .build()
  val mediaInfoThriftClient = new MediaInfoService$FinagleClient(mediaInfoTransport)
  settings.mediaInfoServiceConfig(
    MediaInfoService.fromClient(mediaInfoThriftClient),
    backendContext("mediainfo")
  )
}
// User image backend: default builder-style ThriftMux client wrapped by
// settings.userImageServiceConfig.
val userImageService: UserImageService = {
  val userImageTransport =
    thriftMuxClientBuilder(
      "userImage",
      "/s/user-image-service/uis",
      classOf[UserImageScroogeIface.MethodPerEndpoint]
    ).build()
  val userImageThriftClient = new UserImageService$FinagleClient(userImageTransport)
  settings.userImageServiceConfig(
    UserImageService.fromClient(userImageThriftClient),
    backendContext("userImage")
  )
}
// Media client composed from the userImageService and mediaInfoService backends.
val mediaClient: MediaClient =
MediaClient.fromBackends(
userImageService = userImageService,
mediaInfoService = mediaInfoService
)
// Timeline service backend: ThriftMux client with minAperture = 13, wrapped by
// settings.timelineServiceConfig.
val timelineService: TimelineService = {
  val timelineTransport =
    thriftMuxClientBuilder(
      "timelineService",
      "/s/timelineservice/timelineservice",
      classOf[tls.TimelineService.MethodPerEndpoint]
    ).loadBalancer(balancer(minAperture = 13))
      .build()
  val timelineThriftClient = new tls.TimelineService$FinagleClient(timelineTransport)
  settings.timelineServiceConfig(
    TimelineService.fromClient(timelineThriftClient),
    backendContext("timelineService")
  )
}
// Expandodo (cards) backend: ThriftMux client with minAperture = 6, wrapped by
// settings.expandodoConfig.
val expandodo: Expandodo = {
  val cardsTransport =
    thriftMuxClientBuilder(
      "expandodo",
      "/s/expandodo/server",
      classOf[CardsScroogeIface.MethodPerEndpoint]
    ).loadBalancer(balancer(minAperture = 6))
      .build()
  val cardsThriftClient = new CardsService$FinagleClient(cardsTransport)
  settings.expandodoConfig(
    Expandodo.fromClient(cardsThriftClient),
    backendContext("expandodo")
  )
}
// Creatives container backend: MethodBuilder client with a 300ms total timeout and
// idempotent endpoints, wrapped by settings.creativesContainerServiceConfig.
val creativesContainerService: CreativesContainerService = {
  val ccsEndpoints =
    thriftMuxMethodBuilder(
      "creativesContainerService",
      "/s/creatives-container/creatives-container"
    ).withTimeoutTotal(300.milliseconds)
      .idempotent(maxExtraLoad = 1.percent)
      .servicePerEndpoint[ccs.CreativesContainerService.ServicePerEndpoint]
  val ccsClient = ccs.CreativesContainerService.MethodPerEndpoint(ccsEndpoints)
  settings.creativesContainerServiceConfig(
    CreativesContainerService.fromClient(ccsClient),
    backendContext("creativesContainerService")
  )
}
// Scarecrow backend: ThriftMux client with minAperture = 6, wrapped by
// settings.scarecrowConfig.
val scarecrow: Scarecrow = {
  val scarecrowTransport =
    thriftMuxClientBuilder(
      "scarecrow",
      "/s/abuse/scarecrow",
      classOf[ScarecrowScroogeIface.MethodPerEndpoint]
    ).loadBalancer(balancer(minAperture = 6))
      .build()
  val scarecrowThriftClient = new ScarecrowService$FinagleClient(
    scarecrowTransport,
    serviceName = "scarecrow",
    stats = statsReceiver
  )
  settings.scarecrowConfig(
    Scarecrow.fromClient(scarecrowThriftClient),
    backendContext("scarecrow")
  )
}
// Snowflake client: 300ms total / 100ms per-request timeouts, idempotent endpoints.
val snowflakeClient: Snowflake.MethodPerEndpoint = {
  val snowflakeDest = "/s/snowflake/snowflake"
  // Pass the destination through eval up front (noted by the original author as
  // eagerly resolving the serverset); this also records it in referencedNamesBuilder.
  eval(snowflakeDest)
  val snowflakeMethods =
    thriftMuxMethodBuilder("snowflake", snowflakeDest)
      .withTimeoutTotal(300.milliseconds)
      .withTimeoutPerRequest(100.milliseconds)
      .idempotent(maxExtraLoad = 1.percent)
  SnowflakeClient.snowflakeClient(snowflakeMethods)
}
// Thrift client for the deferredrpc server; used by deferredTweetypie and by
// fanoutServiceClient. Requests time out after 200ms and the retry policy is
// retry(timeouts = 3).
val deferredRpcClient =
new DeferredRPC.FinagledClient(
thriftMuxClientBuilder(
"deferredrpc",
"/s/kafka-shared/krpc-server-main",
classOf[DeferredRPC.MethodPerEndpoint])
.requestTimeout(200.milliseconds)
.retryPolicy(retry(timeouts = 3))
.build(),
serviceName = "deferredrpc",
stats = statsReceiver
)
// Builds a ThriftTweetService whose calls are sent through deferredRpcClient to `target`.
def deferredTweetypie(target: Target): ThriftTweetService = {
  // Finagle contexts and dtabs are serialized only when the target is the local
  // datacenter. That keeps developer dtabs honored and preserves context for eventbus
  // (eventbus enqueues only happen in async requests within the same datacenter).
  //
  // In effect, a deferredrpc request that stays in this datacenter is treated as part
  // of the originating request, while a replicated request is not.
  val sameDatacenter: Boolean = target.datacenter == Datacenter.Local
  val deferredService: Service[ThriftClientRequest, Array[Byte]] =
    new DeferredThriftService(
      deferredRpcClient,
      target,
      serializeFinagleContexts = sameDatacenter,
      serializeFinagleDtabs = sameDatacenter
    )
  new TweetServiceInternal$FinagleClient(deferredService)
}
// Deferred tweet service built for Target(Datacenter.AllOthers, "tweetypie-replication").
val replicationClient: ThriftTweetService =
deferredTweetypie(Target(Datacenter.AllOthers, "tweetypie-replication"))
// Replication clients for read endpoints, one per datacenter, each paired with the
// gate that decides whether a read is replicated there.
val lowQoSReplicationClients: Seq[GatedReplicationClient] = {
  // Gate.linearRampUp starting now, parameterized by settings.forkingRampUp.
  val forkingRamp = Gate.linearRampUp(Time.now, settings.forkingRampUp)
  // Zone gates, negated below so a cluster never sends replicated reads to itself.
  val runningInAtla = if (settings.zone == "atla") Gate.True else Gate.False
  val runningInPdxa = if (settings.zone == "pdxa") Gate.True else Gate.False
  Seq(
    GatedReplicationClient(
      client = deferredTweetypie(Target(Datacenter.Atla, "tweetypie-lowqos")),
      gate = forkingRamp & deciderGates.replicateReadsToATLA & !runningInAtla
    ),
    GatedReplicationClient(
      client = deferredTweetypie(Target(Datacenter.Pdxa, "tweetypie-lowqos")),
      gate = forkingRamp & deciderGates.replicateReadsToPDXA & !runningInPdxa
    )
  )
}
// The three services below all defer to the local datacenter and differ only in
// the target name passed to deferredTweetypie.
// used for async operations in the write path
val asyncTweetService: ThriftTweetService =
deferredTweetypie(Target(Datacenter.Local, "tweetypie"))
// used to trigger asyncEraseUserTweetsRequest
val asyncTweetDeletionService: ThriftTweetService =
deferredTweetypie(Target(Datacenter.Local, "tweetypie-retweet-deletion"))
// used for async retries
val asyncRetryTweetService: ThriftTweetService =
deferredTweetypie(Target(Datacenter.Local, "tweetypie-async-retry"))
// Byte-level service for dark traffic: raw request bytes are wrapped in a
// ThriftClientRequest and sent to "/s/tweetypie/proxy".
val darkTrafficClient: Service[Array[Byte], Array[Byte]] = {
  // Custom ThriftMux client: deadline propagation disabled, 100ms request timeout.
  val darkService =
    thriftMuxClient("tweetypie.dark", propagateDeadlines = false)
      .withRequestTimeout(100.milliseconds)
      .newService("/s/tweetypie/proxy")
  // Adapts Array[Byte] requests to ThriftClientRequest (second constructor argument false).
  val wrapBytes =
    new Filter[Array[Byte], Array[Byte], ThriftClientRequest, Array[Byte]] {
      override def apply(
        request: Array[Byte],
        service: Service[ThriftClientRequest, Array[Byte]]
      ): Future[Array[Byte]] = {
        val thriftRequest = new ThriftClientRequest(request, false)
        service(thriftRequest)
      }
    }
  wrapBytes.andThen(darkService)
}
// Geoduck hydration client: 100ms per-request timeout, idempotent endpoints.
val geoHydrationClient: GeoduckHydration.MethodPerEndpoint = {
  val hydrationMethods =
    thriftMuxMethodBuilder("geoduck_hydration", "/s/geo/hydration")
      .withTimeoutPerRequest(100.milliseconds)
      .idempotent(maxExtraLoad = 1.percent)
  val hydrationEndpoints =
    hydrationMethods.servicePerEndpoint[GeoduckHydration.ServicePerEndpoint]
  ThriftMux.Client.methodPerEndpoint(hydrationEndpoints)
}
// The locate endpoint of geoHydrationClient, exposed as a GeoduckLocate.
val geoHydrationLocate: GeoduckLocate = geoHydrationClient.locate
// Geoduck reverse geocoder client: 100ms per-request timeout, idempotent endpoints.
val geoReverseGeocoderClient: ReverseGeocoder.MethodPerEndpoint = {
  val geocoderMethods =
    thriftMuxMethodBuilder("geoduck_reversegeocoder", "/s/geo/geoduck_reversegeocoder")
      .withTimeoutPerRequest(100.milliseconds)
      .idempotent(maxExtraLoad = 1.percent)
  val geocoderEndpoints =
    geocoderMethods.servicePerEndpoint[ReverseGeocoder.ServicePerEndpoint]
  ThriftMux.Client.methodPerEndpoint(geocoderEndpoints)
}
// Geohash locate built on top of the reverse geocoder and hydration clients, with
// stats under the "geo_geohash_locate" scope.
val geoduckGeohashLocate: GeoduckGeohashLocate =
  new GeoduckGeohashLocate(
    reverseGeocoderClient = geoReverseGeocoderClient,
    hydrationClient = geoHydrationClient,
    classScopedStatsReceiver = statsReceiver.scope("geo_geohash_locate")
  )
// Geoduck relevance thrift client: 100ms request timeout and retry(timeouts = 1).
val geoRelevance =
new Relevance$FinagleClient(
thriftMuxClientBuilder(
"geoduck_relevance",
"/s/geo/relevance",
classOf[Relevance.MethodPerEndpoint])
.requestTimeout(100.milliseconds)
.retryPolicy(retry(timeouts = 1))
.build(),
stats = statsReceiver
)
// Fanout service client whose requests go through deferredRpcClient (wrapped in a
// DeferredThriftService) for Target(Datacenter.Local, "fanoutservice").
val fanoutServiceClient =
new FanoutService$FinagleClient(
new DeferredThriftService(deferredRpcClient, Target(Datacenter.Local, "fanoutservice")),
serviceName = "fanoutservice",
stats = statsReceiver
)
// Limiter service: a limiter thrift client wrapped by settings.limiterBackendConfig,
// then exposed through LimiterService.fromBackend.
val limiterService: LimiterService = {
  val limiterThriftClient =
    new LimiterClientFactory(
      name = "limiter",
      clientId = thriftClientId,
      tracer = tracer,
      statsReceiver = statsReceiver,
      serviceIdentifier = settings.serviceIdentifier,
      opportunisticTlsLevel = OpportunisticTls.Required,
      daemonize = true
    )(eval("/s/limiter/limiter"))
  val configuredBackend =
    settings.limiterBackendConfig(
      LimiterBackend.fromClient(limiterThriftClient),
      backendContext("limiter")
    )
  LimiterService.fromBackend(
    configuredBackend.incrementFeature,
    configuredBackend.getFeatureUsage,
    getAppId,
    backendsScope.scope("limiter")
  )
}
// Passbird thrift client: 100ms request timeout and retry(timeouts = 1).
val passbirdClient =
new PassbirdService$FinagleClient(
thriftMuxClientBuilder(
"passbird",
"/s/passbird/passbird",
classOf[PassbirdService.MethodPerEndpoint])
.requestTimeout(100.milliseconds)
.retryPolicy(retry(timeouts = 1))
.build(),
serviceName = "passbird",
stats = statsReceiver
)
// Escherbird (tweet entity annotation) backend: default builder-style ThriftMux
// client wrapped by settings.escherbirdConfig.
val escherbird: Escherbird = {
  val annotationTransport =
    thriftMuxClientBuilder(
      "escherbird",
      "/s/escherbird/annotationservice",
      classOf[TweetEntityAnnotationScroogeIface.MethodPerEndpoint]
    ).build()
  val annotationThriftClient = new TweetEntityAnnotationService$FinagleClient(annotationTransport)
  settings.escherbirdConfig(
    Escherbird.fromClient(annotationThriftClient),
    backendContext("escherbird")
  )
}
// Geo scrub event store backed by a Manhattan KV client for the
// "geoduck_scrub_datastore" app.
val geoScrubEventStore: GeoScrubEventStore = {
  // mTLS params are only supplied when a real service identifier is configured.
  val manhattanMtls =
    if (settings.serviceIdentifier == EmptyServiceIdentifier) NoMtlsParams
    else
      ManhattanKVClientMtlsParams(
        serviceIdentifier = settings.serviceIdentifier,
        opportunisticTls = OpportunisticTls.Required
      )
  val manhattanKv =
    new ManhattanKVClient(
      appId = "geoduck_scrub_datastore",
      dest = "/s/manhattan/omega.native-thrift",
      mtlsParams = manhattanMtls,
      label = "mh_omega",
      Seq(Experiments.ApertureLoadBalancer)
    )
  GeoScrubEventStore(
    manhattanKv,
    settings.geoScrubEventStoreConfig,
    backendContext("geoScrubEventStore")
  )
}
// Publisher for the "tweet_events" stream: 500ms publish timeout, with
// serializeFinagleDtabs enabled.
val tweetEventsPublisher: EventBusPublisher[TweetEvent] =
eventBusPublisherBuilder
.streamName("tweet_events")
.thriftStruct(TweetEvent)
.publishTimeout(500.milliseconds)
.serializeFinagleDtabs(true)
.build()
// Publisher for the "tweetypie_delete_location_data_prod" stream.
val deleteLocationDataPublisher: EventBusPublisher[DeleteLocationData] =
eventBusPublisherBuilder
.streamName("tweetypie_delete_location_data_prod")
.thriftStruct(DeleteLocationData)
// deleteLocationData is relatively rare, and publishing to
// eventbus is all that the endpoint does. This means that it
// is much more likely that we will have to make a connection,
// which has much greater latency, and also makes us more
// tolerant of slow requests, so we choose a long timeout.
.publishTimeout(2.seconds)
.build()
// Publisher for the "retweet_archival_events" stream: 500ms publish timeout.
val retweetArchivalEventPublisher: EventBusPublisher[RetweetArchivalEvent] =
eventBusPublisherBuilder
.streamName("retweet_archival_events")
.thriftStruct(RetweetArchivalEvent)
.publishTimeout(500.milliseconds)
.build()
// Gnip enricherator backend built from the default ThriftMux MethodBuilder.
val gnipEnricherator: GnipEnricherator = {
  // Local named differently from the enclosing member to avoid shadowing it.
  val enricheratorMethods =
    thriftMuxMethodBuilder(
      "enricherator",
      "/s/datadelivery-enrichments/enricherator"
    )
  GnipEnricherator.fromMethod(enricheratorMethods)
}
// Strato client labelled "stratoserver": mutual TLS with required opportunistic TLS
// and a 100ms request timeout.
val stratoserverClient: StratoClient = Strato.client
.withMutualTls(
serviceIdentifier = settings.serviceIdentifier,
opportunisticLevel = OpportunisticTls.Required)
.withLabel("stratoserver")
.withRequestTimeout(100.milliseconds)
.build()
// ConfigBus built from the "config_bus" stats scope plus this instance's id and the
// instance count from settings.
val configBus: ConfigBus =
ConfigBus(backendsScope.scope("config_bus"), settings.instanceId, settings.instanceCount)
// Logger for promoted-content callbacks: publishes AdCallbackEvents to the stream named
// by settings.adsLoggingClientTopicName through an AdsLoggingClient.
val callbackPromotedContentLogger: CallbackPromotedContentLogger = {
  val adCallbackPublisher =
    eventBusPublisherBuilder
      .streamName(settings.adsLoggingClientTopicName)
      .thriftStruct(AdCallbackEvent)
      .publishTimeout(500.milliseconds)
      .serializeFinagleDtabs(true)
      .maxQueuedEvents(1000)
      .kafkaDest("/s/kafka/ads-callback:kafka-tls")
      .build()
  // The same stats scope is shared by the logging client and the logger itself.
  val promotedContentStats = backendsScope.scope("promoted_content")
  val adsClient = AdsLoggingClient(adCallbackPublisher, promotedContentStats, "Tweetypie")
  new CallbackPromotedContentLogger(adsClient, promotedContentStats)
}
}
}