public static EventSource fromObservables()

in mobius-rx/src/main/java/com/spotify/mobius/rx/RxEventSources.java [55:81]


  public static <E> EventSource<E> fromObservables(Observable<E>... observables) {
    // All supplied observables are merged into one stream; each call to subscribe() on the
    // returned event source creates its own subscription to that merged stream.
    final Observable<E> merged = Observable.merge(observables);

    return new EventSource<E>() {
      @Nonnull
      @Override
      public Disposable subscribe(final Consumer<E> eventConsumer) {
        // Per-subscription flag. The same object is used as the monitor, so event delivery and
        // disposal are mutually exclusive.
        final AtomicBoolean isDisposed = new AtomicBoolean();

        final Subscription rxSubscription =
            merged.subscribe(
                event -> {
                  synchronized (isDisposed) {
                    // Skip events that arrive once the disposer below has set the flag.
                    if (isDisposed.get()) {
                      return;
                    }
                    eventConsumer.accept(event);
                  }
                },
                // Stream errors are not passed to the consumer; they go to the RxJava hook.
                error -> RxJavaHooks.onError(error));

        return () -> {
          synchronized (isDisposed) {
            // Unsubscribe first, then mark as disposed, both while holding the monitor.
            rxSubscription.unsubscribe();
            isDisposed.set(true);
          }
        };
      }
    };
  }