in storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala [56:92]
override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = {
val f : Future[Map[K1, Either[Unit, Exception]]] = {
// write keys to backing store first
val storeResults : Map[K1, Future[Either[Unit, Exception]]] =
backingStore.multiPut(kvs).map { case (k, fut) =>
(k, fut.map { u: Unit => Left(u) }.rescue { case x: Exception => Future.value(Right(x)) })
}
// perform cache operations based on how writes to backing store go
FutureOps.mapCollect(storeResults).flatMap { storeResult : Map[K1, Either[Unit, Exception]] =>
val failedKeys = storeResult.filter { _._2.isRight }.keySet
val succeededKeys = storeResult.filter { _._2.isLeft }.keySet
// write updated keys to cache, best effort
val keysToUpdate = kvs.filterKeys(succeededKeys.contains) ++ {
// optionally invalidate cached copy when write to backing store fails
if (invalidate) { failedKeys.map { k => (k, None) }.toMap }
else { Map.empty }
}
FutureOps.mapCollect(cache.multiPut(keysToUpdate))(FutureCollector.bestEffort)
.map { f => storeResult }
// return original writes made to backing store
// once cache operations are complete
}
}
// throw original exception for any writes that failed
FutureOps.liftValues(kvs.keySet, f, { (k: K1) => Future.None })
.map { case kv: (K1, Future[Either[Unit, Exception]]) =>
val transform = kv._2.map {
case Left(optv) => optv
case Right(x) => throw x
}
(kv._1, transform)
}
}