public ObservableSource apply()

in mobius-rx2/src/main/java/com/spotify/mobius/rx2/RxMobiusLoop.java [53:98]


  /**
   * Builds a source of models that is backed by a Mobius loop fed from {@code events}.
   *
   * <p>On each subscription a new loop is started from {@code startModel} (together with {@code
   * startEffects} when those are non-null). Models passed to the loop's observer are forwarded to
   * the subscriber, and every item emitted by {@code events} is dispatched to the loop. An error
   * from {@code events} is delivered downstream wrapped in an {@link
   * UnrecoverableIncomingException}. No completion handler is registered on {@code events}, so
   * its completion is not forwarded downstream.
   *
   * <p>Cancelling the subscription disposes the loop and the subscription to {@code events}; the
   * latter is disposed even if disposing the loop throws.
   *
   * @param events the upstream of events to dispatch to the loop
   * @return a source emitting the models observed from the loop
   */
  public ObservableSource<M> apply(final Observable<E> events) {
    return Observable.create(
        new ObservableOnSubscribe<M>() {
          @Override
          public void subscribe(final ObservableEmitter<M> emitter) throws Exception {
            // A fresh loop per subscriber; start effects are optional.
            final MobiusLoop<M, E, ?> loop;
            if (startEffects == null) {
              loop = loopFactory.startFrom(startModel);
            } else {
              loop = loopFactory.startFrom(startModel, startEffects);
            }

            // Register the model observer before any event is dispatched so that models
            // produced in response to upstream events reach the subscriber.
            loop.observe(
                new com.spotify.mobius.functions.Consumer<M>() {
                  @Override
                  public void accept(M newModel) {
                    emitter.onNext(newModel);
                  }
                });

            final Disposable eventsDisposable =
                events.subscribe(
                    new Consumer<E>() {
                      @Override
                      public void accept(E event) throws Exception {
                        loop.dispatchEvent(event);
                      }
                    },
                    new Consumer<Throwable>() {
                      @Override
                      public void accept(Throwable throwable) throws Exception {
                        // Upstream failures are surfaced downstream with the original
                        // throwable preserved as the cause.
                        emitter.onError(new UnrecoverableIncomingException(throwable));
                      }
                    });

            emitter.setCancellable(
                new Cancellable() {
                  @Override
                  public void cancel() throws Exception {
                    // Always release the upstream subscription, even when disposing the loop
                    // throws; otherwise the events subscription would outlive cancellation.
                    try {
                      loop.dispose();
                    } finally {
                      eventsDisposable.dispose();
                    }
                  }
                });
          }
        });
  }