in storehaus-core/src/main/scala/com/twitter/storehaus/PivotOps.scala [80:111]
def multiPut[K, K1 <: K, OuterK, InnerK, V](
store: Store[OuterK, Map[InnerK, V]], kvs: Map[K1, Option[V]])
(split: K => (OuterK, InnerK))
(implicit collect: FutureCollector): Map[K1, Future[Unit]] = {
val pivoted = CollectionOps.pivotMap[K1, OuterK, InnerK, Option[V]](kvs)(split)
// Input data merged with all relevant data from the underlying
// store.
val mergedResult: Map[OuterK, Future[Option[Map[InnerK, V]]]] =
plusM(
multiGetFiltered(store, pivoted.keySet) { case (outerK, innerK) =>
!pivoted(outerK).contains(innerK)
},
collectPivoted(pivoted)
).mapValues { _.map { _.map { _.toMap } } }
// Result of a multiPut of all affected pairs in the underlying
// store.
val submitted: Future[Map[OuterK, Future[Unit]]] =
FutureOps.mapCollect(mergedResult)(collect).map(store.multiPut)
// The final flatMap returns a map of K to the future responsible
// for writing K's value into the underlying store. Due to
// packing, many Ks will reference the same Future[Unit].
kvs.flatMap {
case (k, _) =>
val (outerK, _) = split(k)
(1 to pivoted(outerK).size).map { _ =>
k -> submitted.flatMap { _.apply(outerK) }
}
}
}