override def put()

in storehaus-core/src/main/scala/com/twitter/storehaus/TunableReplicatedStore.scala [170:198]


  /**
   * Writes `kv` to every store in `stores` and reports the outcome according to
   * `writeConsistency`.
   *
   * With no stores the call completes immediately with unit. Otherwise the returned
   * promise is:
   *  - satisfied with unit once the number of successful replica writes reaches
   *    `writeConsistency.expectedSuccesses(stores)`;
   *  - failed with a `WriteFailedException` for the key once more than
   *    `stores.size - expected` replica writes have failed.
   *
   * When `writeRollback` is set and the consistency level is `Quorum` or `All`, a failed
   * write additionally issues a best-effort delete (`put` of `None`) of the key on every
   * store; the results of those deletes are not inspected.
   *
   * @param kv the key and the optional value to write to each replica
   * @return a promise built with `Promise.interrupts` over the replica write futures
   */
  override def put(kv: (K, Option[V])): Future[Unit] =
    if (stores.isEmpty) Future.value(())
    else {
      val required = writeConsistency.expectedSuccesses(stores)
      val succeeded = new AtomicInteger(0)
      val failed = new AtomicInteger(0)
      val writes = stores.map(_.put(kv))
      val outcome = Promise.interrupts[Unit](writes: _*)

      // Rollback is only attempted when enabled and the level is Quorum or All.
      def rollbackRequested: Boolean =
        writeRollback && (
          writeConsistency == ConsistencyLevel.Quorum ||
            writeConsistency == ConsistencyLevel.All)

      writes.foreach { write =>
        write
          .onSuccess { _ =>
            // Enough replicas acknowledged the write: report success (first outcome wins).
            if (succeeded.incrementAndGet() >= required) outcome.updateIfEmpty(Return(()))
          }
          .onFailure { _ =>
            // Too many replicas failed for the required number of successes to be reached.
            if (failed.incrementAndGet() > stores.size - required) {
              // Best-effort rollback: delete the key on all replicas, ignoring the results.
              if (rollbackRequested) stores.foreach(_.put((kv._1, None)))
              outcome.updateIfEmpty(Throw(new WriteFailedException(kv._1)))
            }
          }
          .ensure {
            // Once every replica write has been counted, complete the promise if it is
            // still empty.
            if (succeeded.get() + failed.get() >= stores.size) outcome.updateIfEmpty(Return(()))
          }
      }

      outcome
    }