in runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java [629:786]
public void translateNode(
    PTransform<PCollection<InputT>, PCollectionTuple> transform,
    FlinkBatchTranslationContext context) {
  // Translates a multi-output ParDo into a single Flink operator whose elements are
  // WindowedValue<RawUnionValue>, then calls pruneOutput once per declared output
  // (presumably to split the union back into one DataSet per tag; pruneOutput is defined
  // elsewhere, so confirm there).
  //
  // transform: the ParDo being translated; its input, outputs and output coders are looked
  //            up through the context.
  // context:   supplies the current AppliedPTransform, input DataSet and pipeline options.
  //
  // Throws RuntimeException wrapping any IOException raised by ParDoTranslation, and
  // IllegalStateException if the DoFn is splittable, if an indexed output tag has no
  // PCollection, or if no windowing strategy could be determined.
  DoFn<InputT, OutputT> doFn;
  try {
    doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
  checkState(
      !signature.processElement().isSplittable(),
      "Not expected to directly translate splittable DoFn, should have been overridden: %s",
      doFn);

  DataSet<WindowedValue<InputT>> inputDataSet =
      context.getInputDataSet(context.getInput(transform));

  Map<TupleTag<?>, PCollection<?>> outputs = context.getOutputs(transform);

  final TupleTag<OutputT> mainOutputTag;
  DoFnSchemaInformation doFnSchemaInformation;
  Map<String, PCollectionView<?>> sideInputMapping;
  try {
    mainOutputTag =
        (TupleTag<OutputT>) ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
  sideInputMapping = ParDoTranslation.getSideInputMapping(context.getCurrentTransform());

  // Assign every output tag a dense integer index: tag -> index.
  Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
  // put the main output at index 0, FlinkMultiOutputDoFnFunction expects this
  outputMap.put(mainOutputTag, 0);
  int count = 1;
  for (TupleTag<?> tag : outputs.keySet()) {
    // The main tag already holds index 0; only additional outputs get a fresh index.
    if (!outputMap.containsKey(tag)) {
      outputMap.put(tag, count++);
    }
  }

  // Union coder elements must match the order of the output tags.
  // Inverting into a TreeMap yields the tags in ascending index order.
  Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap();
  for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) {
    indexMap.put(entry.getValue(), entry.getKey());
  }

  // assume that the windowing strategy is the same for all outputs
  // (the strategy of the last output visited in index order is the one that is kept)
  WindowingStrategy<?, ?> windowingStrategy = null;

  // collect all output Coders and create a UnionCoder for our tagged outputs
  List<Coder<?>> outputCoders = Lists.newArrayList();
  for (TupleTag<?> tag : indexMap.values()) {
    PValue taggedValue = outputs.get(tag);
    // Message arguments are evaluated before checkState is invoked, so the type name must be
    // computed null-safely: a tag with no entry in `outputs` should fail with this
    // IllegalStateException rather than a bare NullPointerException.
    checkState(
        taggedValue instanceof PCollection,
        "Within ParDo, got a non-PCollection output %s of type %s",
        taggedValue,
        taggedValue == null ? "null" : taggedValue.getClass().getSimpleName());
    PCollection<?> coll = (PCollection<?>) taggedValue;
    outputCoders.add(coll.getCoder());
    windowingStrategy = coll.getWindowingStrategy();
  }

  if (windowingStrategy == null) {
    throw new IllegalStateException("No outputs defined.");
  }

  // Type information for the operator output: windowed values of the tagged union.
  UnionCoder unionCoder = UnionCoder.of(outputCoders);
  TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
      new CoderTypeInformation<>(
          WindowedValue.getFullCoder(unionCoder, windowingStrategy.getWindowFn().windowCoder()),
          context.getPipelineOptions());

  List<PCollectionView<?>> sideInputs;
  try {
    sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  // construct a map from side input to WindowingStrategy so that
  // the DoFn runner can map main-input windows to side input windows
  Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
  for (PCollectionView<?> sideInput : sideInputs) {
    sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
  }

  SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet;

  boolean usesStateOrTimers;
  try {
    usesStateOrTimers = ParDoTranslation.usesStateOrTimers(context.getCurrentTransform());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  final Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders(transform);

  String fullName = getCurrentTransformName(context);
  if (usesStateOrTimers) {
    // Stateful path: the input is grouped by key and processed with a GroupReduceOperator.
    KvCoder<?, ?> inputCoder = (KvCoder<?, ?>) context.getInput(transform).getCoder();
    FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper =
        new FlinkStatefulDoFnFunction<>(
            (DoFn) doFn,
            fullName,
            windowingStrategy,
            sideInputStrategies,
            context.getPipelineOptions(),
            outputMap,
            mainOutputTag,
            inputCoder,
            outputCoderMap,
            doFnSchemaInformation,
            sideInputMapping);

    // Based on the fact that the signature is stateful, DoFnSignatures ensures
    // that it is also keyed.
    Coder<Object> keyCoder = (Coder) inputCoder.getKeyCoder();

    final Grouping<WindowedValue<InputT>> grouping;
    if (signature.processElement().requiresTimeSortedInput()) {
      // Time-sorted input requested: additionally sort each key group ascending using
      // KeyWithValueTimestampSelector.
      grouping =
          inputDataSet
              .groupBy((KeySelector) new KvKeySelector<>(keyCoder))
              .sortGroup(new KeyWithValueTimestampSelector<>(), Order.ASCENDING);
    } else {
      grouping = inputDataSet.groupBy((KeySelector) new KvKeySelector<>(keyCoder));
    }

    outputDataSet = new GroupReduceOperator(grouping, typeInformation, doFnWrapper, fullName);
  } else {
    // Stateless path: no grouping, the DoFn is applied through a FlatMapOperator.
    final FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
        new FlinkDoFnFunction<>(
            doFn,
            fullName,
            windowingStrategy,
            sideInputStrategies,
            context.getPipelineOptions(),
            outputMap,
            mainOutputTag,
            context.getInput(transform).getCoder(),
            outputCoderMap,
            doFnSchemaInformation,
            sideInputMapping);

    outputDataSet = new FlatMapOperator<>(inputDataSet, typeInformation, doFnWrapper, fullName);
  }

  transformSideInputs(sideInputs, outputDataSet, context);

  // Hand every declared output, together with its union index, to pruneOutput.
  for (Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
    pruneOutput(
        outputDataSet,
        context,
        outputMap.get(output.getKey()),
        (PCollection) output.getValue());
  }
}