in src/worker.rs [263:317]
fn do_read(&mut self, token: Token) -> Result<(), Error> {
let session = get_session_mut!(self, token)?;
match session.fill_buf().map(|b| b.len()) {
Ok(0) => {
// server hangup
Err(Error::new(ErrorKind::Other, "server hangup"))
}
Ok(_) => {
// request parsing
while session.outstanding() > 0 {
let response = self.codec.decode(session);
match response {
Ok(()) => {
session.set_outstanding(session.outstanding() - 1);
RESPONSE.increment();
if let Some(ref heatmap) = self.request_heatmap {
let now = Instant::now();
let elapsed = now - session.timestamp();
let us = elapsed.as_nanos() as u64 / 1_000;
heatmap.increment(now, us, 1);
if let Some(ref waterfall) = self.request_waterfall {
waterfall.increment(now, elapsed.as_nanos() as u64, 1);
}
}
}
Err(e) => match e {
ParseError::Incomplete => {
return Ok(());
}
_ => {
return Err(Error::from(std::io::ErrorKind::InvalidData));
}
},
}
}
self.ready_queue.push_back(token);
Ok(())
}
Err(e) => {
match e.kind() {
ErrorKind::WouldBlock => {
// spurious read
let _ = self.reregister(token);
Ok(())
}
ErrorKind::Interrupted => self.do_read(token),
_ => {
trace!("error reading for session: {:?} {:?}", session, e);
Err(e)
}
}
}
}
}