in src/core/server/src/workers/single.rs [99:167]
fn read(&mut self, token: Token) -> Result<()> {
let session = self
.sessions
.get_mut(token.0)
.ok_or_else(|| Error::new(ErrorKind::Other, "non-existant session"))?;
// fill the session
map_result(session.fill())?;
// process up to one pending request
match session.receive() {
Ok(request) => {
let response = self.storage.execute(&request);
PROCESS_REQ.increment();
if response.should_hangup() {
let _ = session.send(response);
return Err(Error::new(ErrorKind::Other, "should hangup"));
}
request.klog(&response);
match session.send(response) {
Ok(_) => {
// attempt to flush immediately if there's now data in
// the write buffer
if session.write_pending() > 0 {
match session.flush() {
Ok(_) => Ok(()),
Err(e) => map_err(e),
}?;
}
// reregister to get writable event
if session.write_pending() > 0 {
let interest = session.interest();
if self
.poll
.registry()
.reregister(session, token, interest)
.is_err()
{
return Err(Error::new(ErrorKind::Other, "failed to reregister"));
}
}
// if there's still data to read, put the token on the
// pending queue
if session.remaining() > 0 {
self.pending.push_back(token);
}
Ok(())
}
Err(e) => {
if e.kind() == ErrorKind::WouldBlock {
Ok(())
} else {
Err(e)
}
}
}
}
Err(e) => {
if e.kind() == ErrorKind::WouldBlock {
Ok(())
} else {
Err(e)
}
}
}
}