in mobius-rx/src/main/java/com/spotify/mobius/rx/RxEventSources.java [90:118]
public static <E> Observable<E> toObservable(
final EventSource<E> eventSource, BackpressureMode backpressureMode) {
checkNotNull(eventSource);
checkNotNull(backpressureMode);
return Observable.create(
new Action1<Emitter<E>>() {
@Override
public void call(final Emitter<E> emitter) {
final Disposable disposable =
eventSource.subscribe(
new Consumer<E>() {
@Override
public void accept(E value) {
emitter.onNext(value);
}
});
emitter.setCancellation(
new Cancellable() {
@Override
public void cancel() throws Exception {
disposable.dispose();
}
});
}
},
backpressureMode);
}