in src/core/proxy/src/listener.rs [211:254]
/// Handles one event for the session identified by the event's token.
///
/// Steps, in order:
/// 1. An error event bumps `LISTENER_EVENT_ERROR` and closes the session.
/// 2. A readable event bumps `LISTENER_EVENT_READ` and calls `read`; a read
///    failure closes the session.
/// 3. `handshake` is then driven. `WouldBlock` leaves the session in place,
///    any other error closes it.
/// 4. Once the handshake succeeds, the session is removed from `sessions`
///    and offered to `session_queue` up to `QUEUE_RETRIES` times.
fn session_event(&mut self, event: &Event) {
    let token = event.token();

    // Errored sessions are torn down immediately.
    if event.is_error() {
        LISTENER_EVENT_ERROR.increment();
        self.close(token);
        return;
    }

    // Service any pending read before attempting the handshake.
    if event.is_readable() {
        LISTENER_EVENT_READ.increment();
        if self.read(token).is_err() {
            self.close(token);
            return;
        }
    }

    if let Err(err) = self.handshake(token) {
        // `WouldBlock` is not treated as a failure: the session is kept as-is.
        // Every other error kind closes the session.
        if !matches!(err.kind(), ErrorKind::WouldBlock) {
            self.close(token);
        }
        return;
    }

    // Handshake is complete: take the session out of the listener's storage
    // and hand it off through the session queue (to a worker thread, per the
    // original author's note).
    let mut pending = self.sessions.remove(token.0);
    for attempt in 1..=QUEUE_RETRIES {
        match self.session_queue.try_send_any(pending) {
            Ok(_) => return,
            Err(returned) => {
                // A failed send gives the session back; keep it for the next
                // attempt.
                pending = returned;
                if attempt == QUEUE_RETRIES {
                    LISTENER_SESSION_DISCARD.increment();
                } else {
                    // Best-effort wake of the queue before retrying; the
                    // result is deliberately ignored.
                    let _ = self.session_queue.wake();
                }
            }
        }
    }
    // If every attempt failed, `pending` is dropped here; per the original
    // comment, dropping the session closes it.
}