in runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java [119:316]
/**
 * Builds the replacement for a splittable ParDo: a composite transform whose children are the
 * PairWithRestriction, SplitAndSize, (only when {@code isDrain()} is true) TruncateAndSize and
 * ProcessSizedElementsAndRestrictions sub-transforms, together with the coders and PCollections
 * that connect them.
 *
 * <p>The original transform is reused as the composite root with its spec cleared and the new
 * sub-transform ids appended to its subtransforms. Every sub-transform carries the original
 * ParDo payload and environment id.
 *
 * @param transformId id of the transform to expand; looked up in {@code existingComponents}
 * @param existingComponents components of the pipeline being rewritten, used to resolve the
 *     transform and its main input and to generate ids and names that do not clash with
 *     existing ones
 * @return the composite root plus the newly created components, or {@code null} when the payload
 *     has no restriction coder id (the ParDo is not splittable and is left untouched)
 * @throws RuntimeException wrapping any {@link IOException} raised while performing the expansion
 */
public MessageWithComponents getReplacement(
    String transformId, ComponentsOrBuilder existingComponents) {
  try {
    MessageWithComponents.Builder rval = MessageWithComponents.newBuilder();
    PTransform splittableParDo = existingComponents.getTransformsOrThrow(transformId);
    ParDoPayload payload = ParDoPayload.parseFrom(splittableParDo.getSpec().getPayload());
    // Only perform the expansion if this is a splittable DoFn. Generated protobuf string getters
    // never return null, so an empty id is the only "unset" case that has to be checked.
    if (payload.getRestrictionCoderId().isEmpty()) {
      return null;
    }
    String mainInputName = ParDoTranslation.getMainInputName(splittableParDo);
    String mainInputPCollectionId = splittableParDo.getInputsOrThrow(mainInputName);
    PCollection mainInputPCollection =
        existingComponents.getPcollectionsOrThrow(mainInputPCollectionId);
    // Inputs of the original ParDo that the payload declares as side inputs.
    Map<String, String> sideInputs =
        Maps.filterKeys(
            splittableParDo.getInputsMap(), input -> payload.containsSideInputs(input));

    // Output of PairWithRestriction: a KV coder over the main input coder and the restriction
    // coder, and a PCollection that uses it.
    String pairWithRestrictionOutCoderId =
        generateUniqueId(
            mainInputPCollection.getCoderId() + "/PairWithRestriction",
            existingComponents::containsCoders);
    rval.getComponentsBuilder()
        .putCoders(
            pairWithRestrictionOutCoderId,
            ModelCoders.kvCoder(
                mainInputPCollection.getCoderId(), payload.getRestrictionCoderId()));
    String pairWithRestrictionOutId =
        generateUniqueId(
            mainInputPCollectionId + "/PairWithRestriction",
            existingComponents::containsPcollections);
    addDerivedPCollection(
        rval,
        existingComponents,
        mainInputPCollection,
        pairWithRestrictionOutId,
        pairWithRestrictionOutCoderId,
        "/PairWithRestriction");

    // Output of SplitAndSize: a KV coder over the PairWithRestriction coder and a double coder.
    String splitAndSizeOutCoderId =
        generateUniqueId(
            mainInputPCollection.getCoderId() + "/SplitAndSize",
            existingComponents::containsCoders);
    rval.getComponentsBuilder()
        .putCoders(
            splitAndSizeOutCoderId,
            ModelCoders.kvCoder(
                pairWithRestrictionOutCoderId, getOrAddDoubleCoder(existingComponents, rval)));
    String splitAndSizeOutId =
        generateUniqueId(
            mainInputPCollectionId + "/SplitAndSize", existingComponents::containsPcollections);
    addDerivedPCollection(
        rval,
        existingComponents,
        mainInputPCollection,
        splitAndSizeOutId,
        splitAndSizeOutCoderId,
        "/SplitAndSize");

    // PairWithRestriction receives every input of the original ParDo.
    String pairWithRestrictionId =
        generateUniqueId(
            transformId + "/PairWithRestriction", existingComponents::containsTransforms);
    rval.getComponentsBuilder()
        .putTransforms(
            pairWithRestrictionId,
            newSubTransformBuilder(
                    splittableParDo,
                    existingComponents,
                    "/PairWithRestriction",
                    PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN)
                .putAllInputs(splittableParDo.getInputsMap())
                .putOutputs("out", pairWithRestrictionOutId)
                .build());

    // SplitAndSize reads the PairWithRestriction output under the main input name, plus the
    // side inputs.
    String splitAndSizeId =
        generateUniqueId(transformId + "/SplitAndSize", existingComponents::containsTransforms);
    rval.getComponentsBuilder()
        .putTransforms(
            splitAndSizeId,
            newSubTransformBuilder(
                    splittableParDo,
                    existingComponents,
                    "/SplitAndSize",
                    PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN)
                .putInputs(mainInputName, pairWithRestrictionOutId)
                .putAllInputs(sideInputs)
                .putOutputs("out", splitAndSizeOutId)
                .build());

    PTransform.Builder newCompositeRoot =
        splittableParDo
            .toBuilder()
            // Clear the original splittable ParDo spec and add all the new transforms as
            // children.
            .clearSpec()
            .addAllSubtransforms(Arrays.asList(pairWithRestrictionId, splitAndSizeId));

    String processSizedElementsAndRestrictionsId =
        generateUniqueId(
            transformId + "/ProcessSizedElementsAndRestrictions",
            existingComponents::containsTransforms);
    // By default the processing step consumes the SplitAndSize output directly; when draining,
    // a TruncateAndSize step is inserted in between and its output is consumed instead.
    String processSizedElementsInputPCollectionId = splitAndSizeOutId;
    if (isDrain()) {
      String truncateAndSizeCoderId =
          generateUniqueId(
              mainInputPCollection.getCoderId() + "/TruncateAndSize",
              existingComponents::containsCoders);
      // NOTE(review): this wraps the SplitAndSize coder in a further KV with a double coder
      // rather than reusing the SplitAndSize coder as-is; confirm that this matches the element
      // type TruncateAndSize is expected to emit.
      rval.getComponentsBuilder()
          .putCoders(
              truncateAndSizeCoderId,
              ModelCoders.kvCoder(
                  splitAndSizeOutCoderId, getOrAddDoubleCoder(existingComponents, rval)));
      String truncateAndSizeOutId =
          generateUniqueId(
              mainInputPCollectionId + "/TruncateAndSize",
              existingComponents::containsPcollections);
      addDerivedPCollection(
          rval,
          existingComponents,
          mainInputPCollection,
          truncateAndSizeOutId,
          truncateAndSizeCoderId,
          "/TruncateAndSize");
      String truncateAndSizeId =
          generateUniqueId(
              transformId + "/TruncateAndSize", existingComponents::containsTransforms);
      rval.getComponentsBuilder()
          .putTransforms(
              truncateAndSizeId,
              newSubTransformBuilder(
                      splittableParDo,
                      existingComponents,
                      "/TruncateAndSize",
                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN)
                  .putInputs(mainInputName, splitAndSizeOutId)
                  .putAllInputs(sideInputs)
                  .putOutputs("out", truncateAndSizeOutId)
                  .build());
      newCompositeRoot.addSubtransforms(truncateAndSizeId);
      processSizedElementsInputPCollectionId = truncateAndSizeOutId;
    }

    // The final step produces exactly the outputs of the original ParDo.
    rval.getComponentsBuilder()
        .putTransforms(
            processSizedElementsAndRestrictionsId,
            newSubTransformBuilder(
                    splittableParDo,
                    existingComponents,
                    "/ProcessSizedElementsAndRestrictions",
                    PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN)
                .putInputs(mainInputName, processSizedElementsInputPCollectionId)
                .putAllInputs(sideInputs)
                .putAllOutputs(splittableParDo.getOutputsMap())
                .build());
    newCompositeRoot.addSubtransforms(processSizedElementsAndRestrictionsId);
    rval.setPtransform(newCompositeRoot);
    return rval.build();
  } catch (IOException e) {
    throw new RuntimeException("Unable to perform expansion for transform " + transformId, e);
  }
}

