private void drainQueue()

in mobius-core/src/main/java/com/spotify/mobius/FireAtLeastOnceObserver.java [58:92]


  private void drainQueue() {
    if (!processing.compareAndSet(false, true)) {
      // already draining the queue
      return;
    }

    // We are now in a safe section that can only execute on one thread at the time.

    // If this is the first time, try to emit a value that only can be emitted if it is first.
    if (!hasStartedEmitting) {
      hasStartedEmitting = true;
      AtomicReference<V> wrappedValue = firstValue.get();
      if (wrappedValue != null) {
        delegate.accept(wrappedValue.get());
      }
    }

    boolean done = false;

    while (!done) {
      try {
        for (V toSend = queue.poll(); toSend != null; toSend = queue.poll()) {
          delegate.accept(toSend);
        }

      } finally {
        processing.set(false); // leave the safe section

        // If the queue is empty or if we can't reacquire the processing lock, we're done,
        // because either there is nothing to do, or someone else will process the queue.
        // Note: it's important that we check the queue first, otherwise we might leak the lock.
        done = queue.isEmpty() || !processing.compareAndSet(false, true);
      }
    }
  }