protected def doMerge()

in storehaus-memcache/src/main/scala/com/twitter/storehaus/memcache/MergeableMemcacheStore.scala [78:116]


  /**
   * Performs one optimistic merge attempt for `kv`, recursing on contention.
   *
   * The current entry is fetched together with its cas token via `gets`:
   *  - if an entry exists, it is decoded, combined with `kv._2` through the
   *    semigroup and written back with `checkAndSet`;
   *  - if no entry exists, `kv._2` is written with `add`.
   * When the write does not go through, the whole attempt is repeated with
   * `currentRetry + 1`.
   *
   * @param kv           key and the value to merge into the stored entry
   * @param currentRetry number of the current attempt; once it exceeds
   *                     `maxRetries` no further attempt is made
   * @return a future holding the value that was stored before the merge
   *         (`None` if the entry was freshly added); the future fails with
   *         `MergeFailedException` when retries are exhausted, or with the
   *         decoding error when the stored entry cannot be inverted
   */
  protected def doMerge(kv: (K, V), currentRetry: Int) : Future[Option[V]] = {
    val key = kfn(kv._1)

    if (currentRetry > maxRetries) {
      // retry budget exhausted, give up
      Future.exception(new MergeFailedException(key))
    } else {
      // starts the next attempt; only evaluated when a write was rejected
      def retry(): Future[Option[V]] = doMerge(kv, currentRetry + 1)

      // 'gets' hands back the stored entry along with its casunique token
      underlying.client.gets(key).flatMap {
        case None =>
          // nothing stored yet: try to 'add' the incoming value as-is
          val encoded = ChannelBufferBuf.Owned(inj.apply(kv._2))
          underlying.client
            .add(key, underlying.flag, underlying.ttl.fromNow, encoded)
            .flatMap { added =>
              // a rejected add retries; the next attempt should take the cas path
              if (added.booleanValue) Future.None else retry()
            }

        case Some((stored, casToken)) =>
          inj.invert(BufChannelBuffer(stored)) match {
            case Failure(ex) => Future.exception(ex)
            case Success(previous) =>
              // combine with the incoming value and attempt the cas
              val merged = semigroup.plus(previous, kv._2)
              val encoded = ChannelBufferBuf.Owned(inj.apply(merged))
              underlying.client
                .checkAndSet(key, underlying.flag, underlying.ttl.fromNow, encoded, casToken)
                .flatMap {
                  case CasResult.Stored => Future.value(Some(previous))
                  case _ => retry() // any other outcome: try again
                }
          }
      }
    }
  }