public void translateNode()

in runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java [629:786]


    /**
     * Translates a multi-output {@code ParDo} into a single Flink batch operator.
     *
     * <p>All tagged outputs are emitted by one operator as {@code RawUnionValue}s encoded with a
     * {@code UnionCoder}. The main output is assigned union index 0 and every other output tag is
     * numbered from 1 in the iteration order of the transform's output map. After the operator is
     * built, each output is handed to {@code pruneOutput} together with its union index.
     *
     * <p>When the transform uses state or timers, the {@code DoFn} is wrapped in a {@code
     * FlinkStatefulDoFnFunction} and run in a {@code GroupReduceOperator} over the input grouped by
     * key (and additionally sorted by timestamp when the signature requires time-sorted input).
     * Otherwise it is wrapped in a {@code FlinkDoFnFunction} and run in a {@code FlatMapOperator}.
     *
     * @param transform the transform being translated
     * @param context translation context supplying the current applied transform, input data set,
     *     outputs, output coders and pipeline options
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the ParDo
     *     payload through {@code ParDoTranslation}
     * @throws IllegalStateException if the {@code DoFn} is splittable, if an indexed output tag has
     *     no {@code PCollection} in the output map, or if no windowing strategy could be determined
     */
    public void translateNode(
        PTransform<PCollection<InputT>, PCollectionTuple> transform,
        FlinkBatchTranslationContext context) {
      DoFn<InputT, OutputT> doFn;
      try {
        doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(context.getCurrentTransform());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      DoFnSignature signature = DoFnSignatures.signatureForDoFn(doFn);
      checkState(
          !signature.processElement().isSplittable(),
          "Not expected to directly translate splittable DoFn, should have been overridden: %s",
          doFn);
      DataSet<WindowedValue<InputT>> inputDataSet =
          context.getInputDataSet(context.getInput(transform));

      Map<TupleTag<?>, PCollection<?>> outputs = context.getOutputs(transform);

      final TupleTag<OutputT> mainOutputTag;
      try {
        mainOutputTag =
            (TupleTag<OutputT>) ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      DoFnSchemaInformation doFnSchemaInformation =
          ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
      Map<String, PCollectionView<?>> sideInputMapping =
          ParDoTranslation.getSideInputMapping(context.getCurrentTransform());

      // Assign a union index to every output tag.
      Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
      // put the main output at index 0, FlinkMultiOutputDoFnFunction  expects this
      outputMap.put(mainOutputTag, 0);
      int count = 1;
      for (TupleTag<?> tag : outputs.keySet()) {
        // The main output tag is already registered at index 0; skip it here.
        if (!outputMap.containsKey(tag)) {
          outputMap.put(tag, count++);
        }
      }

      // Union coder elements must match the order of the output tags, so invert the map into a
      // sorted index -> tag view and iterate it in ascending index order below.
      Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap();
      for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) {
        indexMap.put(entry.getValue(), entry.getKey());
      }

      // assume that the windowing strategy is the same for all outputs
      // (the strategy of the last output visited is the one that is kept)
      WindowingStrategy<?, ?> windowingStrategy = null;

      // collect all output Coders and create a UnionCoder for our tagged outputs
      List<Coder<?>> outputCoders = Lists.newArrayList();
      for (TupleTag<?> tag : indexMap.values()) {
        PValue taggedValue = outputs.get(tag);
        // The message arguments are evaluated before checkState runs, so the type name must be
        // computed null-safely; otherwise a tag without an entry in the output map would surface
        // as a bare NullPointerException instead of this diagnostic.
        checkState(
            taggedValue instanceof PCollection,
            "Within ParDo, got a non-PCollection output %s of type %s",
            taggedValue,
            taggedValue == null ? "null" : taggedValue.getClass().getSimpleName());
        PCollection<?> coll = (PCollection<?>) taggedValue;
        outputCoders.add(coll.getCoder());
        windowingStrategy = coll.getWindowingStrategy();
      }

      if (windowingStrategy == null) {
        throw new IllegalStateException("No outputs defined.");
      }

      UnionCoder unionCoder = UnionCoder.of(outputCoders);

      // Type information for the operator output: windowed union values using the window coder
      // of the chosen windowing strategy.
      TypeInformation<WindowedValue<RawUnionValue>> typeInformation =
          new CoderTypeInformation<>(
              WindowedValue.getFullCoder(unionCoder, windowingStrategy.getWindowFn().windowCoder()),
              context.getPipelineOptions());

      List<PCollectionView<?>> sideInputs;
      try {
        sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }

      // construct a map from side input to WindowingStrategy so that
      // the DoFn runner can map main-input windows to side input windows
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
      for (PCollectionView<?> sideInput : sideInputs) {
        sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
      }

      SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet;
      boolean usesStateOrTimers;
      try {
        usesStateOrTimers = ParDoTranslation.usesStateOrTimers(context.getCurrentTransform());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }

      final Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders(transform);

      String fullName = getCurrentTransformName(context);
      if (usesStateOrTimers) {
        // Stateful path: the input coder is cast to KvCoder so the key coder can drive grouping.
        KvCoder<?, ?> inputCoder = (KvCoder<?, ?>) context.getInput(transform).getCoder();
        FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper =
            new FlinkStatefulDoFnFunction<>(
                (DoFn) doFn,
                fullName,
                windowingStrategy,
                sideInputStrategies,
                context.getPipelineOptions(),
                outputMap,
                mainOutputTag,
                inputCoder,
                outputCoderMap,
                doFnSchemaInformation,
                sideInputMapping);

        // Based on the fact that the signature is stateful, DoFnSignatures ensures
        // that it is also keyed.
        Coder<Object> keyCoder = (Coder) inputCoder.getKeyCoder();
        final Grouping<WindowedValue<InputT>> grouping;
        if (signature.processElement().requiresTimeSortedInput()) {
          // Time-sorted input: additionally sort each key group in ascending order.
          grouping =
              inputDataSet
                  .groupBy((KeySelector) new KvKeySelector<>(keyCoder))
                  .sortGroup(new KeyWithValueTimestampSelector<>(), Order.ASCENDING);
        } else {
          grouping = inputDataSet.groupBy((KeySelector) new KvKeySelector<>(keyCoder));
        }

        outputDataSet = new GroupReduceOperator(grouping, typeInformation, doFnWrapper, fullName);

      } else {
        // Stateless path: a plain flat map over the input data set.
        final FlinkDoFnFunction<InputT, OutputT> doFnWrapper =
            new FlinkDoFnFunction<>(
                doFn,
                fullName,
                windowingStrategy,
                sideInputStrategies,
                context.getPipelineOptions(),
                outputMap,
                mainOutputTag,
                context.getInput(transform).getCoder(),
                outputCoderMap,
                doFnSchemaInformation,
                sideInputMapping);

        outputDataSet = new FlatMapOperator<>(inputDataSet, typeInformation, doFnWrapper, fullName);
      }

      transformSideInputs(sideInputs, outputDataSet, context);

      // Register every declared output, passing along the union index it was assigned above.
      for (Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
        pruneOutput(
            outputDataSet,
            context,
            outputMap.get(output.getKey()),
            (PCollection) output.getValue());
      }
    }