in finagle-scribe/src/main/scala/com/twitter/finagle/scribe/Publisher.scala [77:253]
def build(category: String, label: String): Publisher = {
  // Convenience entry point: obtain the Builder exposed as `builder` on this
  // object and let it construct the Publisher for the given category and label.
  // NOTE(review): `builder` is defined outside this view -- presumably a
  // default-configured Builder; confirm.
  val publisherBuilder = builder
  publisherBuilder.build(category, label)
}
/**
* Builder for a Scribe [[Publisher]]
* ==Usage==
* {{{
* val publisher: Publisher = Publisher.build("category", "label")
* }}}
*
* Or to apply configuration:
*
* {{{
* val publisher: Publisher =
* Publisher.builder
* .withRetryPolicy(customPolicy)
* .withResponseClassifier(customClassifier)
* .build("category", "label")
* }}}
*
* @param dest Resolvable destination of the Scribe host to which to write entries.
* @param statsReceiver [[StatsReceiver]] for collections of success/failure/error metrics.
* @param retryPolicy the Finagle client [[RetryPolicy]] for retrying failures.
* @param responseClassifier how Finagle should classify responses.
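* @param filter a user provided Filter chain which is applied directly before writing the entries to Scribe.
* @param logServiceOverride when defined, the [[Service]] used for the `log` endpoint instead of a newly built Thrift client (exposed for testing).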
*/
class Builder private[scribe] (
  dest: String = DefaultDest,
  statsReceiver: StatsReceiver = DefaultStatsReceiver,
  retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])] = DefaultRetryPolicy,
  responseClassifier: ResponseClassifier = DefaultResponseClassifier,
  filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType] = Filter.identity,
  logServiceOverride: Option[Service[Log.Args, Log.SuccessType]] = None) {

  /**
   * Create a new [[Builder]] that carries over every setting of this instance
   * except the ones passed explicitly. Every `with*` method funnels through
   * here so the constructor is invoked in exactly one place.
   */
  private def copy(
    dest: String = this.dest,
    statsReceiver: StatsReceiver = this.statsReceiver,
    retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])] = this.retryPolicy,
    responseClassifier: ResponseClassifier = this.responseClassifier,
    filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType] = this.filter,
    logServiceOverride: Option[Service[Log.Args, Log.SuccessType]] = this.logServiceOverride
  ): Builder =
    new Builder(dest, statsReceiver, retryPolicy, responseClassifier, filter, logServiceOverride)

  /** Return a new [[Builder]] whose destination is `dest`; other settings are unchanged. */
  def withDest(dest: String): Builder =
    copy(dest = dest)

  /** Return a new [[Builder]] reporting to `statsReceiver`; other settings are unchanged. */
  def withStatsReceiver(statsReceiver: StatsReceiver): Builder =
    copy(statsReceiver = statsReceiver)

  /** Return a new [[Builder]] using `retryPolicy`; other settings are unchanged. */
  def withRetryPolicy(retryPolicy: RetryPolicy[(Log.Args, Try[Log.SuccessType])]): Builder =
    copy(retryPolicy = retryPolicy)

  /** Return a new [[Builder]] using `responseClassifier`; other settings are unchanged. */
  def withResponseClassifier(responseClassifier: ResponseClassifier): Builder =
    copy(responseClassifier = responseClassifier)

  /**
   * APPEND (not replace) the given Filter to the current Filter: the returned
   * [[Builder]] holds `currentFilter.andThen(filter)`.
   */
  def withFilter(
    filter: Filter[Log.Args, Log.SuccessType, Log.Args, Log.SuccessType]
  ): Builder =
    copy(filter = this.filter.andThen(filter))

  /* exposed for testing */
  private[scribe] def withLogServiceOverride(
    logServiceOverride: Option[Service[Log.Args, Log.SuccessType]]
  ): Builder =
    copy(logServiceOverride = logServiceOverride)

  /**
   * Build a new Scribe [[Publisher]] from the given category and label.
   *
   * @param category the Scribe category to which to publish entries.
   * @param label the client label used in metrics
   * @return a new [[Publisher]] configured from the current state of this [[Builder]].
   */
  def build(category: String, label: String): Publisher = {
    // Scribe-level stats live under <label>; Finagle client stats under "clnt".
    val scribeStats = new ScribeStats(statsReceiver.scope(label))
    val scribeClient = newClient(label, scribeStats, this.statsReceiver.scope("clnt"))
    new Publisher(category = category, stats = scribeStats, client = scribeClient)
  }

  /**
   * Build the Thrift-backed `ServicePerEndpoint` used when no log service
   * override is configured: session pool capped at 5 with up to 10000 waiters,
   * fail-fast and failure accrual disabled, tracing disabled, and a 1 second
   * request timeout.
   */
  private def newThriftServicePerEndpoint(
    label: String,
    budget: RetryBudget,
    clientStats: StatsReceiver
  ): Scribe.ServicePerEndpoint =
    Thrift.client
      .withRetryBudget(budget)
      .withSessionPool.maxSize(5)
      .withSessionPool.maxWaiters(10000)
      .withSessionQualifier.noFailFast
      .withSessionQualifier.noFailureAccrual
      // Client stats receiver, filtered, will be scoped to label by Finagle, e.g., clnt/label/
      .withStatsReceiver(clientStats)
      // We disable Tracing for this client to prevent creating a recursive tracing storm.
      .withTracer(NullTracer)
      .withRequestTimeout(1.second) // each retry will have this timeout
      .servicePerEndpoint[Scribe.ServicePerEndpoint](this.dest, label)

  /**
   * Assemble the Scribe client: pick the underlying `log` service (the override
   * if present, otherwise a Thrift client), wrap it in the filter stack and
   * expose the result as a `MethodPerEndpoint`.
   */
  private def newClient(
    label: String,
    stats: ScribeStats, // captures per-request Scribe stats (not logical)
    clientStatsReceiver: StatsReceiver // Finagle client stats -- filtered & will be used for logical and per-request
  ): Scribe.MethodPerEndpoint = {
    val filteredStats = filteredStatsReceiver(clientStatsReceiver)
    val labelStats = filteredStats.scope(label)
    val logicalStats = labelStats.scope("logical")

    // share the RetryBudget between the retry and requeue filters
    val sharedBudget = RetryBudget()

    val retries = new RetryFilter[Log.Args, ResultCode](
      retryPolicy = this.retryPolicy,
      retryBudget = sharedBudget,
      timer = DefaultTimer,
      statsReceiver = labelStats
    )

    // client stats receiver, filtered and scoped to the label + logical, e.g., clnt/label/logical
    val logicalStatsFilter = StatsFilter
      .typeAgnostic(
        logicalStats,
        this.responseClassifier,
        StatsFilter.DefaultExceptions,
        TimeUnit.MILLISECONDS
      ).toFilter[Log.Args, Log.SuccessType]

    val underlying: Scribe.ServicePerEndpoint = this.logServiceOverride match {
      case Some(overrideService) =>
        Scribe.ServicePerEndpoint(log = overrideService)
      case _ =>
        newThriftServicePerEndpoint(label, sharedBudget, filteredStats)
    }

    val traceless = new TracelessFilter
    val scribeMetrics = new ScribeMetricsFilter(stats)

    // Filters listed outermost first. The Scribe metrics filter sits after the
    // retry filter so that stats are updated on retried requests; the user
    // provided filter is applied directly before the underlying log service.
    val filteredLog: Service[Log.Args, Log.SuccessType] =
      traceless
        .andThen(logicalStatsFilter)
        .andThen(retries)
        .andThen(scribeMetrics)
        .andThen(this.filter)
        .andThen(underlying.log)

    Thrift.Client.methodPerEndpoint(underlying.withLog(log = filteredLog))
  }
}