in mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxConnectables.java [94:150]
public static <I, O> ObservableTransformer<I, O> toTransformer(
    final Connectable<I, O> connectable) {
  // Adapts the given Connectable into an ObservableTransformer. Every subscription to the
  // transformed stream opens its own connection: upstream items are fed into the connection,
  // and whatever the connection outputs is emitted downstream until the subscription is
  // cancelled.
  return upstream ->
      Observable.create(
          emitter -> {
            // Doubles as the "cancelled" flag and as the monitor guarding it, so that
            // cancellation cannot interleave with an emission that is in progress.
            final AtomicBoolean cancelled = new AtomicBoolean();

            // Connect first, handing the connectable a sink that drops values once the
            // downstream subscription has been cancelled.
            final Connection<I> connection =
                connectable.connect(
                    value -> {
                      synchronized (cancelled) {
                        if (cancelled.get()) {
                          return;
                        }
                        emitter.onNext(value);
                      }
                    });

            // Feed upstream items into the connection; upstream termination (error or
            // completion) is relayed straight to the emitter.
            final Disposable upstreamSubscription =
                upstream.subscribe(
                    value -> connection.accept(value), emitter::onError, emitter::onComplete);

            // Teardown: flip the flag under the lock, then release the upstream
            // subscription before disposing the connection.
            emitter.setCancellable(
                () -> {
                  synchronized (cancelled) {
                    cancelled.set(true);
                  }
                  upstreamSubscription.dispose();
                  connection.dispose();
                });
          });
}