in storehaus-core/src/main/scala/com/twitter/storehaus/TunableReplicatedStore.scala [170:198]
override def put(kv: (K, Option[V])): Future[Unit] = {
if (stores.isEmpty) {
Future.value(())
} else {
val expected = writeConsistency.expectedSuccesses(stores)
val success = new AtomicInteger(0)
val fail = new AtomicInteger(0)
val futures = stores.map { s => s.put(kv) }
val promise = Promise.interrupts[Unit](futures: _*)
for (f <- futures) {
f.onSuccess { result =>
if (success.incrementAndGet >= expected) promise.updateIfEmpty(Return(()))
} onFailure { e =>
if (fail.incrementAndGet > stores.size - expected) {
// optionally delete key in all replicas as part of rollback (best effort)
if (writeRollback && (
writeConsistency == ConsistencyLevel.Quorum ||
writeConsistency == ConsistencyLevel.All)) {
stores.foreach { s => s.put((kv._1, None)) }
}
promise.updateIfEmpty(Throw(new WriteFailedException(kv._1)))
}
} ensure {
if (success.get + fail.get >= stores.size) promise.updateIfEmpty(Return(()))
}
}
promise
}
}