in mobius-rx/src/main/java/com/spotify/mobius/rx/RxConnectables.java [99:145]
public Observable<O> call(final Observable<I> upstream) {
  // Bridges `upstream` through `connectable`: every upstream item is handed to the
  // connection obtained from `connectable`, and every value the connectable pushes to
  // its output consumer is emitted on the returned observable. Upstream errors and
  // completion are passed straight to the emitter.
  final Action1<Emitter<O>> onSubscribe =
      new Action1<Emitter<O>>() {
        @Override
        public void call(final Emitter<O> downstream) {
          // Values produced by the connectable are forwarded to the emitter.
          final Consumer<O> sink =
              new Consumer<O>() {
                @Override
                public void accept(O value) {
                  downstream.onNext(value);
                }
              };

          // Connect first so that the connection exists before upstream is subscribed.
          final Connection<I> connection = connectable.connect(sink);

          final Action1<I> forwardItem =
              new Action1<I>() {
                @Override
                public void call(I item) {
                  connection.accept(item);
                }
              };
          final Action1<Throwable> forwardError =
              new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {
                  downstream.onError(error);
                }
              };
          final Action0 forwardCompletion =
              new Action0() {
                @Override
                public void call() {
                  downstream.onCompleted();
                }
              };

          final Subscription upstreamSubscription =
              upstream.subscribe(forwardItem, forwardError, forwardCompletion);

          // On cancellation: unsubscribe from upstream first, then dispose the connection.
          final Cancellable teardown =
              new Cancellable() {
                @Override
                public void cancel() throws Exception {
                  upstreamSubscription.unsubscribe();
                  connection.dispose();
                }
              };
          downstream.setCancellation(teardown);
        }
      };

  return Observable.create(onSubscribe, backpressureMode);
}