public Observable call()

in mobius-rx/src/main/java/com/spotify/mobius/rx/RxMobiusLoop.java [51:102]


  public Observable<M> call(final Observable<E> events) {
    return Observable.create(
        new Action1<Emitter<M>>() {
          @Override
          public void call(final Emitter<M> emitter) {
            final MobiusLoop<M, E, ?> loop;

            if (startEffects == null) {
              loop = loopFactory.startFrom(startModel);
            } else {
              loop = loopFactory.startFrom(startModel, startEffects);
            }

            loop.observe(
                new Consumer<M>() {
                  @Override
                  public void accept(M newModel) {
                    emitter.onNext(newModel);
                  }
                });

            final Subscription eventSubscription =
                events.subscribe(
                    new Observer<E>() {
                      @Override
                      public void onCompleted() {
                        // TODO: complain loudly! shouldn't ever complete
                      }

                      @Override
                      public void onError(Throwable e) {
                        emitter.onError(new UnrecoverableIncomingException(e));
                      }

                      @Override
                      public void onNext(E event) {
                        loop.dispatchEvent(event);
                      }
                    });

            emitter.setCancellation(
                new Cancellable() {
                  @Override
                  public void cancel() throws Exception {
                    loop.dispose();
                    eventSubscription.unsubscribe();
                  }
                });
          }
        },
        Emitter.BackpressureMode.NONE);
  }