in mobius-rx2/src/main/java/com/spotify/mobius/rx2/RxConnectables.java [93:155]
public static <I, O> ObservableTransformer<I, O> toTransformer(
final Connectable<I, O> connectable) {
return new ObservableTransformer<I, O>() {
@Override
public ObservableSource<O> apply(final Observable<I> upstream) {
return Observable.create(
new ObservableOnSubscribe<O>() {
@Override
public void subscribe(final ObservableEmitter<O> emitter) throws Exception {
Consumer<O> output =
new Consumer<O>() {
@Override
public void accept(O value) {
emitter.onNext(value);
}
};
final AtomicBoolean disposed = new AtomicBoolean();
final Connection<I> input =
connectable.connect(
i -> {
synchronized (disposed) {
if (!disposed.get()) {
output.accept(i);
}
}
});
final Disposable disposable =
upstream.subscribe(
new io.reactivex.functions.Consumer<I>() {
@Override
public void accept(I f) {
input.accept(f);
}
},
new io.reactivex.functions.Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
emitter.onError(throwable);
}
},
new Action() {
@Override
public void run() {
emitter.onComplete();
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Exception {
synchronized (disposed) {
disposed.set(true);
}
disposable.dispose();
input.dispose();
}
});
}
});
}
};
}