in src/codec/redis.rs [173:267]
fn decode(&self, buffer: &mut Session) -> Result<(), ParseError> {
// no-copy borrow as a slice
let buf: &[u8] = (*buffer).buffer();
if buf.len() < 1 {
return Err(ParseError::Incomplete);
}
let first_char = &buf[0..1];
match str::from_utf8(first_char) {
Ok("+") => {
// simple string response
let mut lines = buf.windows(2);
if let Some(line_end) = lines.position(|w| w == b"\r\n") {
let msg = &buf[1..line_end];
match str::from_utf8(msg) {
Ok("OK") | Ok("PONG") => {
let _ = buffer.consume(line_end + 2);
Ok(())
}
_ => Err(ParseError::Unknown),
}
} else {
Err(ParseError::Incomplete)
}
}
Ok("-") => {
// error response
Err(ParseError::Error)
}
Ok(":") => {
// numeric response
let mut lines = buf.windows(2);
if let Some(line_end) = lines.position(|w| w == b"\r\n") {
let msg = &buf[1..line_end];
match str::from_utf8(msg) {
Ok(msg) => match msg.parse::<i64>() {
Ok(_) => {
let _ = buffer.consume(line_end + 2);
Ok(())
}
Err(_) => Err(ParseError::Unknown),
},
Err(_) => Err(ParseError::Unknown),
}
} else {
Err(ParseError::Incomplete)
}
}
Ok("$") => {
// bulk string
let mut lines = buf.windows(2);
if let Some(line_end) = lines.position(|w| w == b"\r\n") {
let msg = &buf[1..line_end];
match str::from_utf8(msg) {
Ok("-1") => {
let _ = buffer.consume(line_end + 2);
Ok(())
}
Ok(n) => {
let len = n.parse::<usize>().map_err(|_| ParseError::Unknown)?;
let response_end = len + line_end + 4;
if response_end <= buf.len() {
metrics::RESPONSE_HIT.increment();
let _ = buffer.consume(response_end);
Ok(())
} else {
Err(ParseError::Incomplete)
}
}
Err(_) => Err(ParseError::Unknown),
}
} else {
Err(ParseError::Incomplete)
}
}
Ok("*") => {
// arrays
let msg = &buf[1..buf.len() - 2];
match str::from_utf8(msg) {
Ok("-1") => {
let response_end = buf.len();
let _ = buffer.consume(response_end);
Ok(())
}
Ok(_) => {
// TODO: implement array parsing
Err(ParseError::Unknown)
}
Err(_) => Err(ParseError::Unknown),
}
}
_ => Err(ParseError::Unknown),
}
}