in src/main/scala/com/twitter/iago/server/RequestConsumer.scala [130:158]
/**
 * Takes a single request from `queue` on `futurePool`, sends it, and then
 * arranges the following take according to `schedule`.
 *
 * @param draining when true the request is obtained with `queue.pop()`,
 *                 otherwise with `queue.take()`; the flag is handed on
 *                 unchanged to every subsequent take
 * @return a promise that is linked (via `become`) to the promise of the
 *         following take, and is marked done when the pool task fails with
 *         a `NoSuchElementException`
 */
private[this] def takeOne(draining: Boolean = false): Promise[Unit] = {
  val completion = Promise[Unit]()

  // Ties this promise to the outcome of the next take in the chain.
  def continueLoop(): Unit = completion.become(takeOne(draining))

  val taken = futurePool {
    if (draining) queue.pop() else queue.take()
  }

  taken
    .onSuccess { req =>
      send(req)
      val nextSendTime = schedule.next()
      val currentTime = Time.now
      // Timers generally fire on millisecond boundaries because of how
      // `Object.wait(long)` behaves. Requests are therefore handled right
      // away until one falls in the future, which lets rates above 1/ms be
      // served while staying faithful to the underlying schedule.
      if (nextSendTime > currentTime) {
        timer.doAt(nextSendTime) { continueLoop() }
      } else {
        // If the application was paused for a long time the schedule is far
        // behind; resetting it avoids an extended burst of catch-up traffic.
        if (nextSendTime < currentTime.minus(5.seconds)) {
          schedule.jumpToTime(currentTime)
        }
        continueLoop()
      }
    }
    // NOTE(review): only NoSuchElementException completes the promise here
    // (presumably raised by the queue when it is empty; confirm). Any other
    // failure of the pool task leaves `completion` unsatisfied.
    .handle {
      case _: NoSuchElementException => completion.setDone()
    }

  completion
}