private[this] def takeOne()

in src/main/scala/com/twitter/iago/server/RequestConsumer.scala [130:158]


  /**
   * Removes one request from `queue` on `futurePool`, sends it, and then
   * arranges for the next request to be taken according to `schedule`.
   *
   * The returned promise is linked (via `become`) to the promise of each
   * subsequent step, so it is satisfied only when the chain of takes ends:
   * it completes successfully when the dequeue fails with
   * `NoSuchElementException`, and it fails with the underlying exception
   * when the dequeue fails for any other reason.
   *
   * @param draining when true, requests are removed with `queue.pop()`;
   *                 otherwise they are removed with `queue.take()`
   * @return a promise that is satisfied when this chain of takes terminates
   */
  private[this] def takeOne(draining: Boolean = false): Promise[Unit] = {
    val p = Promise[Unit]()
    // Continue the chain: `p` takes on the outcome of the next step.
    def takeAnother(): Unit = p.become(takeOne(draining))

    futurePool {
      if (draining) queue.pop() else queue.take()
    }.onSuccess { request =>
      send(request)
      val waitTime = schedule.next()
      val now = Time.now
      // Most timers latch on millisecond intervals due to the behavior
      // of `Object.wait(long)`. Events are processed immediately until
      // one is scheduled for the future. This allows handling rates higher
      // than 1/ms in a way that is faithful to the underlying schedule.
      if (waitTime <= now) {
        // Resetting the schedule in the event that the application is paused
        // for a long time prevents extended bursts of traffic.
        if (waitTime < now.minus(5.seconds)) { schedule.jumpToTime(now) }
        takeAnother()
      } else {
        timer.doAt(waitTime) { takeAnother() }
      }
    }.onFailure {
      // NOTE(review): presumably raised by `queue.pop()` once the queue is
      // empty while draining -- confirm against the queue implementation.
      case _: NoSuchElementException => p.setDone()
      // Any other failure also ends the chain; surface it on `p` so that
      // waiters are not left pending forever.
      case e => p.setException(e)
    }
    p
  }