in mobius-core/src/main/java/com/spotify/mobius/FireAtLeastOnceObserver.java [58:92]
/**
 * Drains pending values to {@code delegate}, ensuring that at most one thread is emitting at
 * any given time.
 *
 * <p>The {@code processing} flag acts as a non-blocking lock: a caller that fails to acquire it
 * returns immediately, relying on the current holder to pick up anything that was enqueued in
 * the meantime. On the very first drain, the value held in {@code firstValue} (if any) is
 * emitted before anything taken from the queue.
 *
 * <p>If {@code delegate.accept} throws, the flag is released and the exception propagates to the
 * caller. Items still in the queue are left there and will be delivered by a later drain.
 */
private void drainQueue() {
  if (!processing.compareAndSet(false, true)) {
    // already draining the queue
    return;
  }

  // We are now in a safe section that can only execute on one thread at a time.
  do {
    try {
      // If this is the first time, try to emit a value that can only be emitted if it is first.
      // The marker is set before emitting, so the first value is never attempted twice.
      if (!hasStartedEmitting) {
        hasStartedEmitting = true;
        AtomicReference<V> wrappedValue = firstValue.get();
        if (wrappedValue != null) {
          delegate.accept(wrappedValue.get());
        }
      }

      for (V toSend = queue.poll(); toSend != null; toSend = queue.poll()) {
        delegate.accept(toSend);
      }
    } finally {
      // Always leave the safe section, including when the delegate throws. The flag must not be
      // reacquired here: doing so while an exception is propagating would leave it set with no
      // thread draining, blocking every future drain.
      processing.set(false);
    }
    // This condition is only evaluated when the body completed normally.
    // If the queue is empty or if we can't reacquire the processing lock, we're done, because
    // either there is nothing to do, or someone else will process the queue.
    // Note: it's important that we check the queue first, otherwise we might leak the lock.
  } while (!queue.isEmpty() && processing.compareAndSet(false, true));
}