in mobius-rx2/src/main/java/com/spotify/mobius/rx2/RxEventSources.java [51:76]
// The varargs array is only handed on to Observable.mergeArray and is never written to
// in this method, so callers need not see an "unchecked generic array creation" warning.
@SafeVarargs
public static <E> EventSource<E> fromObservables(ObservableSource<E>... sources) {
  // Combine all given sources into a single stream; every call to subscribe() below
  // subscribes to this merged stream anew.
  final Observable<E> eventSource = Observable.mergeArray(sources);
  return new EventSource<E>() {
    @Nonnull
    @Override
    public Disposable subscribe(final Consumer<E> eventConsumer) {
      // Per-subscription state: serves both as the "already disposed" flag and as the
      // monitor that makes event delivery and disposal mutually exclusive.
      final AtomicBoolean disposed = new AtomicBoolean();
      final io.reactivex.disposables.Disposable disposable =
          eventSource.subscribe(
              value -> {
                synchronized (disposed) {
                  // Drop the event if dispose() has already completed, so the consumer
                  // never receives a value after the returned Disposable was disposed.
                  if (!disposed.get()) {
                    eventConsumer.accept(value);
                  }
                }
              });
      return () -> {
        // Taking the same monitor as the delivery path means a dispose() issued from
        // another thread waits for an in-flight accept() to finish before proceeding.
        synchronized (disposed) {
          disposable.dispose();
          disposed.set(true);
        }
      };
    }
  };
}