in util-core/src/main/scala/com/twitter/util/BatchExecutor.scala [136:165]
/**
 * Runs the batch function `f` over the inputs in `batch` and completes each input's promise
 * with the corresponding result.
 *
 * Entries whose promise has already been interrupted are failed with a
 * `CancellationException` and are not passed to `f`. Outputs are matched to the remaining
 * promises by position; surplus outputs are ignored.
 *
 * No promise handed to `f` is left pending:
 *  - if `f` returns a failed future, or throws a non-fatal exception synchronously, every
 *    remaining promise is failed with that exception;
 *  - if `f` returns fewer outputs than it was given inputs, each promise without an output is
 *    failed with an `IllegalStateException`.
 *
 * @param batch the inputs to process, each paired with the promise to complete with its result
 */
def executeBatch(batch: Iterable[(In, Promise[Out])]): Unit = {
  val uncancelled = batch.filter {
    case (_, p) =>
      p.isInterrupted match {
        case Some(_) =>
          p.setException(new CancellationException)
          false
        case scala.None => true
      }
  }
  val ins = uncancelled map { case (in, _) => in }
  // N.B. intentionally not linking cancellation of these promises to the execution of the batch
  // because it seems that in most cases you would be canceling mostly uncanceled work for an
  // outlier.
  val promises = uncancelled map { case (_, promise) => promise }

  // Fails every promise of this batch with the same exception.
  def failAll(e: Throwable): Unit = {
    val t = Throw(e)
    promises foreach { _() = t }
  }

  // Only the invocation of `f` is guarded: a synchronous throw would otherwise escape to the
  // caller and leave every promise in the batch unsatisfied. Fatal errors still propagate.
  val invoked =
    try Return(f(ins))
    catch { case scala.util.control.NonFatal(e) => Throw(e) }

  invoked match {
    case Return(batchResult) =>
      batchResult respond {
        case Return(outs) =>
          // Walk the promises (not the zipped pairs) so that a short result cannot silently
          // drop the trailing promises.
          val remaining = outs.iterator
          promises foreach { p =>
            val result =
              if (remaining.hasNext) Return(remaining.next())
              else Throw(new IllegalStateException("batch function returned fewer outputs than inputs"))
            p() = result
          }
        case Throw(e) =>
          failAll(e)
      }
    case Throw(e) =>
      failAll(e)
  }
}