public static Connectable fromTransformer()

in mobius-rx/src/main/java/com/spotify/mobius/rx/RxConnectables.java [46:93]


  /**
   * Adapts an {@link Observable.Transformer} into a {@link Connectable}.
   *
   * <p>Every call to {@code connect} creates a new {@link PublishSubject}, applies {@code
   * transformer} to it and subscribes to the result. Items emitted by the transformed stream are
   * handed to the {@code output} consumer; errors are routed to {@link
   * RxJavaHooks#onError(Throwable)}; completion is currently ignored.
   *
   * <p>The returned {@link Connection} pushes each accepted value into the subject, and throws
   * {@link IllegalStateException} if the underlying subscription is already unsubscribed. Disposing
   * the connection unsubscribes from the transformed stream.
   *
   * @param transformer the transformer to wrap; validated with {@code checkNotNull}
   * @param <I> type of the values fed into the transformer
   * @param <O> type of the values the transformer emits
   * @return a connectable backed by {@code transformer}
   */
  public static <I, O> Connectable<I, O> fromTransformer(
      final Observable.Transformer<I, O> transformer) {
    checkNotNull(transformer);

    return new Connectable<I, O>() {
      @Nonnull
      @Override
      public Connection<I> connect(final Consumer<O> output) {
        // One subject per connection: it is the entry point through which accepted values reach
        // the transformer.
        final PublishSubject<I> input = PublishSubject.create();
        final Observable<O> transformed = input.compose(transformer);

        final Observer<O> forwarder =
            new Observer<O>() {
              @Override
              public void onNext(O value) {
                output.accept(value);
              }

              @Override
              public void onError(Throwable error) {
                RxJavaHooks.onError(error);
              }

              @Override
              public void onCompleted() {
                // TODO: complain loudly! shouldn't ever complete
              }
            };

        final Subscription streamSubscription = transformed.subscribe(forwarder);

        return new Connection<I>() {
          @Override
          public void dispose() {
            streamSubscription.unsubscribe();
          }

          @Override
          public void accept(I value) {
            // NOTE(review): this check only looks at the subscription state, so it presumably also
            // trips if the transformed stream terminated on its own - confirm whether the message
            // is accurate in that case.
            if (streamSubscription.isUnsubscribed()) {
              throw new IllegalStateException(
                  "Effect handlers cannot perform effects after they've been disposed of");
            }
            input.onNext(value);
          }
        };
      }
    };
  }