def apply()

in finagle-core/src/main/scala/com/twitter/finagle/addr/StabilizingAddr.scala [36:134]


  /**
   * Builds an `Offer[Addr]` that republishes updates from `addr`, applying
   * additions immediately while delaying removals by `grace`.
   *
   * Internally a recursive `loop` carries all state as arguments and, on each
   * iteration, selects among: a health update from `pulse`, an address update
   * from `addr`, the expiry of the oldest pending removal, and (when something
   * changed) publishing the current view on an internal [[Broker]].
   *
   * @param addr source of address updates
   * @param pulse source of health updates; pending removals only expire while
   *              the last observed health is `Healthy`, and a transition into
   *              `Healthy` restarts the grace period of every pending removal
   * @param grace how long an address that disappeared from `addr` stays
   *              published before it is dropped
   * @param statsReceiver receives the "health" gauge (id of the last observed
   *                      health) and the "limbo" gauge (number of pending
   *                      removals)
   * @param timer timer made implicitly available to `Offer.timeout`
   * @return an offer whose `prepare()` defers to the internal broker's `recv`
   */
  def apply(
    addr: Offer[Addr],
    pulse: Offer[State.Health],
    grace: Duration,
    statsReceiver: StatsReceiver = NullStatsReceiver,
    timer: Timer = DefaultTimer
  ): Offer[Addr] = new Offer[Addr] {
    import State._

    // Explicitly typed: implicit definitions without a declared type are
    // linted in Scala 2.13 and rejected in Scala 3.
    implicit val injectTimer: Timer = timer

    // Snapshots read by the gauges below; written only from `loop`.
    @volatile var nq = 0
    @volatile var healthStat = Healthy.id

    // NOTE(review): the gauges are kept in fields, presumably so they stay
    // strongly referenced for the lifetime of this offer -- confirm against
    // the StatsReceiver contract.
    val health = statsReceiver.addGauge("health") { healthStat }
    val limbo = statsReceiver.addGauge("limbo") { nq }
    val stabilized = new Broker[Addr]

    /**
     * Appends each address in `leaving` to `q` with a deadline of
     * `Time.now + grace`, skipping addresses that already have a pending
     * removal so that every address appears in the queue at most once.
     */
    def scheduleRemovals(
      q: Queue[(Address, Time)],
      leaving: Set[Address]
    ): Queue[(Address, Time)] = {
      val until = Time.now + grace
      leaving.foldLeft(q) { (acc, el) =>
        if (qcontains(acc, el)) acc else acc.enqueue((el, until))
      }
    }

    /**
     * Exclusively maintains the elements in `active`
     * based on adds, removes, and health transitions.
     * Removes are delayed for the grace period and each transition
     * into `Healthy` resets the grace period.
     *
     * @param remq pending removals as (address, deadline), oldest first
     * @param h last observed health
     * @param active addresses currently published downstream
     * @param needPush whether the current view still has to be sent on
     *                 `stabilized`
     * @param srcAddr the most recent value received from `addr`
     */
    def loop(
      remq: Queue[(Address, Time)],
      h: Health,
      active: Set[Address],
      needPush: Boolean,
      srcAddr: Addr
    ): Future[Unit] = {
      nq = remq.size
      Offer.select(
        pulse map { newh =>
          healthStat = newh.id

          // Only a transition *into* healthy touches the queue: every
          // pending removal gets a fresh grace period. Any other update
          // (same health, or a move to a non-healthy state) just records
          // the new health.
          val nextq =
            if (newh == Healthy && h != Healthy) {
              val newTime = Time.now + grace
              remq map { case (elem, _) => (elem, newTime) }
            } else remq

          loop(nextq, newh, active, needPush, srcAddr)
        },
        addr map {
          case bound @ Addr.Bound(newSet, _) =>
            // Update our pending queue so that newly added
            // entries aren't later removed.
            val kept = remq filter { case (e, _) => !(newSet contains e) }

            // Add newly removed elements to the remove queue.
            val q = scheduleRemovals(kept, active &~ newSet)

            loop(q, h, active ++ newSet, needPush = true, srcAddr = bound)

          case other =>
            // A nonbound address will enqueue all active members
            // for removal, so that if we become bound again, we can
            // continue on merrily. Members that are already pending keep
            // their existing (earlier) deadline instead of being queued a
            // second time, which would inflate the "limbo" gauge and
            // trigger a redundant publish when the stale entry expires.
            val q = scheduleRemovals(remq, active)

            loop(q, h, active, needPush = true, srcAddr = other)
        },
        if (h != Healthy || remq.isEmpty) Offer.never
        else {
          // Note: remq is ordered by 'until' time.
          val ((elem, until), nextq) = remq.dequeue
          Offer.timeout(until - Time.now) map { _ =>
            loop(nextq, h, active - elem, needPush = true, srcAddr = srcAddr)
          }
        },
        if (!needPush) Offer.never
        else {
          // We always bind if active is nonempty. Otherwise we
          // pass through the current source address. Metadata is taken
          // from the source address when it is bound.
          val attrs = srcAddr match {
            case Addr.Bound(_, metadata) => metadata
            case _ => Addr.Metadata.empty
          }
          val toPublish =
            if (active.nonEmpty) Addr.Bound(active, attrs)
            else srcAddr
          stabilized.send(toPublish) map { _ =>
            loop(remq, h, active, needPush = false, srcAddr = srcAddr)
          }
        }
      )
    }

    // Start with nothing pending, nothing active, and nothing to publish.
    loop(Queue.empty, Healthy, Set.empty, needPush = false, srcAddr = Addr.Pending)

    // Defer to the underlying Offer.
    def prepare() = stabilized.recv.prepare()
  }