in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ParDoFnBase.java [130:205]
/**
 * Builds the {@code fnRunner} for a new bundle and starts it.
 *
 * <p>An {@link OutputManager} is created that routes every output to a receiver chosen by tag:
 * the main output tag maps to {@code receivers[0]}, the i-th entry of {@code sideOutputTags}
 * maps to {@code receivers[i + 1]}, and any other tag gets an {@link OutputReceiver} that is
 * created the first time the tag is seen and reused for later outputs to that tag. The runner is
 * a {@code StreamingSideInputDoFnRunner} when {@code hasStreamingSideInput} is set, otherwise the
 * result of {@code DoFnRunner.create(...)}.
 *
 * @throws IllegalStateException if {@code fnRunner} is already set, i.e. a bundle was started
 *     and not finished
 * @throws Exception declared for failures while creating or starting the runner
 */
private void reallyStartBundle() throws Exception {
  Preconditions.checkState(fnRunner == null, "bundle already started (or not properly finished)");

  // The step context is only available when an execution context was supplied.
  StepContext stepContext = null;
  if (executionContext != null) {
    stepContext = executionContext.getOrCreateStepContext(stepName, transformName, stateSampler);
  }

  @SuppressWarnings("unchecked")
  DoFnInfo<Object, Object> doFnInfo = (DoFnInfo<Object, Object>) getDoFnInfo();

  OutputManager outputManager = new OutputManager() {
    // Receivers for tags that are neither the main output nor a declared side output,
    // created on first use in output().
    final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new HashMap<>();

    /**
     * Returns the receiver registered for {@code tag}, or {@code null} if the tag is not the
     * main output, not a declared side output, and has not been seen as an undeclared output.
     */
    @Nullable
    private Receiver getReceiverOrNull(TupleTag<?> tag) {
      if (tag.equals(mainOutputTag)) {
        return receivers[0];
      }
      // Single lookup instead of contains() followed by indexOf(): this runs once per
      // emitted element, so avoid scanning the side output tags twice.
      int sideOutputIndex = sideOutputTags.indexOf(tag);
      if (sideOutputIndex >= 0) {
        // Slot 0 belongs to the main output, so side outputs are shifted by one.
        return receivers[sideOutputIndex + 1];
      }
      return undeclaredOutputs.get(tag);
    }

    @Override
    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
      Receiver receiver = getReceiverOrNull(tag);
      if (receiver == null) {
        // A new undeclared output.
        // TODO: plumb through the operationName, so that we can
        // name implicit outputs after it.
        String outputName = "implicit-" + tag.getId();
        // TODO: plumb through the counter prefix, so we can
        // make it available to the OutputReceiver class in case
        // it wants to use it in naming output counters.  (It
        // doesn't today.)
        OutputReceiver undeclaredReceiver = new OutputReceiver();
        ElementCounter outputCounter = new DataflowOutputCounter(outputName, addCounterMutator);
        undeclaredReceiver.addOutputCounter(outputCounter);
        undeclaredOutputs.put(tag, undeclaredReceiver);
        receiver = undeclaredReceiver;
      }

      try {
        receiver.process(output);
      } catch (Throwable t) {
        // output() declares no checked exceptions, so rethrow via Throwables.propagate.
        throw Throwables.propagate(t);
      }
    }
  };

  if (hasStreamingSideInput) {
    fnRunner = new StreamingSideInputDoFnRunner<Object, Object, BoundedWindow>(
        options,
        doFnInfo,
        sideInputReader,
        outputManager,
        mainOutputTag,
        sideOutputTags,
        stepContext,
        addCounterMutator);
  } else {
    fnRunner = DoFnRunner.create(
        options,
        doFnInfo.getDoFn(),
        sideInputReader,
        outputManager,
        mainOutputTag,
        sideOutputTags,
        stepContext,
        addCounterMutator,
        doFnInfo.getWindowingStrategy());
  }

  fnRunner.startBundle();
}