in sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/ParDo.java [1093:1173]
/**
 * Runs {@code doFn} over every value of {@code input} as one bundle, delivering results to
 * {@code outputs}.
 *
 * <p>Alongside the evaluation, best-effort immutability checks are made. Each input value is
 * checked for mutation across its {@code processElement} call. Outputs go through an
 * {@code ImmutabilityCheckingOutputManager}, and the most recent outputs are checked once
 * more after {@code finishBundle()}.
 *
 * @param doFn the user DoFn; the instance actually run is whatever
 *     {@code context.ensureSerializable(doFn)} returns
 * @param stepName passed as both identifiers when obtaining the step context
 * @param input the collection whose values (with metadata) are fed to the DoFn
 * @param sideInputs side inputs exposed to the DoFn through a {@code SideInputReader}
 * @param mainOutputTag tag of the main output
 * @param sideOutputTags tags of any side outputs
 * @param outputs the tuple the output manager writes produced values into
 * @param context the direct-runner evaluation context
 * @param executionContext receives the implicit key for each element and supplies the step
 *     context
 * @throws IllegalMutationException if an input value is seen to change across
 *     {@code processElement}
 * @throws UserCodeException wrapping any {@code CoderException} raised in the per-element
 *     checked region
 */
private static <InputT, OutputT, ActualInputT extends InputT> void evaluateHelper(
    DoFn<InputT, OutputT> doFn,
    String stepName,
    PCollection<ActualInputT> input,
    List<PCollectionView<?>> sideInputs,
    TupleTag<OutputT> mainOutputTag,
    List<TupleTag<?>> sideOutputTags,
    PCollectionTuple outputs,
    DirectPipelineRunner.EvaluationContext context,
    DirectModeExecutionContext executionContext) {
  // TODO: Run multiple shards?
  DoFn<InputT, OutputT> serializableFn = context.ensureSerializable(doFn);
  String fnName = serializableFn.getClass().getSimpleName();
  SideInputReader reader = makeSideInputReader(context, sideInputs);

  // This output manager checks each output for illegal mutations once the next output arrives.
  // The latest outputs are verified again after finishBundle(). The main target is a DoFn
  // that mutates its input in order to emit repeated "variations" of it.
  ImmutabilityCheckingOutputManager<ActualInputT> checkingOutputManager =
      new ImmutabilityCheckingOutputManager<>(
          fnName, new DoFnRunner.ListOutputManager(), outputs);

  DoFnRunner<InputT, OutputT> runner = DoFnRunner.create(
      context.getPipelineOptions(),
      serializableFn,
      reader,
      checkingOutputManager,
      mainOutputTag,
      sideOutputTags,
      executionContext.getOrCreateStepContext(stepName, stepName, null),
      context.getAddCounterMutator(),
      input.getWindowingStrategy());

  runner.startBundle();
  for (DirectPipelineRunner.ValueWithMetadata<ActualInputT> element
      : context.getPCollectionValuesWithMetadata(input)) {
    // A DoFn may need keyed state. A KV element supplies its own key as the implicit key;
    // any other element uses the key carried in its metadata.
    Object value = element.getValue();
    Object implicitKey = (value instanceof KV) ? ((KV<?, ?>) value).getKey() : element.getKey();
    executionContext.setKey(implicitKey);

    // Input mutation is only detected across the processElement call. This ad hoc check is
    // best effort and misses some cases; the common offender mutates the input to build output.
    try {
      MutationDetector inputDetector = MutationDetectors.forValueWithCoder(
          element.getWindowedValue().getValue(), input.getCoder());
      @SuppressWarnings("unchecked")
      WindowedValue<InputT> windowedInput = (WindowedValue<InputT>) element.getWindowedValue();
      runner.processElement(windowedInput);
      inputDetector.verifyUnmodified();
    } catch (CoderException e) {
      throw new UserCodeException(e);
    } catch (IllegalMutationException mutation) {
      throw wrapInputMutation(fnName, mutation);
    }
  }

  // An input retained and mutated before this final output goes undetected. Catching that
  // corner case would cost more readability than it is worth.
  runner.finishBundle();
  checkingOutputManager.verifyLatestOutputsUnmodified();
}

/**
 * Builds the exception reported when the DoFn named {@code fnName} has mutated an input value.
 * The saved and new values from {@code cause} are carried over, and {@code cause} is kept as
 * the underlying cause.
 */
private static IllegalMutationException wrapInputMutation(
    String fnName, IllegalMutationException cause) {
  Object savedValue = cause.getSavedValue();
  Object newValue = cause.getNewValue();
  String message = String.format("DoFn %s mutated input value %s of class %s (new value was %s)."
      + " Input values must not be mutated in any way.",
      fnName, savedValue, savedValue.getClass(), newValue);
  return new IllegalMutationException(message, savedValue, newValue, cause);
}