public static ObservableTransformer toTransformer()

in mobius-rx2/src/main/java/com/spotify/mobius/rx2/RxConnectables.java [93:155]


  public static <I, O> ObservableTransformer<I, O> toTransformer(
      final Connectable<I, O> connectable) {
    return new ObservableTransformer<I, O>() {
      @Override
      public ObservableSource<O> apply(final Observable<I> upstream) {
        return Observable.create(
            new ObservableOnSubscribe<O>() {
              @Override
              public void subscribe(final ObservableEmitter<O> emitter) throws Exception {
                Consumer<O> output =
                    new Consumer<O>() {
                      @Override
                      public void accept(O value) {
                        emitter.onNext(value);
                      }
                    };
                final AtomicBoolean disposed = new AtomicBoolean();
                final Connection<I> input =
                    connectable.connect(
                        i -> {
                          synchronized (disposed) {
                            if (!disposed.get()) {
                              output.accept(i);
                            }
                          }
                        });
                final Disposable disposable =
                    upstream.subscribe(
                        new io.reactivex.functions.Consumer<I>() {
                          @Override
                          public void accept(I f) {
                            input.accept(f);
                          }
                        },
                        new io.reactivex.functions.Consumer<Throwable>() {
                          @Override
                          public void accept(Throwable throwable) throws Exception {
                            emitter.onError(throwable);
                          }
                        },
                        new Action() {
                          @Override
                          public void run() {
                            emitter.onComplete();
                          }
                        });

                emitter.setCancellable(
                    new Cancellable() {
                      @Override
                      public void cancel() throws Exception {
                        synchronized (disposed) {
                          disposed.set(true);
                        }
                        disposable.dispose();
                        input.dispose();
                      }
                    });
              }
            });
      }
    };
  }