public void process()

in runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java [181:338]


    public void process(ProcessContext c, BoundedWindow w) {
      // Drives the wrapped DoFn over one input: c.element() carries the original element as its
      // key and the restriction to process as its value. The DoFn's process-element method is
      // invoked repeatedly, each time on the residual of the previous pass, until it stops asking
      // to be resumed or the tracker yields no split.

      // Ask the DoFn for the watermark estimator state that the first pass starts from.
      WatermarkEstimatorStateT seedEstimatorState =
          (WatermarkEstimatorStateT)
              invoker.invokeGetInitialWatermarkEstimatorState(
                  new BaseArgumentProvider<InputT, OutputT>() {
                    @Override
                    public String getErrorContext() {
                      return NaiveProcessFn.class.getSimpleName()
                          + ".invokeGetInitialWatermarkEstimatorState";
                    }

                    @Override
                    public InputT element(DoFn<InputT, OutputT> doFn) {
                      return c.element().getKey();
                    }

                    @Override
                    public Object restriction() {
                      return c.element().getValue();
                    }

                    @Override
                    public BoundedWindow window() {
                      return w;
                    }

                    @Override
                    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                      return c.timestamp();
                    }

                    @Override
                    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                      return c.pane();
                    }

                    @Override
                    public PipelineOptions pipelineOptions() {
                      return c.getPipelineOptions();
                    }
                  });

      // Mutable cursor carried from one pass to the next.
      RestrictionT pendingRestriction = c.element().getValue();
      WatermarkEstimatorStateT pendingEstimatorState = seedEstimatorState;

      for (; ; ) {
        // Snapshot the cursor into non-reassigned locals so the anonymous providers below can
        // capture them.
        RestrictionT activeRestriction = pendingRestriction;
        WatermarkEstimatorStateT activeEstimatorState = pendingEstimatorState;

        // The claim observer is a deliberate no-op: nothing here reacts to claims.
        ClaimObserver<PositionT> ignoringObserver =
            new ClaimObserver<PositionT>() {
              @Override
              public void onClaimed(PositionT position) {}

              @Override
              public void onClaimFailed(PositionT position) {}
            };

        // A fresh tracker is built for the restriction handled in this pass.
        RestrictionTracker<RestrictionT, PositionT> tracker =
            RestrictionTrackers.observe(
                invoker.invokeNewTracker(
                    new BaseArgumentProvider<InputT, OutputT>() {
                      @Override
                      public String getErrorContext() {
                        return NaiveProcessFn.class.getSimpleName() + ".invokeNewTracker";
                      }

                      @Override
                      public InputT element(DoFn<InputT, OutputT> doFn) {
                        return c.element().getKey();
                      }

                      @Override
                      public RestrictionT restriction() {
                        return activeRestriction;
                      }

                      @Override
                      public BoundedWindow window() {
                        return w;
                      }

                      @Override
                      public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                        return c.timestamp();
                      }

                      @Override
                      public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                        return c.pane();
                      }

                      @Override
                      public PipelineOptions pipelineOptions() {
                        return c.getPipelineOptions();
                      }
                    }),
                ignoringObserver);

        // A fresh watermark estimator is built from the state carried into this pass.
        WatermarkEstimator<WatermarkEstimatorStateT> estimator =
            invoker.invokeNewWatermarkEstimator(
                new BaseArgumentProvider<InputT, OutputT>() {
                  @Override
                  public String getErrorContext() {
                    return NaiveProcessFn.class.getSimpleName() + ".invokeNewWatermarkEstimator";
                  }

                  @Override
                  public InputT element(DoFn<InputT, OutputT> doFn) {
                    return c.element().getKey();
                  }

                  @Override
                  public RestrictionT restriction() {
                    return activeRestriction;
                  }

                  @Override
                  public WatermarkEstimatorStateT watermarkEstimatorState() {
                    return activeEstimatorState;
                  }

                  @Override
                  public BoundedWindow window() {
                    return w;
                  }

                  @Override
                  public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                    return c.timestamp();
                  }

                  @Override
                  public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
                    return c.pane();
                  }

                  @Override
                  public PipelineOptions pipelineOptions() {
                    return c.getPipelineOptions();
                  }
                });

        ProcessContinuation outcome =
            invoker.invokeProcessElement(
                new NestedProcessContext<>(
                    fn, c, c.element().getKey(), w, tracker, estimator));

        // The DoFn did not ask to be resumed: nothing more to do for this element.
        if (!outcome.shouldResume()) {
          break;
        }

        // Read the estimator state before splitting so that the same watermark state applies to
        // both the primary and the residual.
        pendingEstimatorState = estimator.getState();

        SplitResult<RestrictionT> checkpoint = tracker.trySplit(0);
        // No split produced means there is no residual left to process.
        if (checkpoint == null) {
          break;
        }

        // The residual becomes the restriction of the next pass, after waiting out the delay
        // requested by the continuation.
        pendingRestriction = checkpoint.getResidual();
        Uninterruptibles.sleepUninterruptibly(
            outcome.resumeDelay().getMillis(), TimeUnit.MILLISECONDS);
      }
    }