in mobius-rx2/src/main/java/com/spotify/mobius/rx2/RxConnectables.java [46:91]
/**
 * Adapts an {@link ObservableTransformer} into a {@link Connectable}.
 *
 * <p>Every call to {@code connect} creates a fresh {@link PublishSubject}, composes it with
 * {@code transformer}, and subscribes to the result. Values handed to the returned connection's
 * {@code accept} are pushed into that subject; items emitted by the transformed stream are
 * forwarded to the connected {@code output} consumer for as long as the connection has not been
 * disposed.
 *
 * <p>Only an item consumer is passed to {@code subscribe}; no error callback is installed here.
 *
 * @param transformer the transformer to wrap; validated with {@code checkNotNull}
 * @return the adapting connectable, wrapped in a {@code DiscardAfterDisposeConnectable}
 */
public static <I, O> Connectable<I, O> fromTransformer(
    final ObservableTransformer<I, O> transformer) {
  checkNotNull(transformer);

  final Connectable<I, O> transformerBacked =
      new Connectable<I, O>() {
        @Nonnull
        @Override
        public Connection<I> connect(final Consumer<O> output) {
          final PublishSubject<I> inputs = PublishSubject.create();

          // Serves both as the "already disposed" flag and as the monitor that
          // delivery and disposal synchronize on.
          final AtomicBoolean closed = new AtomicBoolean();

          final io.reactivex.functions.Consumer<O> forwarder =
              new io.reactivex.functions.Consumer<O>() {
                @Override
                public void accept(O item) {
                  // Delivery happens while holding the monitor, so dispose() cannot flip
                  // the flag in the middle of a call to output.accept().
                  synchronized (closed) {
                    if (closed.get()) {
                      return;
                    }
                    output.accept(item);
                  }
                }
              };

          final Disposable subscription = inputs.compose(transformer).subscribe(forwarder);

          return new Connection<I>() {
            @Override
            public void accept(I effect) {
              inputs.onNext(effect);
            }

            @Override
            public void dispose() {
              // Mark the connection closed first (under the same monitor the forwarder
              // uses), then tear down the subscription outside the lock.
              synchronized (closed) {
                closed.set(true);
              }
              subscription.dispose();
            }
          };
        }
      };

  return new DiscardAfterDisposeConnectable<>(transformerBacked);
}