private[algebra] def collectWithFailures[K, V]()

in storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala [109:152]


  private[algebra] def collectWithFailures[K, V](
    fs: Iterator[(K, Future[V])], size: Int): Future[(Map[K, V], Map[K, Throwable])] = {

    if (!fs.hasNext) {
      Future.value((Map.empty[K, V], Map.empty[K, Throwable]))
    } else {
      val results = new AtomicReferenceArray[(K, Try[V])](size)
      val countdown = new AtomicInteger(size)
      val pResult = new Promise[(Map[K, V], Map[K, Throwable])]

      @inline
      def collectResults() = {
        if (countdown.decrementAndGet() == 0) {
          var successes = Map.empty[K, V]
          var failures = Map.empty[K, Throwable]
          var ri = 0
          while (ri < size) {
            results.get(ri) match {
              case (k, Return(v)) => successes = successes + (k -> v)
              case (k, Throw(t)) =>  failures = failures + (k -> t)
            }
            ri += 1
          }
          pResult.setValue((successes, failures))
        }
      }

      var i = 0
      while(fs.hasNext) {
        val (k, fv) = fs.next
        val j = i // Need to make sure we close over a copy of i and not i itself
        fv respond {
          case Return(v) =>
            results.set(j, k -> Return(v))
            collectResults()
          case Throw(cause) =>
            results.set(j, k -> Throw(cause))
            collectResults()
        }
        i += 1
      }
      pResult
    }
  }