/**
 * Registers in {@code rval} a new PCollection that copies the boundedness and windowing strategy
 * of the main input PCollection, uses the given coder, and is named after the main input's
 * unique name with {@code nameSuffix} appended.
 */
private void addDerivedPCollection(
    MessageWithComponents.Builder rval,
    ComponentsOrBuilder existingComponents,
    PCollection mainInputPCollection,
    String pcollectionId,
    String coderId,
    String nameSuffix) {
  rval.getComponentsBuilder()
      .putPcollections(
          pcollectionId,
          PCollection.newBuilder()
              .setCoderId(coderId)
              .setIsBounded(mainInputPCollection.getIsBounded())
              .setWindowingStrategyId(mainInputPCollection.getWindowingStrategyId())
              .setUniqueName(
                  generateUniquePCollectonName(
                      mainInputPCollection.getUniqueName() + nameSuffix, existingComponents))
              .build());
}

/**
 * Starts a sub-transform of the expansion: sets its unique name (the original ParDo's unique
 * name plus {@code nameSuffix}), a spec with the given URN and the original ParDo payload, and
 * the original environment id. Inputs and outputs are left for the caller to fill in.
 */
private PTransform.Builder newSubTransformBuilder(
    PTransform splittableParDo,
    ComponentsOrBuilder existingComponents,
    String nameSuffix,
    String urn) {
  // NOTE(review): the transform name is made unique via generateUniquePCollectonName, as in the
  // code this helper was extracted from; confirm that helper is also appropriate for transforms.
  return PTransform.newBuilder()
      .setUniqueName(
          generateUniquePCollectonName(
              splittableParDo.getUniqueName() + nameSuffix, existingComponents))
      .setSpec(
          FunctionSpec.newBuilder()
              .setUrn(urn)
              .setPayload(splittableParDo.getSpec().getPayload()))
      .setEnvironmentId(splittableParDo.getEnvironmentId());
}