override def multiPut[K1 <: K]()

in storehaus-core/src/main/scala/com/twitter/storehaus/WriteThroughStore.scala [56:92]


  /**
   * Writes every pair to the backing store first and then, best effort,
   * reflects the outcome in the cache.
   *
   * Pairs whose backing-store write succeeded are written to the cache.
   * When `invalidate` is set, keys whose backing-store write failed with an
   * `Exception` are written to the cache as `None` to invalidate the cached
   * copy. Cache writes are collected with `FutureCollector.bestEffort` and
   * their individual results are discarded.
   *
   * @param kvs pairs to write; values are handed to the stores unchanged
   * @return one future per key in `kvs`, resolved only after the cache
   *         writes have been collected. It succeeds when the backing-store
   *         write for that key succeeded and fails with the backing store's
   *         original exception otherwise. A key for which the backing store
   *         reported no result fails with an `IllegalStateException`.
   */
  override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = {
    // Per-key outcome of the backing-store write: Left(()) on success,
    // Right(exception) on failure. Capturing failures as values lets the
    // outcomes of all keys be collected together before touching the cache.
    val storeWrites: Map[K1, Future[Either[Unit, Exception]]] =
      backingStore.multiPut(kvs).map { case (k, written) =>
        val outcome: Future[Either[Unit, Exception]] =
          written
            .map { u: Unit => Left(u) }
            .rescue { case e: Exception => Future.value(Right(e)) }
        (k, outcome)
      }

    // Perform cache operations based on how the writes to the backing store went.
    val settled: Future[Map[K1, Either[Unit, Exception]]] =
      FutureOps.mapCollect(storeWrites).flatMap { storeResult: Map[K1, Either[Unit, Exception]] =>
        val (succeeded, failed) = storeResult.partition { _._2.isLeft }
        val succeededKeys = succeeded.keySet

        // Optionally invalidate the cached copy when the backing-store write failed.
        val invalidations: Map[K1, Option[V]] =
          if (invalidate) failed.map { case (k, _) => (k, Option.empty[V]) }
          else Map.empty

        // Successfully stored pairs are written through to the cache, best effort.
        val cacheWrites: Map[K1, Option[V]] =
          kvs.filterKeys(succeededKeys.contains) ++ invalidations

        // Yield the backing-store outcomes once the cache operations are complete.
        FutureOps.mapCollect(cache.multiPut(cacheWrites))(FutureCollector.bestEffort)
          .map { _ => storeResult }
      }

    // Default for a key the backing store returned no result for. It must be
    // an Either so that it flows through the Left/Right handling below.
    val missingResult: K1 => Future[Either[Unit, Exception]] = { k =>
      Future.value(Right(new IllegalStateException("No backing store result for key: " + k)))
    }

    // Rethrow the original exception for any write that failed.
    FutureOps.liftValues(kvs.keySet, settled, missingResult).map { case (k, outcome) =>
      val result: Future[Unit] = outcome.map {
        case Left(u) => u
        case Right(e) => throw e
      }
      (k, result)
    }
  }