public MessageWithComponents getReplacement()

in runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java [119:316]


    public MessageWithComponents getReplacement(
        String transformId, ComponentsOrBuilder existingComponents) {
      try {
        MessageWithComponents.Builder rval = MessageWithComponents.newBuilder();

        PTransform splittableParDo = existingComponents.getTransformsOrThrow(transformId);
        ParDoPayload payload = ParDoPayload.parseFrom(splittableParDo.getSpec().getPayload());
        // Only perform the expansion if this is a splittable DoFn.
        if (payload.getRestrictionCoderId() == null || payload.getRestrictionCoderId().isEmpty()) {
          return null;
        }

        String mainInputName = ParDoTranslation.getMainInputName(splittableParDo);
        String mainInputPCollectionId = splittableParDo.getInputsOrThrow(mainInputName);
        PCollection mainInputPCollection =
            existingComponents.getPcollectionsOrThrow(mainInputPCollectionId);
        Map<String, String> sideInputs =
            Maps.filterKeys(
                splittableParDo.getInputsMap(), input -> payload.containsSideInputs(input));

        String pairWithRestrictionOutCoderId =
            generateUniqueId(
                mainInputPCollection.getCoderId() + "/PairWithRestriction",
                existingComponents::containsCoders);
        rval.getComponentsBuilder()
            .putCoders(
                pairWithRestrictionOutCoderId,
                ModelCoders.kvCoder(
                    mainInputPCollection.getCoderId(), payload.getRestrictionCoderId()));

        String pairWithRestrictionOutId =
            generateUniqueId(
                mainInputPCollectionId + "/PairWithRestriction",
                existingComponents::containsPcollections);
        rval.getComponentsBuilder()
            .putPcollections(
                pairWithRestrictionOutId,
                PCollection.newBuilder()
                    .setCoderId(pairWithRestrictionOutCoderId)
                    .setIsBounded(mainInputPCollection.getIsBounded())
                    .setWindowingStrategyId(mainInputPCollection.getWindowingStrategyId())
                    .setUniqueName(
                        generateUniquePCollectonName(
                            mainInputPCollection.getUniqueName() + "/PairWithRestriction",
                            existingComponents))
                    .build());

        String splitAndSizeOutCoderId =
            generateUniqueId(
                mainInputPCollection.getCoderId() + "/SplitAndSize",
                existingComponents::containsCoders);
        rval.getComponentsBuilder()
            .putCoders(
                splitAndSizeOutCoderId,
                ModelCoders.kvCoder(
                    pairWithRestrictionOutCoderId, getOrAddDoubleCoder(existingComponents, rval)));

        String splitAndSizeOutId =
            generateUniqueId(
                mainInputPCollectionId + "/SplitAndSize", existingComponents::containsPcollections);
        rval.getComponentsBuilder()
            .putPcollections(
                splitAndSizeOutId,
                PCollection.newBuilder()
                    .setCoderId(splitAndSizeOutCoderId)
                    .setIsBounded(mainInputPCollection.getIsBounded())
                    .setWindowingStrategyId(mainInputPCollection.getWindowingStrategyId())
                    .setUniqueName(
                        generateUniquePCollectonName(
                            mainInputPCollection.getUniqueName() + "/SplitAndSize",
                            existingComponents))
                    .build());

        String pairWithRestrictionId =
            generateUniqueId(
                transformId + "/PairWithRestriction", existingComponents::containsTransforms);
        {
          PTransform.Builder pairWithRestriction = PTransform.newBuilder();
          pairWithRestriction.putAllInputs(splittableParDo.getInputsMap());
          pairWithRestriction.putOutputs("out", pairWithRestrictionOutId);
          pairWithRestriction.setUniqueName(
              generateUniquePCollectonName(
                  splittableParDo.getUniqueName() + "/PairWithRestriction", existingComponents));
          pairWithRestriction.setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN)
                  .setPayload(splittableParDo.getSpec().getPayload()));
          pairWithRestriction.setEnvironmentId(splittableParDo.getEnvironmentId());
          rval.getComponentsBuilder()
              .putTransforms(pairWithRestrictionId, pairWithRestriction.build());
        }

        String splitAndSizeId =
            generateUniqueId(transformId + "/SplitAndSize", existingComponents::containsTransforms);
        {
          PTransform.Builder splitAndSize = PTransform.newBuilder();
          splitAndSize.putInputs(mainInputName, pairWithRestrictionOutId);
          splitAndSize.putAllInputs(sideInputs);
          splitAndSize.putOutputs("out", splitAndSizeOutId);
          splitAndSize.setUniqueName(
              generateUniquePCollectonName(
                  splittableParDo.getUniqueName() + "/SplitAndSize", existingComponents));
          splitAndSize.setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN)
                  .setPayload(splittableParDo.getSpec().getPayload()));
          splitAndSize.setEnvironmentId(splittableParDo.getEnvironmentId());
          rval.getComponentsBuilder().putTransforms(splitAndSizeId, splitAndSize.build());
        }
        PTransform.Builder newCompositeRoot =
            splittableParDo
                .toBuilder()
                // Clear the original splittable ParDo spec and add all the new transforms as
                // children.
                .clearSpec()
                .addAllSubtransforms(Arrays.asList(pairWithRestrictionId, splitAndSizeId));

        String processSizedElementsAndRestrictionsId =
            generateUniqueId(
                transformId + "/ProcessSizedElementsAndRestrictions",
                existingComponents::containsTransforms);
        String processSizedElementsInputPCollectionId = splitAndSizeOutId;
        if (isDrain()) {
          String truncateAndSizeCoderId =
              generateUniqueId(
                  mainInputPCollection.getCoderId() + "/TruncateAndSize",
                  existingComponents::containsCoders);
          rval.getComponentsBuilder()
              .putCoders(
                  truncateAndSizeCoderId,
                  ModelCoders.kvCoder(
                      splitAndSizeOutCoderId, getOrAddDoubleCoder(existingComponents, rval)));
          String truncateAndSizeOutId =
              generateUniqueId(
                  mainInputPCollectionId + "/TruncateAndSize",
                  existingComponents::containsPcollections);

          rval.getComponentsBuilder()
              .putPcollections(
                  truncateAndSizeOutId,
                  PCollection.newBuilder()
                      .setCoderId(truncateAndSizeCoderId)
                      .setIsBounded(mainInputPCollection.getIsBounded())
                      .setWindowingStrategyId(mainInputPCollection.getWindowingStrategyId())
                      .setUniqueName(
                          generateUniquePCollectonName(
                              mainInputPCollection.getUniqueName() + "/TruncateAndSize",
                              existingComponents))
                      .build());
          String truncateAndSizeId =
              generateUniqueId(
                  transformId + "/TruncateAndSize", existingComponents::containsTransforms);
          {
            PTransform.Builder truncateAndSize = PTransform.newBuilder();
            truncateAndSize.putInputs(mainInputName, splitAndSizeOutId);
            truncateAndSize.putAllInputs(sideInputs);
            truncateAndSize.putOutputs("out", truncateAndSizeOutId);
            truncateAndSize.setUniqueName(
                generateUniquePCollectonName(
                    splittableParDo.getUniqueName() + "/TruncateAndSize", existingComponents));
            truncateAndSize.setSpec(
                FunctionSpec.newBuilder()
                    .setUrn(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN)
                    .setPayload(splittableParDo.getSpec().getPayload()));
            truncateAndSize.setEnvironmentId(splittableParDo.getEnvironmentId());
            rval.getComponentsBuilder().putTransforms(truncateAndSizeId, truncateAndSize.build());
          }
          newCompositeRoot.addSubtransforms(truncateAndSizeId);
          processSizedElementsInputPCollectionId = truncateAndSizeOutId;
        }
        {
          PTransform.Builder processSizedElementsAndRestrictions = PTransform.newBuilder();
          processSizedElementsAndRestrictions.putInputs(
              mainInputName, processSizedElementsInputPCollectionId);
          processSizedElementsAndRestrictions.putAllInputs(sideInputs);
          processSizedElementsAndRestrictions.putAllOutputs(splittableParDo.getOutputsMap());
          processSizedElementsAndRestrictions.setUniqueName(
              generateUniquePCollectonName(
                  splittableParDo.getUniqueName() + "/ProcessSizedElementsAndRestrictions",
                  existingComponents));
          processSizedElementsAndRestrictions.setSpec(
              FunctionSpec.newBuilder()
                  .setUrn(
                      PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN)
                  .setPayload(splittableParDo.getSpec().getPayload()));
          processSizedElementsAndRestrictions.setEnvironmentId(splittableParDo.getEnvironmentId());
          rval.getComponentsBuilder()
              .putTransforms(
                  processSizedElementsAndRestrictionsId,
                  processSizedElementsAndRestrictions.build());
        }
        newCompositeRoot.addSubtransforms(processSizedElementsAndRestrictionsId);
        rval.setPtransform(newCompositeRoot);
        return rval.build();
      } catch (IOException e) {
        throw new RuntimeException("Unable to perform expansion for transform " + transformId, e);
      }
    }