in mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxConnectables.java [48:91]
// Adapts an ObservableTransformer into a Connectable. Every connect() call builds its own
// pipeline: a fresh PublishSubject is composed with the transformer, and whatever the composed
// stream emits is handed to the supplied output consumer. The resulting Connectable is wrapped
// in a DiscardAfterDisposeConnectable before it is returned.
//
// transformer: the transformation applied to the stream of incoming values; passed through
//              checkNotNull before use.
public static <I, O> Connectable<I, O> fromTransformer(
    @NonNull final ObservableTransformer<I, O> transformer) {
  checkNotNull(transformer);

  final Connectable<I, O> delegate =
      new Connectable<I, O>() {
        @Nonnull
        @Override
        public Connection<I> connect(com.spotify.mobius.functions.Consumer<O> output) {
          // Entry point of the pipeline; values given to the Connection are pushed in here.
          final PublishSubject<I> effects = PublishSubject.create();

          // Doubles as the monitor guarding delivery: forwarding and flag-setting both hold
          // it, so dispose() cannot flip the flag while another thread is inside
          // output.accept, and nothing is forwarded once the flag has been set.
          final AtomicBoolean isDisposed = new AtomicBoolean();

          final Consumer<O> forwardUnlessDisposed =
              new Consumer<O>() {
                @Override
                public void accept(O value) throws Throwable {
                  synchronized (isDisposed) {
                    if (isDisposed.get()) {
                      return;
                    }
                    output.accept(value);
                  }
                }
              };

          final Disposable subscription =
              effects.compose(transformer).subscribe(forwardUnlessDisposed);

          return new Connection<I>() {
            @Override
            public void accept(I effect) {
              effects.onNext(effect);
            }

            @Override
            public void dispose() {
              // Mark as disposed first (under the monitor), then tear down the subscription.
              synchronized (isDisposed) {
                isDisposed.set(true);
              }
              subscription.dispose();
            }
          };
        }
      };

  return new DiscardAfterDisposeConnectable<>(delegate);
}