public static ObservableTransformer toTransformer()

in mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxConnectables.java [94:150]


  public static <I, O> ObservableTransformer<I, O> toTransformer(
      final Connectable<I, O> connectable) {
    return new ObservableTransformer<I, O>() {
      @Override
      public @NonNull ObservableSource<O> apply(@NonNull Observable<I> upstream) {
        return Observable.create(
            new ObservableOnSubscribe<O>() {
              @Override
              public void subscribe(@NonNull ObservableEmitter<O> emitter) throws Throwable {
                com.spotify.mobius.functions.Consumer<O> output = emitter::onNext;
                final AtomicBoolean disposed = new AtomicBoolean();
                final Connection<I> input =
                    connectable.connect(
                        i -> {
                          synchronized (disposed) {
                            if (!disposed.get()) {
                              output.accept(i);
                            }
                          }
                        });
                final Disposable disposable =
                    upstream.subscribe(
                        new Consumer<I>() {
                          @Override
                          public void accept(I value) throws Throwable {
                            input.accept(value);
                          }
                        },
                        new Consumer<Throwable>() {
                          @Override
                          public void accept(Throwable error) throws Throwable {
                            emitter.onError(error);
                          }
                        },
                        new Action() {
                          @Override
                          public void run() throws Throwable {
                            emitter.onComplete();
                          }
                        });

                emitter.setCancellable(
                    new Cancellable() {
                      @Override
                      public void cancel() throws Throwable {
                        synchronized (disposed) {
                          disposed.set(true);
                        }
                        disposable.dispose();
                        input.dispose();
                      }
                    });
              }
            });
      }
    };
  }