in finagle-core/src/main/scala/com/twitter/finagle/addr/StabilizingAddr.scala [36:134]
def apply(
  addr: Offer[Addr],
  pulse: Offer[State.Health],
  grace: Duration,
  statsReceiver: StatsReceiver = NullStatsReceiver,
  timer: Timer = DefaultTimer
): Offer[Addr] = new Offer[Addr] {
  // Overview of the returned Offer:
  //  - `addr` supplies the raw address updates to be stabilized.
  //  - `pulse` supplies health updates of the address source; removals
  //    only take effect while the last observed health is Healthy.
  //  - `grace` is how long an address that disappeared from `addr` is
  //    still advertised before it is actually dropped.
  //  - `statsReceiver` receives the "health" and "limbo" gauges.
  //  - `timer` is exposed implicitly (presumably consumed by
  //    Offer.timeout below -- confirm against the util API).
  import State._

  implicit val injectTimer = timer

  // Written by the loop and read by the gauges below, hence volatile.
  @volatile var nq = 0
  @volatile var healthStat = Healthy.id

  // "health" exports the id of the last observed health state, "limbo"
  // the number of addresses currently queued for removal.
  // NOTE(review): the gauges are held in vals, presumably so they stay
  // strongly referenced for the lifetime of this Offer -- confirm.
  val health = statsReceiver.addGauge("health") { healthStat }
  val limbo = statsReceiver.addGauge("limbo") { nq }

  // Stabilized addresses are published on this broker; consumers of
  // the returned Offer receive from it (see prepare()).
  val stabilized = new Broker[Addr]

  /**
   * Exclusively maintains the elements in `active`
   * based on adds, removes, and health transitions.
   * Removes are delayed for the grace period, and a
   * transition into Healthy resets the grace period of
   * every pending removal.
   *
   * @param remq     pending removals as (address, deadline) pairs,
   *                 each address appearing at most once
   * @param h        the most recently observed health
   * @param active   the addresses currently advertised
   * @param needPush whether a new value must be sent on `stabilized`
   * @param srcAddr  the most recent value received from `addr`
   */
  def loop(
    remq: Queue[(Address, Time)],
    h: Health,
    active: Set[Address],
    needPush: Boolean,
    srcAddr: Addr
  ): Future[Unit] = {
    nq = remq.size
    Offer.select(
      pulse map { newh =>
        healthStat = newh.id

        // If our health transitions into healthy, reset removal
        // times foreach elem in remq.
        newh match {
          case newh if h == newh =>
            // No transition: nothing to adjust.
            loop(remq, newh, active, needPush, srcAddr)
          case Healthy =>
            // Transitioned to healthy: push back
            val newTime = Time.now + grace
            val newq = remq map { case (elem, _) => (elem, newTime) }
            loop(newq, Healthy, active, needPush, srcAddr)
          case newh =>
            // Transitioned to a non-healthy state: removals are
            // suspended by the timeout guard below.
            loop(remq, newh, active, needPush, srcAddr)
        }
      },
      addr map {
        case addr @ Addr.Bound(newSet, _) =>
          // Update our pending queue so that newly added
          // entries aren't later removed.
          var q = remq filter { case (e, _) => !(newSet contains e) }

          // Add newly removed elements to the remove queue.
          val until = Time.now + grace
          for (el <- active &~ newSet if !qcontains(q, el))
            q = q.enqueue((el, until))

          loop(q, h, active ++ newSet, true, addr)

        case addr =>
          // A nonbound address will enqueue all active members
          // for removal, so that if we become bound again, we can
          // continue on merrily.
          //
          // Members that are already awaiting removal keep their
          // existing (earlier) entry. Re-enqueueing them would add
          // duplicates on every nonbound update -- growing the queue
          // without bound while unhealthy, inflating the limbo gauge
          // and causing redundant pushes when the duplicates expire.
          val until = Time.now + grace
          val fresh = active.filter(el => !qcontains(remq, el))
          val q = remq.enqueue(fresh.map(el => (el, until)))
          loop(q, h, active, true, addr)
      },
      if (h != Healthy || remq.isEmpty) Offer.never
      else {
        // Note: remq is ordered by 'until' time.
        val ((elem, until), nextq) = remq.dequeue
        Offer.timeout(until - Time.now) map { _ => loop(nextq, h, active - elem, true, srcAddr) }
      },
      if (!needPush) Offer.never
      else {
        // We always bind if active is nonempty, reusing the metadata
        // of the most recent source address when it was bound.
        // Otherwise we pass through the most recent source address.
        val attrs = srcAddr match {
          case Addr.Bound(_, attrs) => attrs
          case _ => Addr.Metadata.empty
        }
        val addr =
          if (active.nonEmpty) Addr.Bound(active, attrs)
          else srcAddr
        stabilized.send(addr) map { _ => loop(remq, h, active, false, srcAddr) }
      }
    )
  }

  // Start with nothing advertised, nothing pending and assumed health.
  loop(Queue.empty, Healthy, Set.empty, false, Addr.Pending)

  // Defer to the underlying Offer.
  def prepare() = stabilized.recv.prepare()
}