in src/main/java/com/twitter/whiskey/nio/RunLoop.java [157:231]
void run(final boolean blocking) {
loops++;
signal.set(false);
// Check standard tasks
Runnable currentTask;
while ((currentTask = tasks.poll()) != null) {
executions++;
currentTask.run();
}
long selectTimeout = 0;
// Check scheduled tasks and setup maximum delay
ScheduledRunnable nextScheduledTask;
while (!scheduledTasks.isEmpty()) {
nextScheduledTask = scheduledTasks.peek();
long now = clock.now();
if (nextScheduledTask.tolerance > 0 &&
nextScheduledTask.triggerPoint <= now - nextScheduledTask.tolerance) {
// Discard the task - we missed the tolerance window
scheduledTasks.poll();
} else if (nextScheduledTask.triggerPoint <= now) {
// It's time to run the task
executions++;
nextScheduledTask.run();
scheduledTasks.poll();
} else {
// Determine the select timeout and break
selectTimeout = nextScheduledTask.triggerPoint - now;
break;
}
}
int readyChannels = 0;
// Select
try {
selecting = true;
if (blocking && tasks.isEmpty() && !signal.get()) {
readyChannels = selector.select(selectTimeout);
} else {
readyChannels = selector.selectNow();
}
selecting = false;
} catch (IOException e) {
// Recovery would have to involve re-registering all sockets
// on a new Selector. Consider this fatal for now.
throw new RuntimeException(e);
}
if (readyChannels > 0) {
Set<SelectionKey> selected = selector.selectedKeys();
for (Iterator<SelectionKey> iterator = selected.iterator(); iterator.hasNext(); ) {
SelectionKey key = iterator.next();
iterator.remove();
Object attachment = key.attachment();
if (attachment instanceof Selectable) {
Selectable selectable = (Selectable) attachment;
if (key.isConnectable()) {
executions++;
selectable.onConnect();
} else if (key.isReadable()) {
executions++;
selectable.onReadable();
} else if (key.isWritable()) {
executions++;
selectable.onWriteable();
}
}
}
}
}