in src/main/java/com/twitter/whiskey/net/SessionManager.java [59:112]
/**
 * Routes a request operation to an open session, or parks it until one is available.
 *
 * <p>The connectivity value is read once and used for the whole call. When it is
 * {@code OFFLINE} the operation is failed immediately with a {@link ConnectException}.
 * Otherwise each session currently in the open pool for that connectivity is visited
 * at most once: it is taken from the front of the pool and, unless it is closed, put
 * back, so sessions that are skipped or chosen rotate through the pool. The first
 * session that is active and reports spare capacity receives the operation.
 *
 * <p>If no session accepts the operation, it is added to {@code pendingOperations}
 * with a listener that removes it again on error, and {@code createSocket} is invoked
 * once for every connection slot still free under {@code maxConnectionsToOrigin}.
 *
 * @param operation the request operation to dispatch or hold as pending
 */
void queue(final RequestOperation operation) {
    final int network = connectivity;
    if (network == OFFLINE) {
        // TODO: determine exception/message
        operation.fail(new ConnectException("unable to connect to host"));
        return;
    }

    // Visit every session that was in the pool when we started, exactly once.
    // Each one is popped from the front; live sessions are put back so the pool
    // rotates and load is spread across sessions, while closed ones are dropped.
    for (int remaining = openSessionMap.get(network).size(); remaining > 0; remaining--) {
        final Session candidate = openSessionMap.removeFirst(network);
        if (candidate.isClosed()) {
            continue;
        }
        openSessionMap.put(network, candidate);
        if (!candidate.isActive() || candidate.getCapacity() <= 0) {
            continue;
        }
        candidate.queue(operation);
        return;
    }

    // Nothing could take the operation: hold it locally. The listener evicts it
    // from the pending collection if it errors out while waiting (presumably on
    // cancellation or timeout).
    final Listener<Response> evictOnError = new Listener<Response>() {
        @Override
        public Executor getExecutor() {
            return RunLoop.instance();
        }

        @Override
        public void onComplete(Response result) {}

        @Override
        public void onError(Throwable throwable) {
            if (pendingOperations.contains(operation)) {
                pendingOperations.remove(operation);
            }
        }
    };
    operation.addListener(evictOnError);
    pendingOperations.add(operation);

    // Top up socket connections: one createSocket call per slot not already taken
    // by an open session or a pending socket.
    final int openCount = openSessionMap.get(network).size();
    final int connectingCount = pendingSocketMap.get(network).size();
    int socketsToOpen = maxConnectionsToOrigin - connectingCount - openCount;
    while (socketsToOpen-- > 0) {
        createSocket(network);
    }
}