public static EventSource fromObservables()

in mobius-rx2/src/main/java/com/spotify/mobius/rx2/RxEventSources.java [51:76]


  /**
   * Creates an {@link EventSource} backed by the merge of the given observable sources.
   *
   * <p>The sources are combined once, up front, with {@code Observable.mergeArray}. Every call to
   * {@code subscribe} on the returned event source subscribes to that merged observable and
   * forwards each emitted value to the supplied consumer until the returned disposable has been
   * disposed. Only a value handler is registered on the merged observable; no error or completion
   * handler is passed here.
   *
   * <p>Forwarding a value and disposing are both performed while holding the same monitor, so the
   * consumer is not invoked once the returned disposable's {@code dispose()} has returned.
   *
   * @param sources the observable sources to merge into a single event source
   * @param <E> the event type
   * @return an event source emitting the values of all {@code sources}
   */
  public static <E> EventSource<E> fromObservables(ObservableSource<E>... sources) {
    final Observable<E> merged = Observable.mergeArray(sources);

    return new EventSource<E>() {
      @Nonnull
      @Override
      public Disposable subscribe(final Consumer<E> eventConsumer) {
        // Serves both as the "already disposed" flag and as the monitor that makes
        // forwarding a value and disposing mutually exclusive.
        final AtomicBoolean isDisposed = new AtomicBoolean();

        final io.reactivex.disposables.Disposable subscription =
            merged.subscribe(
                event -> {
                  synchronized (isDisposed) {
                    if (isDisposed.get()) {
                      // Disposed in the meantime: drop the value instead of delivering it.
                      return;
                    }
                    eventConsumer.accept(event);
                  }
                });

        return () -> {
          synchronized (isDisposed) {
            // Tear down the Rx subscription first, then mark this subscription as disposed.
            subscription.dispose();
            isDisposed.set(true);
          }
        };
      }
    };
  }