def multiMergeFromMultiSet[K, V]()

in storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala [44:104]


  /**
   * Overload that additionally accepts a `missingfn` and an implicit
   * `FutureCollector`. Neither of those is consulted by this body: the
   * call is forwarded to the overload taking only `store` and `kvs`
   * (plus the implicit semigroup).
   *
   * @param store backing store handed through to the delegate
   * @param kvs values to merge, keyed by the key they apply to
   * @param missingfn accepted but not used by this body
   * @return whatever the delegate produces for `store` and `kvs`
   */
  def multiMergeFromMultiSet[K, V](store: Store[K, V], kvs: Map[K, V],
    missingfn: (K) => Future[Option[V]] = FutureOps.missingValueFor _)
    (implicit collect: FutureCollector, sg: Semigroup[V]): Map[K, Future[Option[V]]] = {
    // Only store and kvs are passed on; missingfn and collect go unused.
    multiMergeFromMultiSet(store, kvs)
  }

  /**
   * Builds multiMerge behaviour out of a store's multiGet and multiPut:
   * the current value of every key is fetched, combined with the
   * matching entry of `kvs` through `addOpt`, and the combined value is
   * written back.
   *
   * @param store store that is read via multiGet and written via multiPut
   * @param kvs increments to merge, keyed by the key they apply to
   * @param sg semigroup passed implicitly to `addOpt` when combining values
   * @return one future per key of `kvs`; it yields the value that was
   *         fetched before the merge once that key's put has completed,
   *         and otherwise carries the put failure, the get failure, or a
   *         `MissingValueException` when neither is recorded for the key
   */
  def multiMergeFromMultiSet[K, V](store: Store[K, V], kvs: Map[K, V])
    (implicit sg: Semigroup[V]): Map[K, Future[Option[V]]] = {
    val keys = kvs.keySet

    // Stay on an Iterator so the (fetched, merged) pairs are never held
    // in a throwaway map.
    val fetchedAndMerged: Iterator[(K, Future[(Option[V], Option[V])])] =
      store.multiGet(keys).iterator.map { case (key, lookup) =>
        key -> lookup.map { current: Option[V] => (current, addOpt(current, kvs(key))) }
      }

    val collected = collectWithFailures(fetchedAndMerged, kvs.size)
    val readOk = collected.map { outcome => outcome._1 }
    val readFailed = collected.map { outcome => outcome._2 }

    // Only the merged half of each successfully fetched pair is written back.
    val writes: Future[Map[K, Future[Unit]]] =
      readOk.map { pairs: Map[K, (Option[V], Option[V])] =>
        store.multiPut(pairs.mapValues { pair => pair._2 })
      }

    /**
     *  Resolves a single key without building another intermediate map:
     *  - key has a put result: wait for the put, then answer with the value
     *    fetched before the merge (a failed put stays a failure);
     *  - key has no put result: fail with the recorded get failure, or with
     *    a MissingValueException if none was recorded.
     */
    @inline
    def outcomeFor(key: K): Future[Option[V]] =
      writes.flatMap { written =>
        written.get(key)
          .map { putDone =>
            for {
              _ <- putDone
              pairs <- readOk
            } yield pairs(key)._1
          }
          .getOrElse {
            readFailed.flatMap { errors =>
              Future.exception[Option[V]](
                errors.getOrElse(key, new MissingValueException[K](key)))
            }
          }
      }

    CollectionOps.zipWith(keys)(outcomeFor)
  }