public static ParDoPayload translateParDo()

in runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java [215:360]


  /**
   * Translates a multi-output {@link ParDo} into a {@link ParDoPayload}.
   *
   * <p>The work happens in this order:
   *
   * <ol>
   *   <li>The signature of the {@link DoFn} returned by {@code parDo.getFn()} is looked up.
   *   <li>If its process-element method is splittable, a {@link KvCoder} of the restriction coder
   *       and the watermark estimator state coder is registered in {@code components}; otherwise
   *       the restriction coder id is the empty string.
   *   <li>The window coder is read from the windowing strategy of {@code mainInput}.
   *   <li>If the signature uses state or timers, the coder of {@code mainInput} must be a {@link
   *       KvCoder} (enforced with {@code checkArgument}) and its key coder is used for timer
   *       translation; otherwise the key coder is {@code null}.
   *   <li>A {@code ParDoLike} view over the above is handed to {@code payloadForParDoLike}.
   * </ol>
   *
   * @param parDo the transform to translate
   * @param mainInput the main input collection; supplies the windowing strategy and input coder
   * @param doFnSchemaInformation forwarded unchanged to the DoFn translation
   * @param pipeline supplies the coder registry used to obtain the splittable DoFn coders
   * @param components the components the restriction coder is registered in and the payload is
   *     built against
   * @return the payload produced by {@code payloadForParDoLike}
   * @throws IOException if one of the underlying registration or translation calls throws it
   */
  public static <InputT> ParDoPayload translateParDo(
      ParDo.MultiOutput<InputT, ?> parDo,
      PCollection<InputT> mainInput,
      DoFnSchemaInformation doFnSchemaInformation,
      Pipeline pipeline,
      SdkComponents components)
      throws IOException {
    final DoFn<?, ?> fn = parDo.getFn();
    final DoFnSignature fnSignature = DoFnSignatures.getSignature(fn.getClass());

    // Only a splittable DoFn has a restriction coder; everything else gets the empty id.
    final String registeredRestrictionCoderId;
    if (!fnSignature.processElement().isSplittable()) {
      registeredRestrictionCoderId = "";
    } else {
      DoFnInvoker<?, ?> invoker = DoFnInvokers.invokerFor(fn);
      Coder<?> restrictionCoder = invoker.invokeGetRestrictionCoder(pipeline.getCoderRegistry());
      Coder<?> watermarkStateCoder =
          invoker.invokeGetWatermarkEstimatorStateCoder(pipeline.getCoderRegistry());
      Coder<?> restrictionAndWatermarkCoder = KvCoder.of(restrictionCoder, watermarkStateCoder);
      registeredRestrictionCoderId = components.registerCoder(restrictionAndWatermarkCoder);
    }

    Coder<BoundedWindow> timerWindowCoder =
        (Coder<BoundedWindow>) mainInput.getWindowingStrategy().getWindowFn().windowCoder();

    // The key coder is only available (and only required) when state or timers are in use.
    final Coder<?> timerKeyCoder;
    if (!fnSignature.usesState() && !fnSignature.usesTimers()) {
      timerKeyCoder = null;
    } else {
      checkArgument(
          mainInput.getCoder() instanceof KvCoder,
          "DoFn's that use state or timers must have an input PCollection with a KvCoder but received %s",
          mainInput.getCoder());
      timerKeyCoder = ((KvCoder<?, ?>) mainInput.getCoder()).getKeyCoder();
    }

    ParDoLike translation =
        new ParDoLike() {
          @Override
          public FunctionSpec translateDoFn(SdkComponents newComponents) {
            return ParDoTranslation.translateDoFn(
                parDo.getFn(),
                parDo.getMainOutputTag(),
                parDo.getSideInputs(),
                doFnSchemaInformation,
                newComponents);
          }

          @Override
          public Map<String, SideInput> translateSideInputs(SdkComponents newComponents) {
            // Keyed by the id of each view's internal tag.
            Map<String, SideInput> translatedViews = new HashMap<>();
            for (PCollectionView<?> view : parDo.getSideInputs().values()) {
              String tagId = view.getTagInternal().getId();
              translatedViews.put(tagId, translateView(view, newComponents));
            }
            return translatedViews;
          }

          @Override
          public Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents newComponents)
              throws IOException {
            // Keyed by the same ids the signature uses for its state declarations.
            Map<String, RunnerApi.StateSpec> specsById = new HashMap<>();
            for (Map.Entry<String, StateDeclaration> declared :
                fnSignature.stateDeclarations().entrySet()) {
              RunnerApi.StateSpec translated =
                  translateStateSpec(getStateSpecOrThrow(declared.getValue(), fn), newComponents);
              specsById.put(declared.getKey(), translated);
            }
            return specsById;
          }

          @Override
          public Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(
              SdkComponents newComponents) {
            Map<String, RunnerApi.TimerFamilySpec> specsById = new HashMap<>();

            // Individually declared timers first ...
            for (Map.Entry<String, TimerDeclaration> declaredTimer :
                fnSignature.timerDeclarations().entrySet()) {
              RunnerApi.TimerFamilySpec translated =
                  translateTimerFamilySpec(
                      getTimerSpecOrThrow(declaredTimer.getValue(), fn),
                      newComponents,
                      timerKeyCoder,
                      timerWindowCoder);
              specsById.put(declaredTimer.getKey(), translated);
            }

            // ... then timer families, written into the same map.
            for (Map.Entry<String, DoFnSignature.TimerFamilyDeclaration> declaredFamily :
                fnSignature.timerFamilyDeclarations().entrySet()) {
              RunnerApi.TimerFamilySpec translated =
                  translateTimerFamilySpec(
                      DoFnSignatures.getTimerFamilySpecOrThrow(declaredFamily.getValue(), fn),
                      newComponents,
                      timerKeyCoder,
                      timerWindowCoder);
              specsById.put(declaredFamily.getKey(), translated);
            }
            return specsById;
          }

          @Override
          public boolean isStateful() {
            // Stateful as soon as any state, timer, or timer family is declared.
            return !(fnSignature.stateDeclarations().isEmpty()
                && fnSignature.timerDeclarations().isEmpty()
                && fnSignature.timerFamilyDeclarations().isEmpty());
          }

          @Override
          public boolean isSplittable() {
            return fnSignature.processElement().isSplittable();
          }

          @Override
          public boolean isRequiresStableInput() {
            return fnSignature.processElement().requiresStableInput();
          }

          @Override
          public boolean isRequiresTimeSortedInput() {
            return fnSignature.processElement().requiresTimeSortedInput();
          }

          @Override
          public boolean requestsFinalization() {
            // True when any present lifecycle method takes the bundle finalizer parameter.
            if (fnSignature.startBundle() != null
                && fnSignature
                    .startBundle()
                    .extraParameters()
                    .contains(Parameter.bundleFinalizer())) {
              return true;
            }
            if (fnSignature.processElement() != null
                && fnSignature
                    .processElement()
                    .extraParameters()
                    .contains(Parameter.bundleFinalizer())) {
              return true;
            }
            return fnSignature.finishBundle() != null
                && fnSignature
                    .finishBundle()
                    .extraParameters()
                    .contains(Parameter.bundleFinalizer());
          }

          @Override
          public String translateRestrictionCoderId(SdkComponents newComponents) {
            // The coder was registered against the outer components up front; newComponents is
            // not consulted here.
            return registeredRestrictionCoderId;
          }
        };
    return payloadForParDoLike(translation, components);
  }