in mobius-rx/src/main/java/com/spotify/mobius/rx/RxMobiusLoop.java [51:102]
public Observable<M> call(final Observable<E> events) {
return Observable.create(
new Action1<Emitter<M>>() {
@Override
public void call(final Emitter<M> emitter) {
final MobiusLoop<M, E, ?> loop;
if (startEffects == null) {
loop = loopFactory.startFrom(startModel);
} else {
loop = loopFactory.startFrom(startModel, startEffects);
}
loop.observe(
new Consumer<M>() {
@Override
public void accept(M newModel) {
emitter.onNext(newModel);
}
});
final Subscription eventSubscription =
events.subscribe(
new Observer<E>() {
@Override
public void onCompleted() {
// TODO: complain loudly! shouldn't ever complete
}
@Override
public void onError(Throwable e) {
emitter.onError(new UnrecoverableIncomingException(e));
}
@Override
public void onNext(E event) {
loop.dispatchEvent(event);
}
});
emitter.setCancellation(
new Cancellable() {
@Override
public void cancel() throws Exception {
loop.dispose();
eventSubscription.unsubscribe();
}
});
}
},
Emitter.BackpressureMode.NONE);
}