def apply()

in tweetypie/server/src/main/scala/com/twitter/tweetypie/config/BackendClients.scala [198:795]


  def apply(
    settings: TweetServiceSettings,
    deciderGates: TweetypieDeciderGates,
    statsReceiver: StatsReceiver,
    hostStatsReceiver: StatsReceiver,
    timer: Timer,
    clientIdHelper: ClientIdHelper,
  ): BackendClients = {
    // Client id this service presents to the thrift backends built below.
    val thriftClientId = settings.thriftClientId
    // Single tracer instance handed to every client constructed in this method.
    val tracer = DefaultTracer

    // Parent stats scope for per-backend metrics (see backendContext).
    val backendsScope = statsReceiver.scope("backends")

    /**
     * Accumulates every finagle Name resolved through `eval` below; the
     * accumulated names are exposed by BackendClients.referencedNames.
     */
    val referencedNamesBuilder = Seq.newBuilder[Name]

    /**
     * the default set of exceptions we believe are safe for Tweetypie to retry:
     * RetryOnChannelClosed, falling back to RetryOnTimeout.
     */
    val defaultResponseClassifier: ResponseClassifier =
      ResponseClassifier.RetryOnChannelClosed.orElse(ResponseClassifier.RetryOnTimeout)

    /**
     * Turn `address` into a finagle Name via Resolver.eval, remembering the
     * result in referencedNamesBuilder before handing it back to the caller.
     */
    def eval(address: String): Name = {
      val resolved = Resolver.eval(address)
      referencedNamesBuilder += resolved
      resolved
    }

    // Backend.Context for a single backend: the shared timer plus a stats
    // receiver scoped by `name` underneath backendsScope.
    def backendContext(name: String) = {
      val scopedStats = backendsScope.scope(name)
      Backend.Context(timer, scopedStats)
    }

    // Base ClientBuilder configuration shared by the ClientBuilder-based clients.
    // By default it retries on most exceptions (see defaultRetryExceptions), via the
    // policy returned by retry() with its default arguments.  If an rpc is not
    // idempotent, it should use a different retry policy.
    def clientBuilder(name: String) = {
      // naming, stats and tracing
      val instrumented =
        ClientBuilder()
          .name(name)
          .reportTo(statsReceiver)
          .reportHostStats(hostStatsReceiver)
          .tracer(tracer)

      // connection behaviour and the default retry policy
      instrumented
        .daemon(true)
        .tcpConnectTimeout(defaultTcpConnectTimeout)
        .connectTimeout(defaultConnectTimeout)
        .retryPolicy(retry())
    }

    // ClientBuilder for a ThriftMux backend: starts from clientBuilder(name), installs
    // a ThriftMux stack (our client id, TLS required, service class `clazz`), the default
    // aperture balancer, the destination resolved (and recorded) through eval, and
    // mutual TLS with this service's identifier.  Callers may further override settings
    // (e.g. the load balancer) before calling build().
    def thriftMuxClientBuilder(name: String, address: String, clazz: Class[_]) = {
      val base = clientBuilder(name)

      val muxStack =
        ThriftMux.client
          .withClientId(thriftClientId)
          .withOpportunisticTls(OpportunisticTls.Required)
          .withServiceClass(clazz)

      base
        .stack(muxStack)
        .loadBalancer(balancer())
        .dest(eval(address))
        .mutualTls(settings.serviceIdentifier)
    }

    // Our base ThriftMux.Client.
    // Prefer thriftMuxMethodBuilder below; use this directly only when building
    // custom clients (re: darkTrafficClient).
    // `propagateDeadlines` is passed straight to the PropagateDeadlines param.
    def thriftMuxClient(name: String, propagateDeadlines: Boolean = true): ThriftMux.Client = {
      // identity, stats and tracing
      val identified =
        ThriftMux.client
          .withClientId(thriftClientId)
          .withLabel(name)
          .withStatsReceiver(statsReceiver)
          .withTracer(tracer)

      // transport-level connect timeout and session acquisition timeout
      val withTimeouts =
        identified.withTransport
          .connectTimeout(defaultTcpConnectTimeout)
          .withSession
          .acquisitionTimeout(defaultConnectTimeout)

      // security and deadline propagation
      withTimeouts
        .withMutualTls(settings.serviceIdentifier)
        .withOpportunisticTls(OpportunisticTls.Required)
        .configured(PropagateDeadlines(enabled = propagateDeadlines))
    }

    // MethodBuilder over thriftMuxClient(name) for `dest`, using an aperture balancer
    // with minAperture = 2, retries driven by defaultResponseClassifier and a 2 second
    // total timeout.
    //
    // If an endpoint is non-idempotent you should add .nonidempotent and
    // leave off any ResponseClassifiers (it will remove any placed before but not after)
    // If it is unequivocally idempotent you should add .idempotent and
    // leave off any ResponseClassifiers (it will retry on all Throws).  This will also
    // enable backup requests
    def thriftMuxMethodBuilder(
      name: String,
      dest: String,
    ): MethodBuilder = {
      val balancedClient =
        thriftMuxClient(name).withLoadBalancer(balancer(minAperture = 2))

      balancedClient
        .methodBuilder(dest)
        .withRetryForClassifier(defaultResponseClassifier)
        // total timeout including 1st attempt and up to 2 retries
        .withTimeoutTotal(2.seconds)
    }

    // Aperture load balancer with the requested minimum aperture (2 when unspecified).
    def balancer(minAperture: Int = 2) =
      Balancers.aperture(minAperture = minAperture)

    // Shared starting point for every EventBus publisher built below; each publisher
    // adds its own stream name, thrift struct and publish timeout before build().
    val eventBusPublisherBuilder = {
      val identified =
        EventBusPublisherBuilder()
          .dest(eval("/s/eventbus/provisioning"))
          .clientId(settings.thriftClientId)

      identified
        // eventbus stats are further scoped by stream, so put all
        // publishers under the same stats namespace
        .statsReceiver(backendsScope.scope("event_bus"))
        // This makes the underlying kps-client to be resolved over WilyNs vs DNS
        .serviceIdentifier(settings.serviceIdentifier)
    }

    new BackendClients {
      // Names recorded by eval so far; the builder's result is computed on every call.
      def referencedNames: Seq[Name] =
        referencedNamesBuilder.result()

      // Memcached client labelled "memcache" for settings.twemcacheDest (resolved and
      // recorded through eval), with mutual TLS, two connections per endpoint, FNV1_32
      // key hashing and the timeouts / failure handling spelled out below.
      val memcacheClient: memcached.Client = {
        // failure accrual threshold and the mark-dead duration (20 failures, 30 seconds)
        val failureAccrual =
          FailureAccrualFactory.Param(numFailures = 20, markDeadFor = 30.seconds)

        // cap on pending requests, taken from settings
        val pendingRequestLimit =
          PendingRequestFilter.Param(limit = Some(settings.cacheClientPendingRequestLimit))

        val configuredClient =
          Memcached.client
            .withMutualTls(settings.serviceIdentifier)
            .connectionsPerEndpoint(2)
            .configured(param.KeyHasher(KeyHasher.FNV1_32))
            .configured(Transporter.ConnectTimeout(100.milliseconds))
            .configured(TimeoutFilter.Param(200.milliseconds))
            .configured(TimeoutFactory.Param(200.milliseconds))
            .configured(param.EjectFailedHost(false))
            .configured(failureAccrual)
            .configured(pendingRequestLimit)
            .filtered(new MemcacheExceptionLoggingFilter)

        configuredClient.newRichClient(dest = eval(settings.twemcacheDest), label = "memcache")
      }

      /* clients */

      // Tweet storage built from a ManhattanTweetStorageClient configured by
      // settings.tweetStorageConfig, with stats under the "tweet_storage" scope.
      val tweetStorageClient: TweetStorageClient = {
        val manhattanClient =
          new ManhattanTweetStorageClient(
            settings.tweetStorageConfig,
            statsReceiver = backendsScope.scope("tweet_storage"),
            clientIdHelper = clientIdHelper,
          )

        Manhattan.fromClient(manhattanClient)
      }

      // SocialGraph backend: ThriftMux client at /s/socialgraph/socialgraph using a
      // peak-EWMA aperture balancer (minAperture = 16), wrapped by the settings config.
      val socialGraphService: SocialGraphService = {
        val builder =
          thriftMuxClientBuilder(
            "socialgraph",
            "/s/socialgraph/socialgraph",
            classOf[SocialGraphScroogeIface.MethodPerEndpoint]
          ).loadBalancer(Balancers.aperturePeakEwma(minAperture = 16))

        val thriftClient = new SocialGraphService$FinagleClient(builder.build())

        settings.socialGraphSeviceConfig(
          SocialGraphService.fromClient(thriftClient),
          backendContext("socialgraph")
        )
      }

      // Raw FlockDB thrift client for /s/tflock/tflock; it is shared by the read and
      // write TFlockClients below.  Uses minAperture = 5 and FlockResponse.classifier.
      val tflockClient = {
        val flockService =
          thriftMuxClientBuilder("tflock", "/s/tflock/tflock", classOf[FlockDB.MethodPerEndpoint])
            .loadBalancer(balancer(minAperture = 5))
            .responseClassifier(FlockResponse.classifier)
            .build()

        new FlockDB.FinagledClient(
          flockService,
          serviceName = "tflock",
          stats = statsReceiver
        )
      }

      // tflockClient wrapped by settings.tflockReadConfig, in the "tflock" backend context.
      val tflockReadClient: TFlockClient = {
        val readContext = backendContext("tflock")
        settings.tflockReadConfig(tflockClient, readContext)
      }

      // tflockClient wrapped by settings.tflockWriteConfig, in the "tflock" backend context.
      val tflockWriteClient: TFlockClient = {
        val writeContext = backendContext("tflock")
        settings.tflockWriteConfig(tflockClient, writeContext)
      }

      // Gizmoduck (UserService) backend: a MethodBuilder layered over the ClientBuilder
      // configuration, marked idempotent with 1 percent max extra load, then wrapped
      // by settings.gizmoduckConfig.
      val gizmoduck: Gizmoduck = {
        val gizmoduckBuilder =
          thriftMuxClientBuilder(
            "gizmoduck",
            "/s/gizmoduck/gizmoduck",
            classOf[UserService.MethodPerEndpoint]
          ).loadBalancer(balancer(minAperture = 63))

        val servicePerEndpoint =
          MethodBuilder
            .from(gizmoduckBuilder)
            .idempotent(maxExtraLoad = 1.percent)
            .servicePerEndpoint[UserService.ServicePerEndpoint]

        val userService = ThriftMux.Client.methodPerEndpoint(servicePerEndpoint)

        settings.gizmoduckConfig(Gizmoduck.fromClient(userService), backendContext("gizmoduck"))
      }

      // Merlin (UserRolesService) client: 100ms per request, 400ms in total (overriding
      // the 2 second total from thriftMuxMethodBuilder), idempotent with 1 percent max
      // extra load.
      val merlin: UserRolesService.MethodPerEndpoint = {
        val merlinBuilder =
          thriftMuxMethodBuilder("merlin", "/s/merlin/merlin")
            .withTimeoutPerRequest(100.milliseconds)
            .withTimeoutTotal(400.milliseconds)
            .idempotent(maxExtraLoad = 1.percent)

        ThriftMux.Client.methodPerEndpoint(
          merlinBuilder.servicePerEndpoint[UserRolesService.ServicePerEndpoint])
      }

      // Talon backend at /s/talon/backend with the default ClientBuilder settings,
      // wrapped by settings.talonConfig.
      val talon: Talon = {
        val talonService =
          thriftMuxClientBuilder(
            "talon",
            "/s/talon/backend",
            classOf[TalonScroogeIface.MethodPerEndpoint]
          ).build()

        val thriftClient = new Talon$FinagleClient(talonService)

        settings.talonConfig(Talon.fromClient(thriftClient), backendContext("talon"))
      }

      // Guano instance created with Guano's default arguments.
      val guano = Guano()

      // MediaInfo backend at /s/photurkey/mediainfo (minAperture = 75), wrapped by
      // settings.mediaInfoServiceConfig.
      val mediaInfoService: MediaInfoService = {
        val mediaInfoThriftService =
          thriftMuxClientBuilder(
            "mediainfo",
            "/s/photurkey/mediainfo",
            classOf[MediaInfoScroogeIface.MethodPerEndpoint]
          ).loadBalancer(balancer(minAperture = 75))
            .build()

        val thriftClient = new MediaInfoService$FinagleClient(mediaInfoThriftService)

        settings.mediaInfoServiceConfig(
          MediaInfoService.fromClient(thriftClient),
          backendContext("mediainfo")
        )
      }

      // UserImage backend at /s/user-image-service/uis with the default ClientBuilder
      // settings, wrapped by settings.userImageServiceConfig.
      val userImageService: UserImageService = {
        val userImageThriftService =
          thriftMuxClientBuilder(
            "userImage",
            "/s/user-image-service/uis",
            classOf[UserImageScroogeIface.MethodPerEndpoint]
          ).build()

        val thriftClient = new UserImageService$FinagleClient(userImageThriftService)

        settings.userImageServiceConfig(
          UserImageService.fromClient(thriftClient),
          backendContext("userImage")
        )
      }

      // MediaClient assembled from the two media backends defined above.
      val mediaClient: MediaClient =
        MediaClient.fromBackends(
          mediaInfoService = mediaInfoService,
          userImageService = userImageService
        )

      // TimelineService backend at /s/timelineservice/timelineservice (minAperture = 13),
      // wrapped by settings.timelineServiceConfig.
      val timelineService: TimelineService = {
        val timelineThriftService =
          thriftMuxClientBuilder(
            "timelineService",
            "/s/timelineservice/timelineservice",
            classOf[tls.TimelineService.MethodPerEndpoint]
          ).loadBalancer(balancer(minAperture = 13))
            .build()

        val thriftClient = new tls.TimelineService$FinagleClient(timelineThriftService)

        settings.timelineServiceConfig(
          TimelineService.fromClient(thriftClient),
          backendContext("timelineService")
        )
      }

      // Expandodo (CardsService) backend at /s/expandodo/server (minAperture = 6),
      // wrapped by settings.expandodoConfig.
      val expandodo: Expandodo = {
        val cardsThriftService =
          thriftMuxClientBuilder(
            "expandodo",
            "/s/expandodo/server",
            classOf[CardsScroogeIface.MethodPerEndpoint]
          ).loadBalancer(balancer(minAperture = 6))
            .build()

        val thriftClient = new CardsService$FinagleClient(cardsThriftService)

        settings.expandodoConfig(
          Expandodo.fromClient(thriftClient),
          backendContext("expandodo")
        )
      }

      // CreativesContainer backend: MethodBuilder client with a 300ms total timeout,
      // idempotent with 1 percent max extra load, wrapped by the settings config.
      val creativesContainerService: CreativesContainerService = {
        val methodBuilder =
          thriftMuxMethodBuilder(
            "creativesContainerService",
            "/s/creatives-container/creatives-container",
          ).withTimeoutTotal(300.milliseconds)
            .idempotent(maxExtraLoad = 1.percent)

        val servicePerEndpoint =
          methodBuilder.servicePerEndpoint[ccs.CreativesContainerService.ServicePerEndpoint]

        val thriftClient = ccs.CreativesContainerService.MethodPerEndpoint(servicePerEndpoint)

        settings.creativesContainerServiceConfig(
          CreativesContainerService.fromClient(thriftClient),
          backendContext("creativesContainerService")
        )
      }

      // Scarecrow backend at /s/abuse/scarecrow (minAperture = 6), wrapped by
      // settings.scarecrowConfig.
      val scarecrow: Scarecrow = {
        val scarecrowThriftService =
          thriftMuxClientBuilder(
            "scarecrow",
            "/s/abuse/scarecrow",
            classOf[ScarecrowScroogeIface.MethodPerEndpoint]
          ).loadBalancer(balancer(minAperture = 6))
            .build()

        val thriftClient =
          new ScarecrowService$FinagleClient(
            scarecrowThriftService,
            serviceName = "scarecrow",
            stats = statsReceiver
          )

        settings.scarecrowConfig(Scarecrow.fromClient(thriftClient), backendContext("scarecrow"))
      }

      // Snowflake client: 100ms per request, 300ms in total, idempotent with 1 percent
      // max extra load.
      val snowflakeClient: Snowflake.MethodPerEndpoint = {
        val snowflakeDest = "/s/snowflake/snowflake"

        // thriftMuxMethodBuilder takes the destination as a plain string and does not go
        // through eval, so call eval explicitly to eagerly resolve the serverset (this
        // also records the Name in referencedNames).
        eval(snowflakeDest)

        val snowflakeBuilder =
          thriftMuxMethodBuilder("snowflake", snowflakeDest)
            .withTimeoutTotal(300.milliseconds)
            .withTimeoutPerRequest(100.milliseconds)
            .idempotent(maxExtraLoad = 1.percent)

        SnowflakeClient.snowflakeClient(snowflakeBuilder)
      }

      // DeferredRPC thrift client for /s/kafka-shared/krpc-server-main with a 200ms
      // request timeout and the retry(timeouts = 3) policy.  It backs every
      // DeferredThriftService created in this class.
      val deferredRpcClient = {
        val krpcService =
          thriftMuxClientBuilder(
            "deferredrpc",
            "/s/kafka-shared/krpc-server-main",
            classOf[DeferredRPC.MethodPerEndpoint]
          ).requestTimeout(200.milliseconds)
            .retryPolicy(retry(timeouts = 3))
            .build()

        new DeferredRPC.FinagledClient(
          krpcService,
          serviceName = "deferredrpc",
          stats = statsReceiver
        )
      }

      // Builds a tweet service client whose calls are sent through deferredRpcClient
      // to `target`.
      def deferredTweetypie(target: Target): ThriftTweetService = {
        // When deferring back to the local datacenter, preserve the finagle
        // context and dtabs. This will ensure that developer dtabs are honored
        // and that context is preserved in eventbus. (eventbus enqueues only
        // happen in async requests within the same datacenter.)
        //
        // Effectively, this means we consider deferredrpc requests within the
        // same datacenter to be part of the same request, but replicated
        // requests are not.
        val sameDatacenter: Boolean = target.datacenter == Datacenter.Local

        val transport: Service[ThriftClientRequest, Array[Byte]] =
          new DeferredThriftService(
            deferredRpcClient,
            target,
            serializeFinagleContexts = sameDatacenter,
            serializeFinagleDtabs = sameDatacenter
          )

        new TweetServiceInternal$FinagleClient(transport)
      }

      // Deferred client targeting "tweetypie-replication" in Datacenter.AllOthers.
      val replicationClient: ThriftTweetService = {
        val replicationTarget = Target(Datacenter.AllOthers, "tweetypie-replication")
        deferredTweetypie(replicationTarget)
      }

      // used for read endpoints replication: one gated "tweetypie-lowqos" client per
      // remote datacenter (Atla and Pdxa)
      val lowQoSReplicationClients: Seq[GatedReplicationClient] = {
        val rampUpGate = Gate.linearRampUp(Time.now, settings.forkingRampUp)

        // Gate that is true only when this instance runs in `zoneName`.  It is negated
        // below to avoid sending replicated reads from a cluster to itself.
        def runningIn(zoneName: String) =
          if (settings.zone == zoneName) Gate.True else Gate.False

        val toAtla =
          GatedReplicationClient(
            client = deferredTweetypie(Target(Datacenter.Atla, "tweetypie-lowqos")),
            gate = rampUpGate & deciderGates.replicateReadsToATLA & !runningIn("atla")
          )

        val toPdxa =
          GatedReplicationClient(
            client = deferredTweetypie(Target(Datacenter.Pdxa, "tweetypie-lowqos")),
            gate = rampUpGate & deciderGates.replicateReadsToPDXA & !runningIn("pdxa")
          )

        Seq(toAtla, toPdxa)
      }

      // used for async operations in the write path; defers to "tweetypie" in the
      // local datacenter
      val asyncTweetService: ThriftTweetService = {
        val localTarget = Target(Datacenter.Local, "tweetypie")
        deferredTweetypie(localTarget)
      }

      // used to trigger asyncEraseUserTweetsRequest; defers to
      // "tweetypie-retweet-deletion" in the local datacenter
      val asyncTweetDeletionService: ThriftTweetService = {
        val deletionTarget = Target(Datacenter.Local, "tweetypie-retweet-deletion")
        deferredTweetypie(deletionTarget)
      }

      // used for async retries; defers to "tweetypie-async-retry" in the local datacenter
      val asyncRetryTweetService: ThriftTweetService = {
        val retryTarget = Target(Datacenter.Local, "tweetypie-async-retry")
        deferredTweetypie(retryTarget)
      }

      // Byte-level service that forwards raw requests to /s/tweetypie/proxy through a
      // custom ThriftMux client (label "tweetypie.dark", deadline propagation disabled,
      // 100ms request timeout).
      val darkTrafficClient: Service[Array[Byte], Array[Byte]] = {
        val proxyService =
          thriftMuxClient("tweetypie.dark", propagateDeadlines = false)
            .withRequestTimeout(100.milliseconds)
            .newService("/s/tweetypie/proxy")

        // Adapts a raw byte payload into the ThriftClientRequest the thrift service expects.
        val wrapBytes =
          new Filter[Array[Byte], Array[Byte], ThriftClientRequest, Array[Byte]] {
            override def apply(
              payload: Array[Byte],
              next: Service[ThriftClientRequest, Array[Byte]]
            ): Future[Array[Byte]] = {
              // second argument presumably is the "oneway" flag -- TODO confirm
              val thriftRequest = new ThriftClientRequest(payload, false)
              next(thriftRequest)
            }
          }

        wrapBytes.andThen(proxyService)
      }

      // Geoduck hydration client at /s/geo/hydration: 100ms per request, idempotent with
      // 1 percent max extra load.
      val geoHydrationClient: GeoduckHydration.MethodPerEndpoint = {
        val servicePerEndpoint =
          thriftMuxMethodBuilder("geoduck_hydration", "/s/geo/hydration")
            .withTimeoutPerRequest(100.milliseconds)
            .idempotent(maxExtraLoad = 1.percent)
            .servicePerEndpoint[GeoduckHydration.ServicePerEndpoint]

        ThriftMux.Client.methodPerEndpoint(servicePerEndpoint)
      }

      // The `locate` member of geoHydrationClient, exposed as a GeoduckLocate.
      val geoHydrationLocate: GeoduckLocate = geoHydrationClient.locate

      // Geoduck reverse-geocoder client at /s/geo/geoduck_reversegeocoder: 100ms per
      // request, idempotent with 1 percent max extra load.
      val geoReverseGeocoderClient: ReverseGeocoder.MethodPerEndpoint = {
        val servicePerEndpoint =
          thriftMuxMethodBuilder("geoduck_reversegeocoder", "/s/geo/geoduck_reversegeocoder")
            .withTimeoutPerRequest(100.milliseconds)
            .idempotent(maxExtraLoad = 1.percent)
            .servicePerEndpoint[ReverseGeocoder.ServicePerEndpoint]

        ThriftMux.Client.methodPerEndpoint(servicePerEndpoint)
      }

      // GeoduckGeohashLocate built from the two geoduck clients above, with stats
      // under the "geo_geohash_locate" scope.
      val geoduckGeohashLocate: GeoduckGeohashLocate = {
        val geohashStats = statsReceiver.scope("geo_geohash_locate")

        new GeoduckGeohashLocate(
          reverseGeocoderClient = geoReverseGeocoderClient,
          hydrationClient = geoHydrationClient,
          classScopedStatsReceiver = geohashStats
        )
      }

      // Geoduck relevance thrift client at /s/geo/relevance with a 100ms request
      // timeout and the retry(timeouts = 1) policy.
      val geoRelevance = {
        val relevanceService =
          thriftMuxClientBuilder(
            "geoduck_relevance",
            "/s/geo/relevance",
            classOf[Relevance.MethodPerEndpoint]
          ).requestTimeout(100.milliseconds)
            .retryPolicy(retry(timeouts = 1))
            .build()

        new Relevance$FinagleClient(relevanceService, stats = statsReceiver)
      }

      // Fanout service client whose calls are deferred through deferredRpcClient to the
      // "fanoutservice" target in the local datacenter.
      val fanoutServiceClient = {
        val fanoutTransport =
          new DeferredThriftService(deferredRpcClient, Target(Datacenter.Local, "fanoutservice"))

        new FanoutService$FinagleClient(
          fanoutTransport,
          serviceName = "fanoutservice",
          stats = statsReceiver
        )
      }

      // Limiter: a client for /s/limiter/limiter (TLS required, daemonized), wrapped by
      // settings.limiterBackendConfig and exposed as a LimiterService.
      val limiterService: LimiterService = {
        val thriftClient =
          new LimiterClientFactory(
            name = "limiter",
            clientId = thriftClientId,
            tracer = tracer,
            statsReceiver = statsReceiver,
            serviceIdentifier = settings.serviceIdentifier,
            opportunisticTlsLevel = OpportunisticTls.Required,
            daemonize = true
          )(eval("/s/limiter/limiter"))

        val backend =
          settings.limiterBackendConfig(
            LimiterBackend.fromClient(thriftClient),
            backendContext("limiter")
          )

        // the service is assembled from the backend's two operations, the getAppId
        // helper and a "limiter" stats scope
        LimiterService.fromBackend(
          backend.incrementFeature,
          backend.getFeatureUsage,
          getAppId,
          backendsScope.scope("limiter")
        )
      }

      // Passbird thrift client at /s/passbird/passbird with a 100ms request timeout and
      // the retry(timeouts = 1) policy.
      val passbirdClient = {
        val passbirdService =
          thriftMuxClientBuilder(
            "passbird",
            "/s/passbird/passbird",
            classOf[PassbirdService.MethodPerEndpoint]
          ).requestTimeout(100.milliseconds)
            .retryPolicy(retry(timeouts = 1))
            .build()

        new PassbirdService$FinagleClient(
          passbirdService,
          serviceName = "passbird",
          stats = statsReceiver
        )
      }

      // Escherbird (tweet entity annotation) backend at /s/escherbird/annotationservice
      // with the default ClientBuilder settings, wrapped by settings.escherbirdConfig.
      val escherbird: Escherbird = {
        val annotationService =
          thriftMuxClientBuilder(
            "escherbird",
            "/s/escherbird/annotationservice",
            classOf[TweetEntityAnnotationScroogeIface.MethodPerEndpoint]
          ).build()

        val thriftClient = new TweetEntityAnnotationService$FinagleClient(annotationService)

        settings.escherbirdConfig(
          Escherbird.fromClient(thriftClient),
          backendContext("escherbird")
        )
      }

      // Geo scrub event store backed by a Manhattan KV client ("geoduck_scrub_datastore"
      // on /s/manhattan/omega.native-thrift).
      val geoScrubEventStore: GeoScrubEventStore = {
        val identifier = settings.serviceIdentifier

        // mTLS parameters are supplied only when a real service identifier is configured
        val mtls =
          if (identifier != EmptyServiceIdentifier)
            ManhattanKVClientMtlsParams(
              serviceIdentifier = identifier,
              opportunisticTls = OpportunisticTls.Required)
          else
            NoMtlsParams

        val omegaClient =
          new ManhattanKVClient(
            appId = "geoduck_scrub_datastore",
            dest = "/s/manhattan/omega.native-thrift",
            mtlsParams = mtls,
            label = "mh_omega",
            Seq(Experiments.ApertureLoadBalancer)
          )

        GeoScrubEventStore(
          omegaClient,
          settings.geoScrubEventStoreConfig,
          backendContext("geoScrubEventStore")
        )
      }

      // Publisher for the "tweet_events" stream: 500ms publish timeout, finagle dtabs
      // serialized with each event.
      val tweetEventsPublisher: EventBusPublisher[TweetEvent] = {
        val tweetEventsStream =
          eventBusPublisherBuilder
            .streamName("tweet_events")
            .thriftStruct(TweetEvent)

        tweetEventsStream
          .publishTimeout(500.milliseconds)
          .serializeFinagleDtabs(true)
          .build()
      }

      // Publisher for the "tweetypie_delete_location_data_prod" stream.
      val deleteLocationDataPublisher: EventBusPublisher[DeleteLocationData] = {
        val deleteLocationStream =
          eventBusPublisherBuilder
            .streamName("tweetypie_delete_location_data_prod")
            .thriftStruct(DeleteLocationData)

        deleteLocationStream
          // deleteLocationData is relatively rare, and publishing to
          // eventbus is all that the endpoint does. This means that it
          // is much more likely that we will have to make a connection,
          // which has much greater latency, and also makes us more
          // tolerant of slow requests, so we choose a long timeout.
          .publishTimeout(2.seconds)
          .build()
      }

      // Publisher for the "retweet_archival_events" stream with a 500ms publish timeout.
      val retweetArchivalEventPublisher: EventBusPublisher[RetweetArchivalEvent] = {
        val retweetArchivalStream =
          eventBusPublisherBuilder
            .streamName("retweet_archival_events")
            .thriftStruct(RetweetArchivalEvent)

        retweetArchivalStream
          .publishTimeout(500.milliseconds)
          .build()
      }

      // Gnip enricherator client built from the default thriftMuxMethodBuilder settings
      // for /s/datadelivery-enrichments/enricherator.
      val gnipEnricherator: GnipEnricherator = {
        val enricheratorBuilder =
          thriftMuxMethodBuilder(
            "enricherator",
            "/s/datadelivery-enrichments/enricherator"
          )

        GnipEnricherator.fromMethod(enricheratorBuilder)
      }

      // Strato client labelled "stratoserver": mutual TLS (required) and a 100ms
      // request timeout.
      val stratoserverClient: StratoClient = {
        val secured =
          Strato.client
            .withMutualTls(
              serviceIdentifier = settings.serviceIdentifier,
              opportunisticLevel = OpportunisticTls.Required)

        secured
          .withLabel("stratoserver")
          .withRequestTimeout(100.milliseconds)
          .build()
      }

      // ConfigBus created from the "config_bus" stats scope and this instance's id and
      // instance count from settings.
      val configBus: ConfigBus = {
        val configBusStats = backendsScope.scope("config_bus")
        ConfigBus(configBusStats, settings.instanceId, settings.instanceCount)
      }

      // Logger for promoted-content callbacks: publishes AdCallbackEvents to the stream
      // named by settings.adsLoggingClientTopicName, through an AdsLoggingClient.
      val callbackPromotedContentLogger: CallbackPromotedContentLogger = {
        val adCallbackStream =
          eventBusPublisherBuilder
            .streamName(settings.adsLoggingClientTopicName)
            .thriftStruct(AdCallbackEvent)

        val adCallbackPublisher =
          adCallbackStream
            .publishTimeout(500.milliseconds)
            .serializeFinagleDtabs(true)
            .maxQueuedEvents(1000)
            .kafkaDest("/s/kafka/ads-callback:kafka-tls")
            .build()

        // the logging client and the logger share the same "promoted_content" scope
        val promotedContentStats = backendsScope.scope("promoted_content")
        val loggingClient =
          AdsLoggingClient(adCallbackPublisher, promotedContentStats, "Tweetypie")

        new CallbackPromotedContentLogger(loggingClient, promotedContentStats)
      }
    }
  }