in runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java [215:360]
public static <InputT> ParDoPayload translateParDo(
ParDo.MultiOutput<InputT, ?> parDo,
PCollection<InputT> mainInput,
DoFnSchemaInformation doFnSchemaInformation,
Pipeline pipeline,
SdkComponents components)
throws IOException {
final DoFn<?, ?> doFn = parDo.getFn();
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
final String restrictionCoderId;
if (signature.processElement().isSplittable()) {
DoFnInvoker<?, ?> doFnInvoker = DoFnInvokers.invokerFor(doFn);
final Coder<?> restrictionAndWatermarkStateCoder =
KvCoder.of(
doFnInvoker.invokeGetRestrictionCoder(pipeline.getCoderRegistry()),
doFnInvoker.invokeGetWatermarkEstimatorStateCoder(pipeline.getCoderRegistry()));
restrictionCoderId = components.registerCoder(restrictionAndWatermarkStateCoder);
} else {
restrictionCoderId = "";
}
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>) mainInput.getWindowingStrategy().getWindowFn().windowCoder();
Coder<?> keyCoder;
if (signature.usesState() || signature.usesTimers()) {
checkArgument(
mainInput.getCoder() instanceof KvCoder,
"DoFn's that use state or timers must have an input PCollection with a KvCoder but received %s",
mainInput.getCoder());
keyCoder = ((KvCoder) mainInput.getCoder()).getKeyCoder();
} else {
keyCoder = null;
}
return payloadForParDoLike(
new ParDoLike() {
@Override
public FunctionSpec translateDoFn(SdkComponents newComponents) {
return ParDoTranslation.translateDoFn(
parDo.getFn(),
parDo.getMainOutputTag(),
parDo.getSideInputs(),
doFnSchemaInformation,
newComponents);
}
@Override
public Map<String, SideInput> translateSideInputs(SdkComponents components) {
Map<String, SideInput> sideInputs = new HashMap<>();
for (PCollectionView<?> sideInput : parDo.getSideInputs().values()) {
sideInputs.put(
sideInput.getTagInternal().getId(), translateView(sideInput, components));
}
return sideInputs;
}
@Override
public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
throws IOException {
Map<String, RunnerApi.StateSpec> stateSpecs = new HashMap<>();
for (Map.Entry<String, StateDeclaration> state :
signature.stateDeclarations().entrySet()) {
RunnerApi.StateSpec spec =
translateStateSpec(getStateSpecOrThrow(state.getValue(), doFn), components);
stateSpecs.put(state.getKey(), spec);
}
return stateSpecs;
}
@Override
public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
SdkComponents newComponents) {
Map<String, RunnerApi.TimerFamilySpec> timerFamilySpecs = new HashMap<>();
for (Map.Entry<String, TimerDeclaration> timer :
signature.timerDeclarations().entrySet()) {
RunnerApi.TimerFamilySpec spec =
translateTimerFamilySpec(
getTimerSpecOrThrow(timer.getValue(), doFn),
newComponents,
keyCoder,
windowCoder);
timerFamilySpecs.put(timer.getKey(), spec);
}
for (Map.Entry<String, DoFnSignature.TimerFamilyDeclaration> timerFamily :
signature.timerFamilyDeclarations().entrySet()) {
RunnerApi.TimerFamilySpec spec =
translateTimerFamilySpec(
DoFnSignatures.getTimerFamilySpecOrThrow(timerFamily.getValue(), doFn),
newComponents,
keyCoder,
windowCoder);
timerFamilySpecs.put(timerFamily.getKey(), spec);
}
return timerFamilySpecs;
}
@Override
public boolean isStateful() {
return !signature.stateDeclarations().isEmpty()
|| !signature.timerDeclarations().isEmpty()
|| !signature.timerFamilyDeclarations().isEmpty();
}
@Override
public boolean isSplittable() {
return signature.processElement().isSplittable();
}
@Override
public boolean isRequiresStableInput() {
return signature.processElement().requiresStableInput();
}
@Override
public boolean isRequiresTimeSortedInput() {
return signature.processElement().requiresTimeSortedInput();
}
@Override
public boolean requestsFinalization() {
return (signature.startBundle() != null
&& signature
.startBundle()
.extraParameters()
.contains(Parameter.bundleFinalizer()))
|| (signature.processElement() != null
&& signature
.processElement()
.extraParameters()
.contains(Parameter.bundleFinalizer()))
|| (signature.finishBundle() != null
&& signature
.finishBundle()
.extraParameters()
.contains(Parameter.bundleFinalizer()));
}
@Override
public String translateRestrictionCoderId(SdkComponents newComponents) {
return restrictionCoderId;
}
},
components);
}