in mobius-rx/src/main/java/com/spotify/mobius/rx/RxConnectables.java [46:93]
public static <I, O> Connectable<I, O> fromTransformer(
final Observable.Transformer<I, O> transformer) {
checkNotNull(transformer);
return new Connectable<I, O>() {
@Nonnull
@Override
public Connection<I> connect(final Consumer<O> output) {
final PublishSubject<I> subject = PublishSubject.create();
final Subscription subscription =
subject
.compose(transformer)
.subscribe(
new Observer<O>() {
@Override
public void onCompleted() {
// TODO: complain loudly! shouldn't ever complete
}
@Override
public void onError(Throwable e) {
RxJavaHooks.onError(e);
}
@Override
public void onNext(O e) {
output.accept(e);
}
});
return new Connection<I>() {
@Override
public void accept(I effect) {
if (subscription.isUnsubscribed())
throw new IllegalStateException(
"Effect handlers cannot perform effects after they've been disposed of");
subject.onNext(effect);
}
@Override
public void dispose() {
subscription.unsubscribe();
}
};
}
};
}