in mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxMobiusLoop.java [55:99]
public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
return Observable.create(
new ObservableOnSubscribe<M>() {
@Override
public void subscribe(@NonNull ObservableEmitter<M> emitter) throws Throwable {
final MobiusLoop<M, E, ?> loop;
if (startEffects == null) {
loop = loopFactory.startFrom(startModel);
} else {
loop = loopFactory.startFrom(startModel, startEffects);
}
loop.observe(
new com.spotify.mobius.functions.Consumer<M>() {
@Override
public void accept(M value) {
emitter.onNext(value);
}
});
final Disposable eventsDisposable =
upstream.subscribe(
new Consumer<E>() {
@Override
public void accept(E event) throws Throwable {
loop.dispatchEvent(event);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Throwable {
emitter.onError(new UnrecoverableIncomingException(throwable));
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Throwable {
loop.dispose();
eventsDisposable.dispose();
}
});
}
});
}