public Observable call()

in mobius-rx/src/main/java/com/spotify/mobius/rx/RxConnectables.java [99:145]


      public Observable<O> call(final Observable<I> upstream) {
        return Observable.create(
            new Action1<Emitter<O>>() {
              @Override
              public void call(final Emitter<O> emitter) {
                Consumer<O> output =
                    new Consumer<O>() {
                      @Override
                      public void accept(O value) {
                        emitter.onNext(value);
                      }
                    };

                final Connection<I> input = connectable.connect(output);
                final Subscription subscription =
                    upstream.subscribe(
                        new Action1<I>() {
                          @Override
                          public void call(I f) {
                            input.accept(f);
                          }
                        },
                        new Action1<Throwable>() {
                          @Override
                          public void call(Throwable throwable) {
                            emitter.onError(throwable);
                          }
                        },
                        new Action0() {
                          @Override
                          public void call() {
                            emitter.onCompleted();
                          }
                        });

                emitter.setCancellation(
                    new Cancellable() {
                      @Override
                      public void cancel() throws Exception {
                        subscription.unsubscribe();
                        input.dispose();
                      }
                    });
              }
            },
            backpressureMode);
      }