in storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MergeableMemcacheStore.scala [78:116]
/**
 * Runs one attempt of an optimistic read-modify-write merge of `kv`, calling itself again
 * with an incremented attempt counter whenever the conditional write does not go through.
 *
 * @param kv           the key and the value to combine with whatever is currently stored
 * @param currentRetry attempt counter; once it exceeds `maxRetries` the merge is abandoned
 * @return the value that was stored before this merge, or `None` when the key was absent and
 *         the new value was added; the future fails with `MergeFailedException` when the
 *         attempts run out, or with the inversion error when the stored bytes cannot be decoded
 */
protected def doMerge(kv: (K, V), currentRetry: Int) : Future[Option[V]] = {
  val key = kfn(kv._1)
  val delta = kv._2

  // Declared as a def so the recursive call only happens in the branches that ask for it.
  def nextAttempt: Future[Option[V]] = doMerge(kv, currentRetry + 1)

  if (currentRetry > maxRetries) {
    // no more retries
    Future.exception(new MergeFailedException(key))
  } else {
    // use the 'gets' api to obtain the stored bytes together with the casunique token
    underlying.client.gets(key).flatMap {
      case None =>
        // no pre-existing value, try to 'add' it
        val payload = ChannelBufferBuf.Owned(inj.apply(delta))
        underlying.client
          .add(key, underlying.flag, underlying.ttl.fromNow, payload)
          .flatMap { added =>
            // on an unsuccessful add, retry; the next attempt is expected to go through cas
            if (added.booleanValue) Future.None else nextAttempt
          }

      case Some((stored, casToken)) =>
        inj.invert(BufChannelBuffer(stored)) match {
          case Failure(err) =>
            // stored bytes could not be decoded; surface the error instead of retrying
            Future.exception(err)

          case Success(previous) =>
            // attempt cas with the combined value
            val merged = semigroup.plus(previous, delta)
            val payload = ChannelBufferBuf.Owned(inj.apply(merged))
            underlying.client
              .checkAndSet(key, underlying.flag, underlying.ttl.fromNow, payload, casToken)
              .flatMap {
                case CasResult.Stored => Future.value(Some(previous))
                case _ => nextAttempt // any other cas outcome: retry
              }
        }
    }
  }
}