override def multiMerge[K1 <: MySqlValue]()

in storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MergeableMySqlStore.scala [43:89]


  /**
   * Merges a batch of key/value pairs into the underlying store inside one transaction.
   *
   * The current values for all keys are fetched first. Keys with no current value are
   * inserted with the supplied value; keys with a current value are updated to
   * `semigroup.plus(current, supplied)`. The transaction is committed once both writes
   * have completed.
   *
   * If any step after the transaction has been started fails with an `Exception`
   * (the lookup, the insert, the update or the commit), the transaction is rolled back
   * and every returned future fails with that original exception. The failure is only
   * delivered once the rollback attempt has finished.
   *
   * @param kvs the values to merge, by key
   * @return for every key in `kvs`, a future of the value that was stored *before*
   *         the merge (`None` when the key was not present)
   */
  override def multiMerge[K1 <: MySqlValue](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = {
    val mergeResult: Future[Map[K1, Option[V]]] = underlying.startTransaction.flatMap { _ =>
      val merged: Future[Map[K1, Option[V]]] =
        FutureOps.mapCollect(multiGet(kvs.keySet)).flatMap { result: Map[K1, Option[V]] =>
          // Split the lookup result by pattern matching instead of filter + Option.get.
          val existing: Map[K1, V] = result.collect { case (k, Some(v)) => k -> v }
          val inserts: Map[K1, V] = result.collect { case (k, None) => k -> kvs(k) }

          // handle inserts for new keys
          val insertF: Future[Unit] =
            if (inserts.isEmpty) Future.Unit
            else underlying.executeMultiInsert(inserts.mapValues(inj)).unit

          // handle update/merge for existing keys
          // A def rather than a val: it is only evaluated inside the chain below,
          // so the update is not issued until the insert has completed.
          def updateF: Future[Unit] =
            if (existing.isEmpty) {
              Future.Unit
            } else {
              underlying.executeMultiUpdate(existing.map { case (k, current) =>
                k -> inj(semigroup.plus(current, kvs(k)))
              }).unit
            }

          // insert, then update, then commit; yield the values from before the merge
          for {
            _ <- insertF
            _ <- updateF
            _ <- underlying.commitTransaction
          } yield result
        }

      // Roll back on any failure after startTransaction and re-raise the original
      // exception once the rollback attempt has finished. `transform` is used so the
      // original exception is reported even if the rollback itself fails.
      merged.rescue { case e: Exception =>
        underlying.rollbackTransaction.transform { _ =>
          Future.exception[Map[K1, Option[V]]](e)
        }
      }
    }
    FutureOps.liftValues(kvs.keySet, mergeResult)
  }