public void processElement()

in runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java [350:549]


    /**
     * Processes one keyed work item, which is either the seed call for an element/restriction pair
     * or a timer firing that resumes a previously saved restriction.
     *
     * <p>The call is treated as the seed call when the work item carries no timer. In that case the
     * element is written to state and the initial watermark estimator state is requested from the
     * invoker. Otherwise the element, restriction and watermark estimator state are read back from
     * state under the namespace of the timer that fired.
     *
     * <p>After invoking the splittable process-element call, either all per-element state and the
     * watermark hold are cleared (no residual restriction), or the residual restriction and future
     * watermark estimator state are saved, a watermark hold is added, and a processing-time timer
     * is set to resume the work.
     *
     * @param c the process context; its element supplies the key, the timers and the elements
     * @param boundedWindow the window handed to the invoker through the argument providers
     */
    public void processElement(final ProcessContext c, BoundedWindow boundedWindow) {
      byte[] key = c.element().key();
      StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
      TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);

      // Initialize state (element and restriction) depending on whether this is the seed call.
      // The seed call is the first call for this element, which actually has the element.
      // Subsequent calls are timer firings and the element has to be retrieved from the state.
      TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);

      // The seed element is extracted exactly once; it stays null on a timer firing and doubles
      // as the "is this the seed call" marker further down.
      @Nullable WindowedValue<KV<InputT, RestrictionT>> seedValue;
      StateNamespace stateNamespace;
      if (timer == null) {
        seedValue = Iterables.getOnlyElement(c.element().elementsIterable());
        BoundedWindow window = Iterables.getOnlyElement(seedValue.getWindows());
        stateNamespace =
            StateNamespaces.window(
                (Coder<BoundedWindow>) inputWindowingStrategy.getWindowFn().windowCoder(), window);
      } else {
        seedValue = null;
        stateNamespace = timer.getNamespace();
      }

      ValueState<WindowedValue<InputT>> elementState =
          stateInternals.state(stateNamespace, elementTag);
      ValueState<RestrictionT> restrictionState =
          stateInternals.state(stateNamespace, restrictionTag);
      ValueState<WatermarkEstimatorStateT> watermarkEstimatorState =
          stateInternals.state(stateNamespace, watermarkEstimatorStateTag);
      WatermarkHoldState holdState = stateInternals.state(stateNamespace, watermarkHoldTag);

      // Arguments common to every invoker call made below. Each call site supplies its own
      // getErrorContext() and overrides only the extra accessors it needs, so accessors that are
      // not overridden keep whatever behavior BaseArgumentProvider gives them.
      abstract class ElementArgumentProvider extends BaseArgumentProvider<InputT, OutputT> {
        private final KV<WindowedValue<InputT>, RestrictionT> source;

        ElementArgumentProvider(KV<WindowedValue<InputT>, RestrictionT> source) {
          this.source = source;
        }

        @Override
        public InputT element(DoFn<InputT, OutputT> doFn) {
          return source.getKey().getValue();
        }

        @Override
        public Object restriction() {
          return source.getValue();
        }

        @Override
        public Instant timestamp(DoFn<InputT, OutputT> doFn) {
          return c.timestamp();
        }

        @Override
        public PipelineOptions pipelineOptions() {
          return c.getPipelineOptions();
        }

        @Override
        public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
          return c.pane();
        }

        @Override
        public BoundedWindow window() {
          return boundedWindow;
        }
      }

      KV<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
      WatermarkEstimatorStateT watermarkEstimatorStateT;
      if (seedValue != null) {
        // Seed call: split the incoming KV into the element (persisted for later timer firings)
        // and its restriction, then ask the DoFn for the initial watermark estimator state.
        WindowedValue<InputT> element = seedValue.withValue(seedValue.getValue().getKey());
        elementState.write(element);
        elementAndRestriction = KV.of(element, seedValue.getValue().getValue());
        watermarkEstimatorStateT =
            invoker.invokeGetInitialWatermarkEstimatorState(
                new ElementArgumentProvider(elementAndRestriction) {
                  @Override
                  public String getErrorContext() {
                    return ProcessFn.class.getSimpleName()
                        + ".invokeGetInitialWatermarkEstimatorState";
                  }
                });
      } else {
        // This is not the first ProcessElement call for this element/restriction - rather,
        // this is a timer firing, so we need to fetch the element and restriction from state.
        elementState.readLater();
        restrictionState.readLater();
        watermarkEstimatorState.readLater();
        elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
        watermarkEstimatorStateT = watermarkEstimatorState.read();
      }

      final WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator =
          invoker.invokeNewWatermarkEstimator(
              new ElementArgumentProvider(elementAndRestriction) {
                @Override
                public Object watermarkEstimatorState() {
                  return watermarkEstimatorStateT;
                }

                @Override
                public String getErrorContext() {
                  return ProcessFn.class.getSimpleName() + ".invokeNewWatermarkEstimator";
                }
              });

      final RestrictionTracker<RestrictionT, PositionT> tracker =
          invoker.invokeNewTracker(
              new ElementArgumentProvider(elementAndRestriction) {
                @Override
                public String getErrorContext() {
                  return ProcessFn.class.getSimpleName() + ".invokeNewTracker";
                }
              });
      SplittableProcessElementInvoker<
                  InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
              .Result
          result =
              processElementInvoker.invokeProcessElement(
                  invoker, elementAndRestriction.getKey(), tracker, watermarkEstimator);

      // Save state for resuming.
      if (result.getResidualRestriction() == null) {
        // All work for this element/restriction is completed. Clear state and release hold.
        elementState.clear();
        restrictionState.clear();
        watermarkEstimatorState.clear();
        holdState.clear();
        return;
      }

      restrictionState.write(result.getResidualRestriction());
      watermarkEstimatorState.write(result.getFutureWatermarkEstimatorState());
      @Nullable Instant futureOutputWatermark = result.getFutureOutputWatermark();
      if (futureOutputWatermark == null) {
        // No future output watermark was reported: hold at the element's own timestamp instead.
        futureOutputWatermark = elementAndRestriction.getKey().getTimestamp();
      }
      Instant wakeupTime =
          timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
      holdState.add(futureOutputWatermark);
      // Set a timer to continue processing this element.
      timerInternals.setTimer(
          TimerInternals.TimerData.of(
              stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
    }