override def multiPut[K1 <: MySqlValue]()

in storehaus-mysql/src/main/scala/com/twitter/storehaus/mysql/MySqlStore.scala [179:234]


  /**
   * Batched version of put: applies every entry of `kvs` inside one transaction.
   *
   * The batch is first read back with `multiGet` to learn which keys are already stored,
   * and is then split into at most three statements that run one after another:
   *  - insert: keys that are not stored and whose requested value is `Some`
   *  - update: keys that are stored and whose requested value is `Some`
   *  - delete: keys that are stored and whose requested value is `None`
   *
   * Keys that are not stored and map to `None` need no statement at all.
   *
   * Reduce your batch size if you are hitting the mysql packet limit:
   * http://dev.mysql.com/doc/refman/5.1/en/packet-too-large.html
   *
   * @param kvs key to optional value; `Some` writes the value, `None` removes the key
   * @return a map with the same keys as `kvs`; every key shares one single future, which
   *         completes after the statements above and `commitOrRollback()` have completed
   */
  override def multiPut[K1 <: MySqlValue](
      kvs: Map[K1, Option[MySqlValue]]): Map[K1, Future[Unit]] = {
    val putResult = startTransaction.flatMap { _ =>
      FutureOps.mapCollect(multiGet(kvs.keySet)).flatMap { stored =>
        // split the looked-up keys in a single pass: already stored vs. not stored yet
        val (existing, missing) = stored.partition { case (_, current) => current.isDefined }

        // pair each key with the value requested for it (absent keys count as None)
        def requestedFor(keys: Iterable[K1]): List[(K1, Option[MySqlValue])] =
          keys.iterator.map(k => k -> kvs.getOrElse(k, None)).toList

        // None values are never inserted; pattern matching avoids Option.get entirely
        val insertKvs = requestedFor(missing.keys).collect { case (k, Some(v)) => k -> v }.toMap

        val requestedExisting = requestedFor(existing.keys)
        // do not include None values in update query
        val updateKvs = requestedExisting.collect { case (k, Some(v)) => k -> v }.toMap
        // a stored key whose requested value is None gets deleted
        val deleteKeys: Seq[K1] = requestedExisting.collect { case (k, None) => k }

        // Each statement is a def so that nothing is sent to the database until the
        // for-comprehension below reaches it; each one is referenced exactly once.
        def insertF: Future[Unit] =
          if (insertKvs.isEmpty) Future.Unit
          else executeMultiInsert(insertKvs).unit

        def updateF: Future[Unit] =
          if (updateKvs.isEmpty) Future.Unit
          else executeMultiUpdate(updateKvs).unit

        def deleteF: Future[Unit] =
          if (deleteKeys.isEmpty) {
            Future.Unit
          } else {
            val deleteSql = MULTI_DELETE_SQL_PREFIX +
              buildPlaceholders("?", deleteKeys.size, ",", "(", ")")
            // NOTE(review): getBytes uses the platform default charset -- confirm this
            // matches how keys are encoded when they are written.
            val deleteParams = deleteKeys
              .map(String2MySqlValueInjection.invert(_).map(s => Parameter.wrap(s.getBytes)))
            toTwitterFuture(deleteParams).flatMap(ps => client.prepare(deleteSql)(ps: _*).unit)
          }

        // Sequence the three queries, then finish the transaction.
        // NOTE(review): if insert, update or delete fails, the chain short-circuits and
        // commitOrRollback() is not invoked from here -- confirm the rollback of a failed
        // batch is handled elsewhere.
        for {
          _ <- insertF
          _ <- updateF
          _ <- deleteF
          _ <- commitOrRollback()
        } yield ()
      }
    }
    // strict map (not a mapValues view): every key observes the same batch future
    kvs.map { case (k, _) => k -> putResult }
  }