in storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MergeableMySqlStore.scala [43:89]
override def multiMerge[K1 <: MySqlValue](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] = {
val mergeResult : Future[Map[K1, Option[V]]] = underlying.startTransaction.flatMap { u: Unit =>
FutureOps.mapCollect(multiGet(kvs.keySet)).flatMap { result: Map[K1, Option[V]] =>
val existingKeys = result.filter(_._2.isDefined).keySet
val newKeys = result.filter(_._2.isEmpty).keySet
// handle inserts for new keys
val insertF =
if (newKeys.isEmpty) {
Future.Unit
} else {
val insertKvs = newKeys.map(k => k -> kvs(k))
if (insertKvs.isEmpty) Future.Unit
else underlying.executeMultiInsert(insertKvs.toMap.mapValues(inj)).unit
}
// handle update/merge for existing keys
// lazy val realized inside of insertF.flatMap
lazy val updateF =
if (existingKeys.isEmpty) {
Future.Unit
} else {
val existingKvs = existingKeys.map(k => k -> kvs(k))
underlying.executeMultiUpdate(existingKvs.map { case (k, v) =>
k -> inj(semigroup.plus(result(k).get, v))
}.toMap).unit
}
// insert, update and commit or rollback accordingly
insertF.flatMap { f =>
updateF.flatMap { f =>
underlying.commitTransaction.map { f =>
// return values before the merge
result
}
}
.onFailure { case e: Exception =>
underlying.rollbackTransaction.map { f =>
// map values to exception
result.mapValues { v => e }
}
}
}
}
}
FutureOps.liftValues(kvs.keySet, mergeResult)
}