public static Connectable fromTransformer()

in mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxConnectables.java [48:91]


  /**
   * Adapts an {@link ObservableTransformer} into a {@link Connectable}.
   *
   * <p>Each call to {@code connect} creates a fresh {@link PublishSubject}, composes it with
   * {@code transformer}, and subscribes to the result. Values handed to the returned connection's
   * {@code accept} are pushed into that subject; values emitted by the transformed stream are
   * forwarded to the {@code output} consumer for as long as the connection has not been disposed.
   *
   * <p>The connectable built here is wrapped in a {@code DiscardAfterDisposeConnectable} before
   * it is returned.
   *
   * @param transformer the transformer applied to the stream of incoming values; passed through
   *     {@code checkNotNull} before use
   * @param <I> type of the values accepted by the connection
   * @param <O> type of the values delivered to the output consumer
   * @return a connectable backed by {@code transformer}
   */
  public static <I, O> Connectable<I, O> fromTransformer(
      @NonNull final ObservableTransformer<I, O> transformer) {
    checkNotNull(transformer);

    final Connectable<I, O> transformerConnectable =
        new Connectable<I, O>() {
          @Nonnull
          @Override
          public Connection<I> connect(com.spotify.mobius.functions.Consumer<O> output) {
            final PublishSubject<I> inputs = PublishSubject.create();

            // Serves both as the "already disposed" flag and as the monitor guarding it.
            final AtomicBoolean closed = new AtomicBoolean();

            final Consumer<O> forwarder =
                new Consumer<O>() {
                  @Override
                  public void accept(O item) throws Throwable {
                    // The monitor is held while delivering, so dispose() cannot flip the flag in
                    // the middle of a delivery, and nothing is delivered once the flag is set.
                    synchronized (closed) {
                      if (closed.get()) {
                        return;
                      }
                      output.accept(item);
                    }
                  }
                };

            final Disposable subscription = inputs.compose(transformer).subscribe(forwarder);

            return new Connection<I>() {
              @Override
              public void accept(I value) {
                inputs.onNext(value);
              }

              @Override
              public void dispose() {
                // Mark as closed first (under the monitor), then tear down the subscription
                // outside of it.
                synchronized (closed) {
                  closed.set(true);
                }
                subscription.dispose();
              }
            };
          }
        };

    return new DiscardAfterDisposeConnectable<>(transformerConnectable);
  }