def build()

in finagle-scribe/src/main/scala/com/twitter/finagle/scribe/Publisher.scala [77:253]


  def build(category: String, label: String): Publisher = {
    // Thin convenience wrapper: hand both arguments, unchanged, to `builder.build`.
    builder.build(category = category, label = label)
  }

  /**
   * Builder for a Scribe [[Publisher]].
   *
   * Every `with*` method returns a new [[Builder]] that carries the updated setting;
   * the instance it is called on is never modified.
   *
   * ==Usage==
   * {{{
   *   val publisher: Publisher = Publisher.build("category", "label")
   * }}}
   *
   * Or, to apply configuration first:
   *
   * {{{
   *   val publisher: Publisher =
   *     Publisher.builder
   *       .withRetryPolicy(customPolicy)
   *       .withResponseClassifier(customClassifier)
   *       .build("category", "label")
   * }}}
   *
   * @param dest Resolvable destination of the Scribe host to which to write entries.
   * @param statsReceiver [[StatsReceiver]] for collection of success/failure/error metrics.
   * @param retryPolicy the Finagle client [[RetryPolicy]] for retrying failures.
   * @param responseClassifier how Finagle should classify responses.
   * @param filter a user-provided Filter chain which is applied directly before writing the
   *               entries to Scribe.
   * @param logServiceOverride when defined, this Service is used as the `log` endpoint instead
   *                           of constructing a Thrift client (exposed for testing).
   */
  class Builder private[scribe] (
    dest: String = DefaultDest,
    statsReceiver: StatsReceiver = DefaultStatsReceiver,
    retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])] = DefaultRetryPolicy,
    responseClassifier: ResponseClassifier = DefaultResponseClassifier,
    filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType] = Filter.identity,
    logServiceOverride: Option[Service[Log.Args, Log.SuccessType]] = None) {

    /**
     * Create a new [[Builder]] in which every setting not explicitly passed keeps
     * the value held by this instance.
     */
    private def copy(
      dest: String = this.dest,
      statsReceiver: StatsReceiver = this.statsReceiver,
      retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])] = this.retryPolicy,
      responseClassifier: ResponseClassifier = this.responseClassifier,
      filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType] = this.filter,
      logServiceOverride: Option[Service[Log.Args, Log.SuccessType]] = this.logServiceOverride
    ): Builder =
      new Builder(
        dest,
        statsReceiver,
        retryPolicy,
        responseClassifier,
        filter,
        logServiceOverride
      )

    /** Return a new [[Builder]] that writes to the given destination. */
    def withDest(dest: String): Builder =
      copy(dest = dest)

    /** Return a new [[Builder]] that reports metrics to the given [[StatsReceiver]]. */
    def withStatsReceiver(statsReceiver: StatsReceiver): Builder =
      copy(statsReceiver = statsReceiver)

    /** Return a new [[Builder]] that uses the given [[RetryPolicy]]. */
    def withRetryPolicy(retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])]): Builder =
      copy(retryPolicy = retryPolicy)

    /** Return a new [[Builder]] that uses the given [[ResponseClassifier]]. */
    def withResponseClassifier(responseClassifier: ResponseClassifier): Builder =
      copy(responseClassifier = responseClassifier)

    /**
     * Return a new [[Builder]] whose Filter is the current Filter followed by the given one.
     * The given Filter is APPENDED to the existing chain; it does not replace it.
     */
    def withFilter(
      filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType]
    ): Builder =
      copy(filter = this.filter.andThen(filter))

    /** Exposed for testing: return a new [[Builder]] with the given `log` service override. */
    private[scribe] def withLogServiceOverride(
      logServiceOverride: Option[Service[Log.Args, Log.SuccessType]]
    ): Builder =
      copy(logServiceOverride = logServiceOverride)

    /**
     * Build a new Scribe [[Publisher]] from the given category and label.
     * @param category the Scribe category to which to publish entries.
     * @param label the client label used in metrics
     * @return a new [[Publisher]] configured from the current state of this [[Builder]].
     */
    def build(category: String, label: String): Publisher = {
      // Scribe stats live under <label>; the Finagle client stats live under "clnt".
      val scribeStats = new ScribeStats(this.statsReceiver.scope(label))
      val clientStats = this.statsReceiver.scope("clnt")
      new Publisher(
        category = category,
        stats = scribeStats,
        client = newClient(label, scribeStats, clientStats)
      )
    }

    /**
     * Assemble the Scribe client: the underlying `log` service wrapped in the tracing,
     * stats, retry, Scribe-metrics and user-provided filters, in that order.
     *
     * @param label the client label; used as a stats scope and as the Finagle client label.
     * @param stats captures per-request Scribe stats (not logical).
     * @param clientStatsReceiver Finagle client stats; filtered here and then used for both
     *                            logical and per-request stats.
     */
    private def newClient(
      label: String,
      stats: ScribeStats,
      clientStatsReceiver: StatsReceiver
    ): Scribe.MethodPerEndpoint = {
      val filteredStats = filteredStatsReceiver(clientStatsReceiver)
      val labelStats = filteredStats.scope(label)
      val logicalStats = labelStats.scope("logical")

      // One RetryBudget is shared between the retry filter and the client's requeue filter.
      val sharedBudget = RetryBudget()
      val retries = new RetryFilter[Log.Args, ResultCode](
        retryPolicy = this.retryPolicy,
        retryBudget = sharedBudget,
        timer = DefaultTimer,
        statsReceiver = labelStats
      )

      // Logical stats: filtered client receiver scoped to label + logical, e.g., clnt/label/logical
      val logicalMetrics = StatsFilter
        .typeAgnostic(
          logicalStats,
          this.responseClassifier,
          StatsFilter.DefaultExceptions,
          TimeUnit.MILLISECONDS
        ).toFilter[Log.Args, Log.SuccessType]

      // Prefer the override when present; only otherwise is a real Thrift client constructed.
      val endpoints: Scribe.ServicePerEndpoint = this.logServiceOverride
        .map(svc => Scribe.ServicePerEndpoint(log = svc))
        .getOrElse(thriftServicePerEndpoint(label, sharedBudget, filteredStats))

      val traceless = new TracelessFilter
      val scribeMetrics = new ScribeMetricsFilter(stats)
      val filterChain = traceless
        .andThen(logicalMetrics)
        .andThen(retries)
        // placed after the retry filter so that stats are updated on retried requests
        .andThen(scribeMetrics)
        // user provided filter
        .andThen(this.filter)

      Thrift.Client.methodPerEndpoint(endpoints.withLog(filterChain.andThen(endpoints.log)))
    }

    /**
     * Construct the Thrift-backed [[Scribe.ServicePerEndpoint]] for this Builder's destination.
     *
     * @param label the Finagle client label.
     * @param retryBudget the budget handed to the client via `withRetryBudget`.
     * @param clientStatsReceiver filtered client stats receiver; will be scoped to the label
     *                            by Finagle, e.g., clnt/label/
     */
    private def thriftServicePerEndpoint(
      label: String,
      retryBudget: RetryBudget,
      clientStatsReceiver: StatsReceiver
    ): Scribe.ServicePerEndpoint =
      Thrift.client
        .withRetryBudget(retryBudget)
        .withSessionPool.maxSize(5)
        .withSessionPool.maxWaiters(10000)
        .withSessionQualifier.noFailFast
        .withSessionQualifier.noFailureAccrual
        .withStatsReceiver(clientStatsReceiver)
        // We disable Tracing for this client to prevent creating a recursive tracing storm.
        .withTracer(NullTracer)
        .withRequestTimeout(1.second) // each retry will have this timeout
        .servicePerEndpoint[Scribe.ServicePerEndpoint](this.dest, label)
  }