in storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySqlStore.scala [179:234]
/**
 * Batched version of `put`: applies every entry of `kvs` between `startTransaction`
 * and `commitOrRollback()`.
 *
 * The current state of all keys is first read with `multiGet`; the batch is then split
 * into at most three statements, issued strictly one after another:
 *  - an insert for keys that are absent in the store and carry a `Some` value,
 *  - an update for keys that are present in the store and carry a `Some` value,
 *  - a delete for keys that are present in the store and carry `None`.
 * Keys that are absent and map to `None` need no statement at all.
 *
 * Reduce your batch size if you are hitting the mysql packet limit:
 * http://dev.mysql.com/doc/refman/5.1/en/packet-too-large.html
 *
 * @param kvs keys mapped to the value to store, or to `None` to delete the key
 * @tparam K1 concrete key type of the batch
 * @return a map with exactly the keys of `kvs`; every key is mapped to the same future,
 *         the one of the whole batch, so all entries complete or fail together.
 *         An empty `kvs` yields an empty map without touching the database.
 */
override def multiPut[K1 <: MySqlValue](
    kvs: Map[K1, Option[MySqlValue]]): Map[K1, Future[Unit]] = {
  if (kvs.isEmpty) {
    // nothing to write: do not open (and commit) a transaction for an empty batch
    Map.empty[K1, Future[Unit]]
  } else {
    // NOTE(review): if multiGet or one of the statements below fails, the for-comprehension
    // short-circuits and commitOrRollback() is never reached -- confirm that the open
    // transaction is rolled back somewhere else in that case.
    val putResult = startTransaction.flatMap { _ =>
      FutureOps.mapCollect(multiGet(kvs.keySet)).flatMap { result =>
        val existingKeys = result.filter(_._2.isDefined).keySet
        val newKeys = result.filter(_._2.isEmpty).keySet

        // inserts: new keys only; None values are not included in the insert query
        val insertKvs = newKeys.iterator
          .map(k => k -> kvs.getOrElse(k, None))
          .collect { case (k, Some(v)) => (k, v) }
          .toMap
        // evaluated eagerly on purpose: the insert is the first statement of the sequence
        val insertF =
          if (insertKvs.isEmpty) Future.Unit
          else executeMultiInsert(insertKvs).unit

        // existing keys are either updated (Some) or deleted (None)
        val existingKvs = existingKeys.map(k => k -> kvs.getOrElse(k, None))

        // updates: None values are not included in the update query
        val updateKvs = existingKvs.iterator.collect { case (k, Some(v)) => (k, v) }.toMap
        lazy val updateF =
          if (updateKvs.isEmpty) Future.Unit
          else executeMultiUpdate(updateKvs).unit

        // deletes
        val deleteKeys = existingKvs.iterator.collect { case (k, None) => k }.toSeq
        lazy val deleteF =
          if (deleteKeys.isEmpty) {
            Future.Unit
          } else {
            // one "?" placeholder per key to delete
            val deleteSql = MULTI_DELETE_SQL_PREFIX +
              buildPlaceholders("?", deleteKeys.size, ",", "(", ")")
            // NOTE(review): getBytes uses the platform default charset -- confirm this
            // matches the encoding used when the keys were written.
            val deleteParams = deleteKeys
              .map(String2MySqlValueInjection.invert(_).map(s => Parameter.wrap(s.getBytes)))
            toTwitterFuture(deleteParams).flatMap(ps => client.prepare(deleteSql)(ps: _*).unit)
          }

        // sequence the three queries. updateF and deleteF are lazy, so each statement is
        // only issued once the previous one has completed successfully
        for {
          _ <- insertF
          _ <- updateF
          _ <- deleteF
          _ <- commitOrRollback()
        } yield ()
      }
    }
    // strict map (mapValues would only return a lazy view over kvs)
    kvs.map { case (k, _) => k -> putResult }
  }
}