private static void evaluateHelper()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/ParDo.java [1093:1173]


  /**
   * Runs {@code doFn} as a single bundle over every element that {@code context} holds for
   * {@code input}, with best-effort checks that the function mutates neither its inputs nor
   * the values it has already emitted.
   *
   * <p>The function actually executed is whatever {@code context.ensureSerializable(doFn)}
   * returns. Before each element is processed, the key on {@code executionContext} is set:
   * to the {@link KV} key when the element's value is a {@code KV}, otherwise to the key
   * carried by the element's metadata.
   *
   * @param doFn the function to evaluate
   * @param stepName used as both name arguments when obtaining the step context
   * @param input the collection whose elements and coder drive the evaluation
   * @param sideInputs views from which the side input reader is built
   * @param mainOutputTag tag passed to the runner as the main output
   * @param sideOutputTags tags passed to the runner as the side outputs
   * @param outputs handed to the immutability-checking output manager
   * @param context supplies pipeline options, the counter mutator and the input elements
   * @param executionContext supplies the step context and receives the per-element key
   * @throws UserCodeException if building the input mutation detector fails with a
   *     {@code CoderException}
   * @throws IllegalMutationException if an input value is found modified once
   *     {@code processElement} has returned
   */
  private static <InputT, OutputT, ActualInputT extends InputT> void evaluateHelper(
      DoFn<InputT, OutputT> doFn,
      String stepName,
      PCollection<ActualInputT> input,
      List<PCollectionView<?>> sideInputs,
      TupleTag<OutputT> mainOutputTag,
      List<TupleTag<?>> sideOutputTags,
      PCollectionTuple outputs,
      DirectPipelineRunner.EvaluationContext context,
      DirectModeExecutionContext executionContext) {
    // TODO: Run multiple shards?
    DoFn<InputT, OutputT> serializableFn = context.ensureSerializable(doFn);

    SideInputReader sideInputs_reader = makeSideInputReader(context, sideInputs);

    // Simple class name of the DoFn; used to label the output manager and in the
    // mutation error message below.
    String fnName = serializableFn.getClass().getSimpleName();

    // Under the DirectPipelineRunner every output is checked for illegal mutation at the
    // moment the next output arrives, and once more after finishBundle(). The typical
    // mistake this catches is user code that keeps mutating one object in order to emit
    // several "variations" of it.
    ImmutabilityCheckingOutputManager<ActualInputT> checkedOutputs =
        new ImmutabilityCheckingOutputManager<>(
            fnName,
            new DoFnRunner.ListOutputManager(),
            outputs);

    DoFnRunner<InputT, OutputT> runner =
        DoFnRunner.create(
            context.getPipelineOptions(),
            serializableFn,
            sideInputs_reader,
            checkedOutputs,
            mainOutputTag,
            sideOutputTags,
            executionContext.getOrCreateStepContext(stepName, stepName, null),
            context.getAddCounterMutator(),
            input.getWindowingStrategy());

    runner.startBundle();

    for (DirectPipelineRunner.ValueWithMetadata<ActualInputT> element
             : context.getPCollectionValuesWithMetadata(input)) {
      // The DoFn may need keyed state, so the implicit key follows the element: a KV
      // contributes its own key, anything else falls back to the key in its metadata.
      Object elementValue = element.getValue();
      Object implicitKey = (elementValue instanceof KV)
          ? ((KV<?, ?>) elementValue).getKey()
          : element.getKey();
      executionContext.setKey(implicitKey);

      // Input mutation is only looked for across the processElement call itself. That is
      // ad hoc and misses some cases, but it covers the common one where the input object
      // is modified so it can be reused as an output.
      try {
        MutationDetector detector = MutationDetectors.forValueWithCoder(
            element.getWindowedValue().getValue(), input.getCoder());
        @SuppressWarnings("unchecked")
        WindowedValue<InputT> windowedInput =
            ((WindowedValue<InputT>) element.getWindowedValue());
        runner.processElement(windowedInput);
        detector.verifyUnmodified();
      } catch (CoderException coderFailure) {
        throw new UserCodeException(coderFailure);
      } catch (IllegalMutationException mutation) {
        // Rethrow with a message that names the offending DoFn, keeping the original
        // exception as the cause.
        Object before = mutation.getSavedValue();
        Object after = mutation.getNewValue();
        String message = String.format(
            "DoFn %s mutated input value %s of class %s (new value was %s)."
                + " Input values must not be mutated in any way.",
            fnName, before, before.getClass(), after);
        throw new IllegalMutationException(message, before, after, mutation);
      }
    }

    // An input retained by the DoFn could still have been mutated before this final
    // output; catching that corner case would currently cost too much readability.
    runner.finishBundle();
    checkedOutputs.verifyLatestOutputsUnmodified();
  }