in finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/text/client/ClientDecoder.scala [80:150]
/**
 * Advances the decoder's state machine by one framed `Buf`.
 *
 * What the frame means depends on the current `state`: it is either a line that
 * is split into tokens via `ParserUtils.splitOnWhitespace`, or the data payload
 * whose size was announced by the preceding line. Every response that becomes
 * complete is appended to `results`; when more frames are required, only `state`
 * is updated and nothing is appended.
 *
 * @param buffer  the frame to decode
 * @param results accumulator receiving each fully decoded response
 * @throws IllegalArgumentException if a data frame's length differs from the size
 *                                  announced for it (`state` is left untouched)
 * @throws ServerError              if an unexpected line arrives while stats lines or
 *                                  values are being collected; `state` becomes
 *                                  `Failed` before the error is thrown
 * @throws IllegalStateException    if called while `state` is already `Failed`
 */
final def decodeData(buffer: Buf, results: mutable.Buffer[R]): Unit = {
  // Remember the failure in `state` so later calls are rejected, then propagate it.
  def fail(error: ServerError): Nothing = {
    state = Failed(error)
    throw error
  }

  state match {
    case AwaitingData(collected, header, bytesNeeded) =>
      // The framer is expected to hand over a Buf of exactly the announced size.
      if (buffer.length == bytesNeeded) {
        state = AwaitingResponseOrEnd(collected :+ parseValue(header, buffer))
      } else {
        throw new IllegalArgumentException(
          s"Expected to receive a buffer of $bytesNeeded bytes but " +
            s"only received ${buffer.length} bytes"
        )
      }

    case AwaitingResponse =>
      val line = ParserUtils.splitOnWhitespace(buffer)
      val payloadSize = needsData(line)
      if (payloadSize != -1) {
        // A data frame follows this line.
        state = AwaitingData(Nil, line, payloadSize)
      } else if (isEnd(line)) {
        results += parseResponseValues(Nil)
      } else if (isStats(line)) {
        state = AwaitingStatsOrEnd(Vector(line))
      } else {
        results += parseResponse(line)
      }

    case AwaitingStatsOrEnd(statLines) =>
      val line = ParserUtils.splitOnWhitespace(buffer)
      if (isEnd(line)) {
        // Reset the state before emitting the accumulated stats.
        state = AwaitingResponse
        results += parseStatLines(statLines)
      } else if (isStats(line)) {
        state = AwaitingStatsOrEnd(statLines :+ line)
      } else {
        fail(new ServerError("Invalid reply from STATS command"))
      }

    case AwaitingResponseOrEnd(collected) =>
      val line = ParserUtils.splitOnWhitespace(buffer)
      val payloadSize = needsData(line)
      if (payloadSize != -1) {
        state = AwaitingData(collected, line, payloadSize)
      } else if (isEnd(line)) {
        // Reset the state before emitting the collected values.
        state = AwaitingResponse
        results += parseResponseValues(collected)
      } else {
        // Neither a value line nor END: render the tokens (each followed by a
        // space) for the error message.
        val bufString = line
          .map(token => Buf.Utf8.unapply(token).getOrElse("<non-string token>") + " ")
          .mkString
        fail(
          new ServerError(
            s"Server returned invalid response when values or END was expected: $bufString"
          )
        )
      }

    case Failed(cause) =>
      // A previous decode failed; log and refuse to continue, chaining the original cause.
      val message = "Failed Memcached decoder called after previous decoding failure."
      val rejection = new IllegalStateException(message, cause)
      log.error(message, rejection)
      throw rejection
  }
}