in storehaus-core/src/main/scala/com/twitter/storehaus/ReadThroughStore.scala [52:79]
/**
 * Looks up `ks` in `cache` first and falls back to `backingStore` for every key that
 * was not a cache hit.
 *
 * A key is sent to the backing store when the cache returned `None` for it or when the
 * cache read failed with an `Exception`. Whatever the backing store returns for those
 * keys is written back to the cache (best effort) and merged with the cache hits.
 *
 * NOTE(review): all per-key results are derived from the single future `f`, so a failed
 * backing-store read presumably fails every requested key, cache hits included --
 * confirm against `FutureOps.mapCollect` / `FutureOps.liftValues`.
 *
 * @param ks the keys to read
 * @return one future per requested key; keys absent from the merged result fall back
 *         to `Future.None` via the default passed to `FutureOps.liftValues`
 */
override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = {
  // Attempt to read from the cache first. A read that fails with an Exception is
  // captured as a Right instead of failing the future, so the key can still be
  // served from the backing store below. Non-Exception throwables are not rescued.
  val cacheResults: Map[K1, Future[Either[Option[V], Exception]]] =
    cache.multiGet(ks).map { case (k, fut) =>
      (k, fut.map { optv => Left(optv) } rescue { case x: Exception => Future.value(Right(x)) })
    }

  val f: Future[Map[K1, Option[V]]] =
    FutureOps.mapCollect(cacheResults).flatMap { cacheResult =>
      // A hit is a successful cache read that actually produced a value.
      val hits: Map[K1, Option[V]] =
        cacheResult.collect { case (k, Left(optv)) if optv.isDefined => (k, optv) }
      // Every other key -- a cache miss (Left(None)) or a failed read (Right) --
      // still has to be fetched from the backing store.
      val remaining: Set[K1] = cacheResult.keySet -- hits.keySet

      if (remaining.isEmpty) {
        Future.value(hits) // no cache misses
      } else {
        FutureOps.mapCollect(backingStore.multiGet(remaining)).flatMap { storeResult =>
          // Write fetched keys to the cache, best effort; the write's own result is
          // discarded and only used to sequence the response after the write-back.
          FutureOps.mapCollect(cache.multiPut(storeResult))(FutureCollector.bestEffort)
            .map { _ => hits ++ storeResult }
        }
      }
    }
  FutureOps.liftValues(ks, f, { (k: K1) => Future.None })
}