in mobius-rx2/src/main/java/com/spotify/mobius/rx2/RxMobiusLoop.java [53:98]
public ObservableSource<M> apply(final Observable<E> events) {
return Observable.create(
new ObservableOnSubscribe<M>() {
@Override
public void subscribe(final ObservableEmitter<M> emitter) throws Exception {
final MobiusLoop<M, E, ?> loop;
if (startEffects == null) {
loop = loopFactory.startFrom(startModel);
} else {
loop = loopFactory.startFrom(startModel, startEffects);
}
loop.observe(
new com.spotify.mobius.functions.Consumer<M>() {
@Override
public void accept(M newModel) {
emitter.onNext(newModel);
}
});
final Disposable eventsDisposable =
events.subscribe(
new Consumer<E>() {
@Override
public void accept(E event) throws Exception {
loop.dispatchEvent(event);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
emitter.onError(new UnrecoverableIncomingException(throwable));
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Exception {
loop.dispose();
eventsDisposable.dispose();
}
});
}
});
}