public ParDoFn create()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/NormalParDoFn.java [79:148]


    /**
     * Builds a {@link ParDoFn} from the serialized user function carried in {@code cloudUserFn}.
     *
     * <p>The bytes stored under {@code PropertyNames.SERIALIZED_FN} are deserialized and must
     * yield a {@code DoFnInfo}. A side input reader is then chosen (see the inline comment below),
     * the output tags are collected, and everything is handed to {@code NormalParDoFn.of}.
     *
     * @param options pipeline options, passed through to {@code NormalParDoFn.of}
     * @param cloudUserFn cloud object holding the serialized fn bytes
     * @param stepName passed through to {@code NormalParDoFn.of}
     * @param transformName passed through to {@code NormalParDoFn.of}
     * @param sideInputInfos side input metadata; when non-null and non-empty it is used to obtain
     *     the side input reader from {@code executionContext}
     * @param multiOutputInfos output descriptions whose tags become the output tag list; when
     *     null or empty, a single tag named {@code "output"} is assumed
     * @param numOutputs the number of outputs; must equal the number of resulting output tags
     * @param executionContext source of the side input reader; also passed through to
     *     {@code NormalParDoFn.of}
     * @param addCounterMutator passed through to {@code NormalParDoFn.of}
     * @param stateSampler passed through to {@code NormalParDoFn.of}
     * @return the {@link ParDoFn} produced by {@code NormalParDoFn.of}
     * @throws Exception if the deserialized object is null or not a {@code DoFnInfo}, or if any
     *     of the called helpers fails
     * @throws AssertionError if {@code numOutputs} does not match the number of output tags
     */
    public ParDoFn create(
        PipelineOptions options,
        final CloudObject cloudUserFn,
        String stepName,
        String transformName,
        @Nullable List<SideInputInfo> sideInputInfos,
        @Nullable List<MultiOutputInfo> multiOutputInfos,
        int numOutputs,
        DataflowExecutionContext executionContext,
        CounterSet.AddCounterMutator addCounterMutator,
        StateSampler stateSampler)
            throws Exception {
      Object deserializedFnInfo =
          SerializableUtils.deserializeFromByteArray(
              getBytes(cloudUserFn, PropertyNames.SERIALIZED_FN),
              "serialized fn info");
      if (!(deserializedFnInfo instanceof DoFnInfo)) {
        // instanceof is false for null, so this branch is also reached when deserialization
        // produced null; avoid calling getClass() on it so the intended error is reported
        // instead of a NullPointerException.
        String actualKind =
            deserializedFnInfo == null ? "null" : deserializedFnInfo.getClass().getName();
        throw new Exception("unexpected kind of DoFnInfo: " + actualKind);
      }
      DoFnInfo<?, ?> doFnInfo = (DoFnInfo<?, ?>) deserializedFnInfo;

      // If side input source metadata is provided by the service in sideInputInfos, we request
      // a SideInputReader from the executionContext using that info.
      //
      // If no side input source metadata is provided but the DoFn expects side inputs, as a
      // fallback, we request a SideInputReader based only on the expected views.
      //
      // These cases are not disjoint: Whenever a DoFn takes side inputs,
      // doFnInfo.getSideInputViews() should be non-empty.
      //
      // A note on the behavior of the Dataflow service: Today, the first case corresponds to
      // batch mode, while the fallback corresponds to streaming mode.
      SideInputReader sideInputReader;
      final Iterable<PCollectionView<?>> sideInputViews = doFnInfo.getSideInputViews();
      if (sideInputInfos != null && !sideInputInfos.isEmpty()) {
        sideInputReader = executionContext.getSideInputReader(sideInputInfos);
      } else if (sideInputViews != null && !Iterables.isEmpty(sideInputViews)) {
        // Only emptiness matters here, so there is no need to count every view.
        sideInputReader = executionContext.getSideInputReaderForViews(sideInputViews);
      } else {
        sideInputReader = NullSideInputReader.empty();
      }

      List<String> outputTags = new ArrayList<>();
      if (multiOutputInfos != null) {
        for (MultiOutputInfo multiOutputInfo : multiOutputInfos) {
          outputTags.add(multiOutputInfo.getTag());
        }
      }
      if (outputTags.isEmpty()) {
        // Legacy support: assume there's a single output tag named "output".
        // (The output tag name will be ignored, for the main output.)
        outputTags.add("output");
      }
      // Checked after the legacy default is applied, so a tag-less DoFn must report one output.
      if (numOutputs != outputTags.size()) {
        throw new AssertionError(
            "unexpected number of outputTags for DoFn");
      }

      return NormalParDoFn.of(
          options,
          doFnInfo,
          sideInputReader,
          outputTags,
          stepName,
          transformName,
          executionContext,
          addCounterMutator,
          stateSampler);
    }