in mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxEventSources.java [45:71]
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  // Adapts the given sources to a single EventSource: they are merged into one stream, and
  // each call to subscribe() on the returned EventSource subscribes to that merged stream.
  final Observable<E> eventSource = Observable.mergeArray(sources);
  return new EventSource<E>() {
    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      // Per-subscription state. The AtomicBoolean doubles as the monitor that serialises
      // event delivery against disposal.
      final AtomicBoolean disposed = new AtomicBoolean();
      final Disposable disposable =
          eventSource.subscribe(
              value -> {
                // Check the flag and deliver under the same lock, so disposal cannot slip in
                // between the check and the call to the consumer.
                synchronized (disposed) {
                  if (!disposed.get()) {
                    eventConsumer.accept(value);
                  }
                }
              });
      return () -> {
        // Flip the flag under the lock: this waits for any delivery that is currently in
        // progress and guarantees that no new delivery starts afterwards.
        synchronized (disposed) {
          disposed.set(true);
        }
        // Dispose the upstream only after releasing the monitor. Upstream teardown is foreign
        // code; running it while holding the lock that the event callback also needs could
        // deadlock if that teardown waits on a thread blocked in the callback above.
        disposable.dispose();
      };
    }
  };
}