public ParDoFn create()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/GroupAlsoByWindowsParDoFn.java [83:163]


    /**
     * Builds a {@code GroupAlsoByWindowsParDoFn} from the cloud description of the user function.
     *
     * <p>The following properties are read from {@code cloudUserFn}:
     * <ul>
     *   <li>{@code PropertyNames.SERIALIZED_FN}: the serialized windowing strategy. An empty byte
     *       array selects {@code WindowingStrategy.globalDefault()}.
     *   <li>{@code PropertyNames.COMBINE_FN}: an optional serialized {@code AppliedCombineFn}.
     *   <li>{@code PropertyNames.INPUT_CODER}: the input coder, which must be a
     *       {@code WindowedValueCoder} whose value coder is a {@code KvCoder}.
     *   <li>{@code PropertyNames.PHASE}: read only when a combine fn is present; defaults to
     *       {@code CombinePhase.ALL} and must be either {@code CombinePhase.ALL} or
     *       {@code CombinePhase.MERGE}.
     * </ul>
     *
     * @param options pipeline options; viewed as {@code StreamingOptions} to decide whether the
     *     pipeline is streaming, and forwarded to {@code GroupAlsoByWindowsParDoFn.of}
     * @param cloudUserFn cloud object holding the properties listed above
     * @param stepName forwarded to {@code GroupAlsoByWindowsParDoFn.of}
     * @param transformName forwarded to {@code GroupAlsoByWindowsParDoFn.of}
     * @param sideInputInfos not referenced by this method
     * @param multiOutputInfos not referenced by this method
     * @param numOutputs not referenced by this method
     * @param executionContext forwarded to {@code GroupAlsoByWindowsParDoFn.of}
     * @param addCounterMutator forwarded to {@code GroupAlsoByWindowsParDoFn.of}
     * @param stateSampler forwarded to {@code GroupAlsoByWindowsParDoFn.of}
     * @return the {@code ParDoFn} produced by {@code GroupAlsoByWindowsParDoFn.of}
     * @throws Exception propagated from property lookup, deserialization, or the argument checks
     *     performed below
     */
    public ParDoFn create(
        PipelineOptions options,
        CloudObject cloudUserFn,
        String stepName,
        String transformName,
        @Nullable List<SideInputInfo> sideInputInfos,
        @Nullable List<MultiOutputInfo> multiOutputInfos,
        int numOutputs,
        DataflowExecutionContext executionContext,
        CounterSet.AddCounterMutator addCounterMutator,
        StateSampler stateSampler)
            throws Exception {
      // --- Windowing strategy -------------------------------------------------------------
      Object windowingStrategyObj;
      byte[] encodedWindowingStrategy = getBytes(cloudUserFn, PropertyNames.SERIALIZED_FN);
      if (encodedWindowingStrategy.length == 0) {
        // An empty payload means "no explicit strategy": use the global default.
        windowingStrategyObj = WindowingStrategy.globalDefault();
      } else {
        windowingStrategyObj =
            SerializableUtils.deserializeFromByteArray(
                encodedWindowingStrategy, "serialized windowing strategy");
        // The message is built before checkArgument runs, so it must tolerate a null object;
        // otherwise a null payload surfaces as a NullPointerException instead of the check.
        Preconditions.checkArgument(
            windowingStrategyObj instanceof WindowingStrategy,
            "unexpected kind of WindowingStrategy: "
                + (windowingStrategyObj == null
                    ? "null"
                    : windowingStrategyObj.getClass().getName()));
      }
      @SuppressWarnings({"rawtypes", "unchecked"})
      WindowingStrategy windowingStrategy = (WindowingStrategy) windowingStrategyObj;

      // --- Optional combine fn ------------------------------------------------------------
      byte[] serializedCombineFn = getBytes(cloudUserFn, PropertyNames.COMBINE_FN, null);
      AppliedCombineFn<?, ?, ?, ?> combineFn = null;
      if (serializedCombineFn != null) {
        Object combineFnObj = SerializableUtils.deserializeFromByteArray(
            serializedCombineFn, "serialized combine fn");
        Preconditions.checkArgument(
            combineFnObj instanceof AppliedCombineFn,
            "unexpected kind of AppliedCombineFn: "
                + (combineFnObj == null ? "null" : combineFnObj.getClass().getName()));
        combineFn = (AppliedCombineFn<?, ?, ?, ?>) combineFnObj;
      }

      // --- Input coder: WindowedValueCoder wrapping a KvCoder ------------------------------
      Map<String, Object> inputCoderObject = getObject(cloudUserFn, PropertyNames.INPUT_CODER);

      Coder<?> inputCoder = Serializer.deserialize(inputCoderObject, Coder.class);
      Preconditions.checkArgument(
          inputCoder instanceof WindowedValueCoder,
          "Expected WindowedValueCoder for inputCoder, got: "
              + (inputCoder == null ? "null" : inputCoder.getClass().getName()));
      WindowedValueCoder<?> windowedValueCoder = (WindowedValueCoder<?>) inputCoder;

      Coder<?> elemCoder = windowedValueCoder.getValueCoder();
      // This check is about the value coder nested inside inputCoder, not inputCoder itself.
      Preconditions.checkArgument(
          elemCoder instanceof KvCoder,
          "Expected KvCoder for the value coder of inputCoder, got: "
              + (elemCoder == null ? "null" : elemCoder.getClass().getName()));
      KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) elemCoder;

      boolean isStreamingPipeline = options.as(StreamingOptions.class).isStreaming();

      // --- Select the combine fn variant according to the requested phase ------------------
      @Nullable AppliedCombineFn<?, ?, ?, ?> maybeMergingCombineFn = null;
      if (combineFn != null) {
        String phase = getString(cloudUserFn, PropertyNames.PHASE, CombinePhase.ALL);
        Preconditions.checkArgument(
            phase.equals(CombinePhase.ALL) || phase.equals(CombinePhase.MERGE),
            "Unexpected phase: " + phase);
        if (phase.equals(CombinePhase.MERGE)) {
          // MERGE phase: wrap the combine fn via makeAppliedMergingFunction.
          maybeMergingCombineFn = makeAppliedMergingFunction(combineFn);
        } else {
          // ALL phase: use the combine fn as deserialized.
          maybeMergingCombineFn = combineFn;
        }
      }

      DoFn<?, ?> groupAlsoByWindowsDoFn = getGroupAlsoByWindowsDoFn(
          isStreamingPipeline, windowingStrategy, kvCoder, maybeMergingCombineFn);

      return GroupAlsoByWindowsParDoFn.of(
          options,
          groupAlsoByWindowsDoFn,
          stepName,
          transformName,
          executionContext,
          addCounterMutator,
          stateSampler);
    }