def make()

in finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/LoadBalancerFactory.scala [296:382]


    /**
     * Builds the leaf of the stack: a load-balanced `ServiceFactory` over the
     * endpoints of the destination returned by `getDest(params)`.
     *
     * Endpoints are read from the `LoadBalancerFactory.Endpoints` param when it is
     * present; otherwise they are produced by `TrafficDistributor.weightEndpoints`
     * from the destination, using `newEndpointFn(params, next)` to create each
     * endpoint.
     *
     * @param params the stack params; read for the balancer factory, probation
     *               flag, stats receiver, label, error label and address ordering
     * @param next   the remainder of the stack, handed to `newEndpointFn`
     * @return a leaf stack under `role` holding either the balancer itself (when
     *         the configured factory reports `supportsWeighted`) or a
     *         `TrafficDistributor` that creates balancers on demand
     */
    def make(
      params: Stack.Params,
      next: Stack[ServiceFactory[Req, Rep]]
    ): Stack[ServiceFactory[Req, Rep]] = {
      // Resolved up front so it runs regardless of which endpoint source is used below.
      val dest = getDest(params)

      val Param(loadBalancerFactory) = params[Param]
      val EnableProbation(probationEnabled) = params[EnableProbation]
      val param.Stats(statsReceiver) = params[param.Stats]
      val param.Label(label) = params[param.Label]

      // For a RollupStatsReceiver, scope off its first underlying receiver rather
      // than the rollup itself; any other receiver is scoped directly.
      val balancerStats = (statsReceiver match {
        case rollup: RollupStatsReceiver => rollup.underlying.head
        case plain => plain
      }).scope("loadbalancer")

      val balancerExc = new NoBrokersAvailableException(params[ErrorLabel].label)

      // Creates a balancer over `endpoints`, sorted by the configured address
      // ordering. Eager connections are switched off when requested by the caller.
      def newBalancer(
        endpoints: Activity[Set[EndpointFactory[Req, Rep]]],
        disableEagerConnections: Boolean
      ): ServiceFactory[Req, Rep] = {
        val ordering = params[AddressOrdering].ordering

        // Sorts by address; if the ordering throws a non-fatal exception, logs a
        // warning and falls back to the set's own iteration order.
        def inAddressOrder(
          unordered: Set[EndpointFactory[Req, Rep]]
        ): Vector[EndpointFactory[Req, Rep]] =
          try unordered.toVector.sortBy(_.address)(ordering)
          catch {
            case NonFatal(exc) =>
              val unsorted = unordered.toVector
              DefaultLogger.log(
                Level.WARNING,
                s"Unable to order endpoints via ($ordering): \n${unsorted.mkString("\n")}",
                exc
              )
              unsorted
          }

        val statsParams = params + param.Stats(balancerStats)
        val balancerParams =
          if (disableEagerConnections) statsParams + EagerConnections(false)
          else statsParams

        loadBalancerFactory.newBalancer(
          endpoints.map(inAddressOrder),
          balancerExc,
          balancerParams
        )
      }

      // Endpoints supplied through params are used as-is rather than tracked here.
      // That lets higher abstractions (like partitioners) move endpoints from one
      // cluster to another and, crucially, share data between endpoints.
      val endpoints =
        if (params.contains[LoadBalancerFactory.Endpoints])
          params[LoadBalancerFactory.Endpoints].va
            .asInstanceOf[Event[Activity.State[Set[EndpointFactory[Req, Rep]]]]]
        else
          TrafficDistributor.weightEndpoints(
            AddrLifecycle.varAddrToActivity(dest, label),
            newEndpointFn(params, next),
            !probationEnabled
          )

      val leafFactory: ServiceFactory[Req, Rep] =
        if (loadBalancerFactory.supportsWeighted) {
          // The configured factory reports that it supports weights, so the
          // balancer is installed directly, without a TrafficDistributor around it.
          newBalancer(Activity(endpoints), disableEagerConnections = false)
        } else {
          // Otherwise defer to the TrafficDistributor to interpret weighted
          // `Addresses`; it calls back into `newBalancer` as needed.
          new TrafficDistributor[Req, Rep](
            dest = endpoints,
            newBalancer = newBalancer(_, _),
            statsReceiver = balancerStats
          )
        }

      Stack.leaf(role, leafFactory)
    }