public ParDoFn create()

in runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowParDoFnFactory.java [79:228]


  /**
   * Builds the {@code ParDoFn} that runs a group-also-by-window step described by {@code
   * cloudUserFn}.
   *
   * <p>The method proceeds in these stages:
   *
   * <ol>
   *   <li>Validates that {@code outputTupleTagsToReceiverIndices} holds exactly one entry, that its
   *       key equals {@code mainOutputTag} and that its receiver index is 0.
   *   <li>Deserializes the windowing strategy stored under {@code PropertyNames.SERIALIZED_FN}. If
   *       that fails and the {@code beam_fn_api} experiment is enabled, the global default
   *       windowing strategy is used instead; otherwise the failure is rethrown.
   *   <li>Deserializes the optional combine fn stored under {@code WorkerPropertyNames.COMBINE_FN}
   *       and, when present, resolves its side input reader and combine phase ({@code ALL} or
   *       {@code MERGE}).
   *   <li>Decodes the input coder, which must be a {@code WindowedValueCoder} whose value coder is
   *       a {@code KvCoder}.
   *   <li>Selects the batch or streaming {@code GroupAlsoByWindowFn}, with or without a combiner,
   *       and wraps it in a {@code GroupAlsoByWindowsParDoFn}.
   * </ol>
   *
   * @param options pipeline options; read for the {@code beam_fn_api} experiment and for the
   *     streaming flag
   * @param cloudUserFn specification holding the serialized windowing strategy, the optional
   *     serialized combine fn, the optional combine phase and the input coder
   * @param sideInputInfos side input descriptions; only passed on when a combine fn is present
   * @param mainOutputTag the tag of the single output this fn produces
   * @param outputTupleTagsToReceiverIndices must contain exactly one entry, mapping {@code
   *     mainOutputTag} to receiver index 0
   * @param executionContext supplies the step context and, when a combine fn is present, the side
   *     input reader
   * @param operationContext the operation context used to look up the step context
   * @return a {@code GroupAlsoByWindowsParDoFn} configured for this step
   * @throws Exception if any of the argument checks fail, or if the windowing strategy cannot be
   *     deserialized while the {@code beam_fn_api} experiment is not enabled, or if any of the
   *     decoding helpers fails
   */
  public ParDoFn create(
      PipelineOptions options,
      CloudObject cloudUserFn,
      @Nullable List<SideInputInfo> sideInputInfos,
      TupleTag<?> mainOutputTag,
      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
      final DataflowExecutionContext<?> executionContext,
      DataflowOperationContext operationContext)
      throws Exception {
    // Exactly one output is supported: the main output, wired to receiver 0.
    Map.Entry<TupleTag<?>, Integer> entry =
        Iterables.getOnlyElement(outputTupleTagsToReceiverIndices.entrySet());
    checkArgument(
        entry.getKey().equals(mainOutputTag),
        "Output tags should reference only the main output tag: %s vs %s",
        entry.getKey(),
        mainOutputTag);
    checkArgument(
        entry.getValue() == 0,
        "There should be a single receiver, but using receiver index %s",
        entry.getValue());

    byte[] encodedWindowingStrategy = getBytes(cloudUserFn, PropertyNames.SERIALIZED_FN);
    WindowingStrategy windowingStrategy;
    try {
      windowingStrategy = deserializeWindowingStrategy(encodedWindowingStrategy);
    } catch (Exception e) {
      // Temporarily choose default windowing strategy if fn API is enabled.
      // TODO: Catch block disappears, becoming an error once Python SDK is compliant.
      if (DataflowRunner.hasExperiment(
          options.as(DataflowPipelineDebugOptions.class), "beam_fn_api")) {
        LOG.info("FnAPI: Unable to deserialize windowing strategy, assuming default", e);
        windowingStrategy = WindowingStrategy.globalDefault();
      } else {
        throw e;
      }
    }

    // The combine fn is optional; a null default means "plain grouping, no combiner".
    byte[] serializedCombineFn = getBytes(cloudUserFn, WorkerPropertyNames.COMBINE_FN, null);
    AppliedCombineFn<?, ?, ?, ?> combineFn = null;
    if (serializedCombineFn != null) {
      Object combineFnObj =
          SerializableUtils.deserializeFromByteArray(serializedCombineFn, "serialized combine fn");
      // Guard the class lookup so that a null payload is reported by the check itself rather than
      // by a NullPointerException raised while building the message.
      checkArgument(
          combineFnObj instanceof AppliedCombineFn,
          "unexpected kind of AppliedCombineFn: %s",
          combineFnObj == null ? null : combineFnObj.getClass().getName());
      combineFn = (AppliedCombineFn<?, ?, ?, ?>) combineFnObj;
    }

    Map<String, Object> inputCoderObject = getObject(cloudUserFn, WorkerPropertyNames.INPUT_CODER);

    Coder<?> inputCoder = CloudObjects.coderFromCloudObject(CloudObject.fromSpec(inputCoderObject));
    checkArgument(
        inputCoder instanceof WindowedValueCoder,
        "Expected WindowedValueCoder for inputCoder, got: %s",
        inputCoder.getClass().getName());
    WindowedValueCoder<?> windowedValueCoder = (WindowedValueCoder<?>) inputCoder;

    Coder<?> elemCoder = windowedValueCoder.getValueCoder();
    checkArgument(
        elemCoder instanceof KvCoder,
        "Expected KvCoder for inputCoder's value coder, got: %s",
        elemCoder.getClass().getName());
    KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) elemCoder;

    boolean isStreamingPipeline = options.as(StreamingOptions.class).isStreaming();

    // Without a combine fn there are no side inputs to read.
    SideInputReader sideInputReader = NullSideInputReader.empty();
    @Nullable AppliedCombineFn<?, ?, ?, ?> maybeMergingCombineFn = null;
    if (combineFn != null) {
      sideInputReader =
          executionContext.getSideInputReader(
              sideInputInfos, combineFn.getSideInputViews(), operationContext);

      // The phase defaults to ALL when the spec does not carry one.
      String phase = getString(cloudUserFn, WorkerPropertyNames.PHASE, CombinePhase.ALL);
      checkArgument(
          phase.equals(CombinePhase.ALL) || phase.equals(CombinePhase.MERGE),
          "Unexpected phase: %s",
          phase);
      if (phase.equals(CombinePhase.MERGE)) {
        maybeMergingCombineFn = makeAppliedMergingFunction(combineFn);
      } else {
        maybeMergingCombineFn = combineFn;
      }
    }

    // The key argument is ignored: state internals always come from the current step context.
    StateInternalsFactory<?> stateInternalsFactory =
        key -> executionContext.getStepContext(operationContext).stateInternals();

    // This will be a GABW Fn for either batch or streaming, with combiner in it or not
    GroupAlsoByWindowFn<?, ?> fn;

    // This will be a FakeKeyedWorkItemCoder for streaming or null for batch
    Coder<?> gabwInputCoder;

    // TODO: do not do this with mess of "if"
    // The raw casts below are deliberate: the wildcard-typed locals cannot otherwise be passed to
    // the generic factory methods.
    if (isStreamingPipeline) {
      if (maybeMergingCombineFn == null) {
        fn =
            StreamingGroupAlsoByWindowsDoFns.createForIterable(
                windowingStrategy, stateInternalsFactory, ((KvCoder) kvCoder).getValueCoder());
        gabwInputCoder = WindmillKeyedWorkItem.FakeKeyedWorkItemCoder.of(kvCoder);
      } else {
        fn =
            StreamingGroupAlsoByWindowsDoFns.create(
                windowingStrategy,
                stateInternalsFactory,
                (AppliedCombineFn) maybeMergingCombineFn,
                ((KvCoder) kvCoder).getKeyCoder());
        gabwInputCoder =
            WindmillKeyedWorkItem.FakeKeyedWorkItemCoder.of(
                ((AppliedCombineFn) maybeMergingCombineFn).getKvCoder());
      }

    } else {
      if (maybeMergingCombineFn == null) {
        fn =
            BatchGroupAlsoByWindowsDoFns.createForIterable(
                windowingStrategy, stateInternalsFactory, ((KvCoder) kvCoder).getValueCoder());
        gabwInputCoder = null;
      } else {
        fn =
            BatchGroupAlsoByWindowsDoFns.create(
                windowingStrategy, (AppliedCombineFn) maybeMergingCombineFn);
        gabwInputCoder = null;
      }
    }

    // Side input views only exist when a combine fn is in play; otherwise pass null.
    return new GroupAlsoByWindowsParDoFn(
        options,
        fn,
        windowingStrategy,
        maybeMergingCombineFn == null
            ? null
            : ((AppliedCombineFn) maybeMergingCombineFn).getSideInputViews(),
        gabwInputCoder,
        sideInputReader,
        mainOutputTag,
        executionContext.getStepContext(operationContext));
  }