in mobius-rx/src/main/java/com/spotify/mobius/rx/RxEventSources.java [55:81]
/**
 * Creates an {@link EventSource} backed by the merged output of the given observables.
 *
 * <p>Every call to {@code subscribe} on the returned source subscribes to the merged observable
 * and forwards each emitted value to the supplied consumer. Errors signalled by the merged
 * observable are handed to {@link RxJavaHooks#onError}.
 *
 * <p>Delivery and disposal are serialized on a per-subscription monitor: a dispose call waits for
 * an in-flight {@code accept} on another thread to finish, and once disposal has started no
 * further values are passed to the consumer.
 *
 * @param observables the observables to merge into a single stream of events
 * @param <E> the event type
 * @return an event source that forwards the merged emissions to its subscribers
 */
public static <E> EventSource<E> fromObservables(Observable<E>... observables) {
  final Observable<E> eventSource = Observable.merge(observables);
  return new EventSource<E>() {
    @Nonnull
    @Override
    public Disposable subscribe(final Consumer<E> eventConsumer) {
      // Doubles as the "disposed" flag and as the monitor that guards it.
      final AtomicBoolean disposed = new AtomicBoolean();
      final Subscription subscription =
          eventSource.subscribe(
              value -> {
                synchronized (disposed) {
                  // Drop values that arrive once disposal has started.
                  if (!disposed.get()) {
                    eventConsumer.accept(value);
                  }
                }
              },
              RxJavaHooks::onError);
      return () -> {
        synchronized (disposed) {
          // Flip the flag BEFORE unsubscribing: if unsubscribe() throws, or synchronously
          // triggers an emission on this thread (the monitor is reentrant), the consumer
          // must still never see another value.
          disposed.set(true);
          subscription.unsubscribe();
        }
      };
    }
  };
}