public ObservableSource apply()

in mobius-rx3/src/main/java/com/spotify/mobius/rx3/RxMobiusLoop.java [55:99]


  public ObservableSource<M> apply(@NonNull Observable<E> upstream) {
    return Observable.create(
        new ObservableOnSubscribe<M>() {
          @Override
          public void subscribe(@NonNull ObservableEmitter<M> emitter) throws Throwable {
            final MobiusLoop<M, E, ?> loop;

            if (startEffects == null) {
              loop = loopFactory.startFrom(startModel);
            } else {
              loop = loopFactory.startFrom(startModel, startEffects);
            }

            loop.observe(
                new com.spotify.mobius.functions.Consumer<M>() {
                  @Override
                  public void accept(M value) {
                    emitter.onNext(value);
                  }
                });
            final Disposable eventsDisposable =
                upstream.subscribe(
                    new Consumer<E>() {
                      @Override
                      public void accept(E event) throws Throwable {
                        loop.dispatchEvent(event);
                      }
                    },
                    new Consumer<Throwable>() {
                      @Override
                      public void accept(Throwable throwable) throws Throwable {
                        emitter.onError(new UnrecoverableIncomingException(throwable));
                      }
                    });
            emitter.setCancellable(
                new Cancellable() {
                  @Override
                  public void cancel() throws Throwable {
                    loop.dispose();
                    eventsDisposable.dispose();
                  }
                });
          }
        });
  }