in storehaus-core/src/main/scala/com/twitter/storehaus/BatchedStore.scala [44:64]
/**
 * Writes a single key/value pair by delegating straight to the wrapped store.
 *
 * A lone put is forwarded as-is: this method does no batching and does not
 * take a permit from `writeConnectionLock`.
 *
 * @param kv the key paired with its optional value, passed through unchanged
 * @return the future produced by the wrapped store's `put`
 */
override def put(kv: (K, Option[V])): Future[Unit] = {
  store.put(kv)
}
/**
 * Writes `kvs` to the wrapped store in batches of at most `maxMultiPutSize` entries.
 *
 * Every batch first acquires a permit from `writeConnectionLock`, then issues a
 * single `store.multiPut` for that batch, and hands the permit back once the
 * batch's collected result has completed (successfully or not).
 *
 * @param kvs the key/value pairs to write
 * @tparam K1 the concrete key type of this call, a subtype of `K`
 * @return one future per key in `kvs`; each key's future is derived from the
 *         collected result of the batch that key was written in. An empty
 *         `kvs` yields an empty map.
 */
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = {
  kvs
    .grouped(maxMultiPutSize)
    .map { keyBatch: Map[K1, Option[V]] =>
      // Collapse the batch's per-key futures into a single future so the permit
      // can be released exactly once, when the whole batch is done.
      val batchResult: Future[Map[K1, Unit]] = writeConnectionLock
        .acquire()
        .flatMap { permit =>
          // If the underlying multiPut (or the collect) throws synchronously there
          // is no future to hang `ensure` on, and the permit would never be
          // returned. Turn such a throw into a failed future first so the release
          // below always runs. Fatal errors are deliberately left to propagate.
          val written: Future[Map[K1, Unit]] =
            try FutureOps.mapCollect(store.multiPut(keyBatch))
            catch { case scala.util.control.NonFatal(e) => Future.exception(e) }
          written.ensure { permit.release() }
        }
      // Undo the mapCollect: expose one future per key, each backed by the batch result.
      FutureOps.liftValues(keyBatch.keySet, batchResult)
    }
    // The iterator of per-batch maps is merged here; no batches means no entries.
    .reduceOption(_ ++ _)
    .getOrElse(Map.empty)
}