in storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala [109:152]
private[algebra] def collectWithFailures[K, V](
fs: Iterator[(K, Future[V])], size: Int): Future[(Map[K, V], Map[K, Throwable])] = {
if (!fs.hasNext) {
Future.value((Map.empty[K, V], Map.empty[K, Throwable]))
} else {
val results = new AtomicReferenceArray[(K, Try[V])](size)
val countdown = new AtomicInteger(size)
val pResult = new Promise[(Map[K, V], Map[K, Throwable])]
@inline
def collectResults() = {
if (countdown.decrementAndGet() == 0) {
var successes = Map.empty[K, V]
var failures = Map.empty[K, Throwable]
var ri = 0
while (ri < size) {
results.get(ri) match {
case (k, Return(v)) => successes = successes + (k -> v)
case (k, Throw(t)) => failures = failures + (k -> t)
}
ri += 1
}
pResult.setValue((successes, failures))
}
}
var i = 0
while(fs.hasNext) {
val (k, fv) = fs.next
val j = i // Need to make sure we close over a copy of i and not i itself
fv respond {
case Return(v) =>
results.set(j, k -> Return(v))
collectResults()
case Throw(cause) =>
results.set(j, k -> Throw(cause))
collectResults()
}
i += 1
}
pResult
}
}