in finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/LoadBalancerFactory.scala [296:382]
/**
 * Produces the leaf of the stack: a load-balancing `ServiceFactory` over the
 * endpoints of the destination obtained from `getDest(params)`.
 *
 * Endpoints are taken from the `LoadBalancerFactory.Endpoints` param when it is
 * present; otherwise they are derived from the destination through
 * `TrafficDistributor.weightEndpoints`. The balancer itself is created by the
 * `LoadBalancerFactory` found in the `Param` stack param, either directly (when
 * that factory reports `supportsWeighted`) or behind a `TrafficDistributor`.
 *
 * @param params stack params the balancer, stats, label and endpoints are read from
 * @param next   the remainder of the stack, handed to `newEndpointFn` to build endpoints
 * @return a single-element (leaf) stack holding the balancing `ServiceFactory`
 */
def make(
  params: Stack.Params,
  next: Stack[ServiceFactory[Req, Rep]]
): Stack[ServiceFactory[Req, Rep]] = {
  val destination = getDest(params)

  val Param(lbFactory) = params[Param]
  val EnableProbation(probation) = params[EnableProbation]
  val param.Stats(configuredStats) = params[param.Stats]
  val param.Label(clientLabel) = params[param.Label]

  // A RollupStatsReceiver is unwrapped to its first underlying receiver before
  // the "loadbalancer" scope is applied; any other receiver is used as-is.
  val lbStats = (configuredStats match {
    case rollup: RollupStatsReceiver => rollup.underlying.head
    case plain => plain
  }).scope("loadbalancer")

  val noBrokers = new NoBrokersAvailableException(params[ErrorLabel].label)

  // Creates a balancer over `endpointSets`, presenting each set as a Vector
  // ordered by address. When `suppressEagerConnections` is set, the params
  // handed to the balancer carry `EagerConnections(false)`.
  def buildBalancer(
    endpointSets: Activity[Set[EndpointFactory[Req, Rep]]],
    suppressEagerConnections: Boolean
  ): ServiceFactory[Req, Rep] = {
    val byAddress = params[AddressOrdering].ordering

    val sortedEndpoints = endpointSets.map { set =>
      val unsorted = set.toVector
      try unsorted.sortBy(_.address)(byAddress)
      catch {
        // Ordering is best-effort: on failure, log it and keep the unsorted vector.
        case NonFatal(exc) =>
          DefaultLogger.log(
            Level.WARNING,
            s"Unable to order endpoints via ($byAddress): \n${unsorted.mkString("\n")}",
            exc
          )
          unsorted
      }
    }

    val balancerParams = {
      val withStats = params + param.Stats(lbStats)
      if (suppressEagerConnections) withStats + EagerConnections(false)
      else withStats
    }

    lbFactory.newBalancer(sortedEndpoints, noBrokers, balancerParams)
  }

  // Endpoints supplied through params are used directly rather than tracked
  // here, which lets higher-level abstractions (such as partitioners) move
  // endpoints between clusters and share data between them.
  val endpointEvents =
    if (params.contains[LoadBalancerFactory.Endpoints]) {
      val supplied = params[LoadBalancerFactory.Endpoints].va
      supplied.asInstanceOf[Event[Activity.State[Set[EndpointFactory[Req, Rep]]]]]
    } else {
      TrafficDistributor.weightEndpoints(
        AddrLifecycle.varAddrToActivity(destination, clientLabel),
        newEndpointFn(params, next),
        !probation
      )
    }

  val balancer: ServiceFactory[Req, Rep] =
    if (lbFactory.supportsWeighted) {
      // The balancer factory handles weights itself, so no TrafficDistributor
      // is placed in front of it.
      buildBalancer(Activity(endpointEvents), suppressEagerConnections = false)
    } else {
      // Otherwise the TrafficDistributor interprets weighted addresses and
      // creates balancers through `buildBalancer` as it sees fit.
      new TrafficDistributor[Req, Rep](
        dest = endpointEvents,
        newBalancer = buildBalancer(_, _),
        statsReceiver = lbStats
      )
    }

  Stack.leaf(role, balancer)
}