private void reallyStartBundle()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ParDoFnBase.java [130:205]


  /**
   * Starts a new bundle: builds the output manager and the {@code DoFn} runner, stores the
   * runner in {@code fnRunner}, and calls {@code startBundle()} on it.
   *
   * <p>The output manager routes each output by tag:
   * <ul>
   *   <li>{@code mainOutputTag} goes to {@code receivers[0]};</li>
   *   <li>a tag found in {@code sideOutputTags} at index {@code i} goes to
   *       {@code receivers[i + 1]};</li>
   *   <li>any other tag gets an {@code OutputReceiver} created on first use, with an output
   *       counter named {@code "implicit-" + tag.getId()}, and reused for later outputs to
   *       the same tag within this bundle's output manager.</li>
   * </ul>
   *
   * <p>The state check at the top fails if {@code fnRunner} is already set, i.e. a bundle was
   * already started (or not properly finished).
   *
   * @throws Exception as declared by this method; anything thrown while a receiver processes
   *     an output is rethrown via {@code Throwables.propagate}
   */
  private void reallyStartBundle() throws Exception {
    Preconditions.checkState(fnRunner == null, "bundle already started (or not properly finished)");

    // Without an execution context the runner is created with a null step context.
    StepContext stepContext = null;
    if (executionContext != null) {
      stepContext = executionContext.getOrCreateStepContext(stepName, transformName, stateSampler);
    }

    @SuppressWarnings("unchecked")
    DoFnInfo<Object, Object> doFnInfo = (DoFnInfo<Object, Object>) getDoFnInfo();

    OutputManager outputManager = new OutputManager() {
      // Receivers created lazily for tags that are neither the main output nor a declared
      // side output.
      private final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new HashMap<>();

      /**
       * Returns the receiver for {@code tag}, or {@code null} if the tag is undeclared and no
       * receiver has been created for it yet.
       */
      @Nullable
      private Receiver getReceiverOrNull(TupleTag<?> tag) {
        if (tag.equals(mainOutputTag)) {
          return receivers[0];
        }

        // This runs for every output, so look the tag up once: indexOf yields -1 for tags
        // that are not declared side outputs, which replaces a separate contains() scan.
        int sideOutputIndex = sideOutputTags.indexOf(tag);
        if (sideOutputIndex >= 0) {
          // receivers[0] is the main output, so side outputs are shifted by one.
          return receivers[sideOutputIndex + 1];
        }

        return undeclaredOutputs.get(tag);
      }

      @Override
      public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
        Receiver receiver = getReceiverOrNull(tag);
        if (receiver == null) {
          // A new undeclared output.
          // TODO: plumb through the operationName, so that we can
          // name implicit outputs after it.
          String outputName = "implicit-" + tag.getId();
          // TODO: plumb through the counter prefix, so we can
          // make it available to the OutputReceiver class in case
          // it wants to use it in naming output counters.  (It
          // doesn't today.)
          OutputReceiver undeclaredReceiver = new OutputReceiver();
          ElementCounter outputCounter = new DataflowOutputCounter(outputName, addCounterMutator);
          undeclaredReceiver.addOutputCounter(outputCounter);
          // Remember the receiver so later outputs to the same tag reuse it.
          undeclaredOutputs.put(tag, undeclaredReceiver);
          receiver = undeclaredReceiver;
        }

        try {
          receiver.process(output);
        } catch (Throwable t) {
          // output() declares no checked exceptions, so rethrow through Throwables.propagate.
          throw Throwables.propagate(t);
        }
      }
    };

    // Streaming side inputs use a dedicated runner that takes the whole DoFnInfo; otherwise
    // the runner is created from the DoFn and its windowing strategy.
    if (hasStreamingSideInput) {
      fnRunner = new StreamingSideInputDoFnRunner<Object, Object, BoundedWindow>(
          options,
          doFnInfo,
          sideInputReader,
          outputManager,
          mainOutputTag,
          sideOutputTags,
          stepContext,
          addCounterMutator);
    } else {
      fnRunner = DoFnRunner.create(
          options,
          doFnInfo.getDoFn(),
          sideInputReader,
          outputManager,
          mainOutputTag,
          sideOutputTags,
          stepContext,
          addCounterMutator,
          doFnInfo.getWindowingStrategy());
    }

    fnRunner.startBundle();
  }