in storehaus-algebra/src/main/scala/com/twitter/storehaus/algebra/MergeableStore.scala [44:104]
/**
 * Overload of `multiMergeFromMultiSet` that additionally accepts a `missingfn`
 * and an implicit `FutureCollector`.
 *
 * Neither `missingfn` nor `collect` is read by this body: the call is forwarded
 * unchanged to the two-argument overload, using the same `store`, `kvs` and
 * `Semigroup`.
 *
 * @param store     store forwarded to the two-argument overload
 * @param kvs       key/value pairs forwarded to the two-argument overload
 * @param missingfn accepted but not used by this implementation
 * @param collect   accepted but not used by this implementation
 * @param sg        semigroup forwarded to the two-argument overload
 * @return the result of the two-argument overload for `store` and `kvs`
 */
def multiMergeFromMultiSet[K, V](store: Store[K, V], kvs: Map[K, V],
  missingfn: (K) => Future[Option[V]] = FutureOps.missingValueFor _)
  (implicit collect: FutureCollector, sg: Semigroup[V]): Map[K, Future[Option[V]]] = {
  // Only (store, kvs) are supplied, so overload resolution picks the alternative
  // that needs no default argument, i.e. the two-argument overload rather than
  // this method. The semigroup is passed explicitly to make that target obvious.
  multiMergeFromMultiSet(store, kvs)(sg)
}
/**
 * Implements multiMerge on top of an underlying store's `multiGet` and `multiPut`.
 *
 * Steps, as performed below:
 *  1. `store.multiGet` is called with the keys of `kvs`.
 *  2. Each fetched value is paired with the result of `addOpt(fetched, kvs(key))`
 *     (presumably the semigroup sum of the stored value and the increment;
 *     `addOpt` is defined elsewhere).
 *  3. `collectWithFailures` (defined elsewhere) gathers those futures; the first
 *     component of its result is treated as the successful gets and the second
 *     as the failed gets.
 *  4. The second element of every successful pair is written with `store.multiPut`.
 *  5. For every key of `kvs` a result future is built:
 *     - key present in the `multiPut` result: once that put future succeeds, the
 *       value read before the merge is returned; a failed put stays a failure.
 *     - key absent from the `multiPut` result: the future fails with the
 *       exception recorded for that key's get, or with a
 *       `MissingValueException` when none was recorded.
 *
 * @param store store to read the current values from and write the merged values to
 * @param kvs   increments to merge, by key
 * @param sg    semigroup in implicit scope for the `addOpt` call
 * @return the result of `CollectionOps.zipWith` over the keys of `kvs`
 *         (presumably one future per key; helper not visible here)
 */
def multiMergeFromMultiSet[K, V](store: Store[K, V], kvs: Map[K, V])
  (implicit sg: Semigroup[V]): Map[K, Future[Option[V]]] = {
  val keys = kvs.keySet

  // Stay on an Iterator so that no intermediate Map is built.
  val beforeAndAfter: Iterator[(K, Future[(Option[V], Option[V])])] =
    store.multiGet(keys).iterator.map { case (key, fetched) =>
      (key, fetched.map { current: Option[V] => (current, addOpt(current, kvs(key))) })
    }

  val collected = collectWithFailures(beforeAndAfter, kvs.size)
  val fetchedOk = collected.map { case (ok, _) => ok }
  val fetchFailed = collected.map { case (_, failed) => failed }

  // Only keys whose get succeeded are written back.
  val putResults: Future[Map[K, Future[Unit]]] =
    fetchedOk.map { ok: Map[K, (Option[V], Option[V])] =>
      store.multiPut(ok.mapValues { case (_, merged) => merged })
    }

  // After the put for `k` has succeeded, yield the value read before the merge.
  @inline
  def previousValue(k: K, written: Future[Unit]): Future[Option[V]] =
    for {
      _ <- written
      ok <- fetchedOk
    } yield ok(k)._1

  // Always a failed future: the recorded get failure, or MissingValueException.
  @inline
  def fetchFailure(k: K): Future[Option[V]] =
    fetchFailed.flatMap { failed =>
      Future.exception(failed.getOrElse(k, new MissingValueException[K](k)))
    }

  /**
   * Resolving each key lazily against the two futures avoids building another
   * intermediate map. A key found in the put results becomes its old value (or
   * keeps the put failure); any other key becomes its get failure.
   */
  @inline
  def resultFor(k: K): Future[Option[V]] =
    putResults.flatMap { written =>
      written.get(k).map(previousValue(k, _)).getOrElse(fetchFailure(k))
    }

  CollectionOps.zipWith(keys)(resultFor)
}