in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/NormalParDoFn.java [79:148]
/**
 * Builds a {@link ParDoFn} from the serialized {@link DoFnInfo} stored under
 * {@code PropertyNames.SERIALIZED_FN} in {@code cloudUserFn}.
 *
 * <p>Side inputs are resolved in this order:
 * <ol>
 *   <li>if {@code sideInputInfos} is non-null and non-empty, the reader is obtained from
 *       {@code executionContext.getSideInputReader(sideInputInfos)};
 *   <li>otherwise, if the {@code DoFnInfo} declares side-input views, the reader is obtained from
 *       {@code executionContext.getSideInputReaderForViews(views)};
 *   <li>otherwise an empty {@code NullSideInputReader} is used.
 * </ol>
 *
 * <p>Output tags come from {@code multiOutputInfos}. When none are given, a single tag
 * {@code "output"} is used.
 *
 * @param options pipeline options, passed through to {@code NormalParDoFn.of}
 * @param cloudUserFn cloud object holding the serialized {@link DoFnInfo}
 * @param stepName step name, passed through to {@code NormalParDoFn.of}
 * @param transformName transform name, passed through to {@code NormalParDoFn.of}
 * @param sideInputInfos optional service-provided side input source metadata
 * @param multiOutputInfos optional output descriptors whose tags name the outputs
 * @param numOutputs expected number of outputs; must equal the number of resolved output tags
 * @param executionContext context used to obtain the side input reader
 * @param addCounterMutator passed through to {@code NormalParDoFn.of}
 * @param stateSampler passed through to {@code NormalParDoFn.of}
 * @return the {@link ParDoFn} built by {@code NormalParDoFn.of}
 * @throws Exception if the serialized fn is missing or is not a {@link DoFnInfo}
 * @throws AssertionError if {@code numOutputs} does not match the number of output tags
 */
public ParDoFn create(
    PipelineOptions options,
    final CloudObject cloudUserFn,
    String stepName,
    String transformName,
    @Nullable List<SideInputInfo> sideInputInfos,
    @Nullable List<MultiOutputInfo> multiOutputInfos,
    int numOutputs,
    DataflowExecutionContext executionContext,
    CounterSet.AddCounterMutator addCounterMutator,
    StateSampler stateSampler)
    throws Exception {
  Object deserializedFnInfo =
      SerializableUtils.deserializeFromByteArray(
          getBytes(cloudUserFn, PropertyNames.SERIALIZED_FN),
          "serialized fn info");
  if (!(deserializedFnInfo instanceof DoFnInfo)) {
    // instanceof is false for null as well; guard getClass() so a null payload produces
    // this descriptive exception rather than a NullPointerException.
    throw new Exception(
        "unexpected kind of DoFnInfo: "
            + (deserializedFnInfo == null ? "null" : deserializedFnInfo.getClass().getName()));
  }
  DoFnInfo<?, ?> doFnInfo = (DoFnInfo<?, ?>) deserializedFnInfo;

  // If the service provides side input source metadata in sideInputInfos, request a
  // SideInputReader from the executionContext using that metadata.
  //
  // Otherwise, if the DoFn still expects side inputs, fall back to a SideInputReader
  // built only from the expected views.
  //
  // The two cases can overlap: whenever a DoFn takes side inputs,
  // doFnInfo.getSideInputViews() should be non-empty.
  //
  // Dataflow service behavior today: the first case corresponds to batch mode, and the
  // fallback corresponds to streaming mode.
  SideInputReader sideInputReader;
  final Iterable<PCollectionView<?>> sideInputViews = doFnInfo.getSideInputViews();
  if (sideInputInfos != null && !sideInputInfos.isEmpty()) {
    sideInputReader = executionContext.getSideInputReader(sideInputInfos);
  } else if (sideInputViews != null && !Iterables.isEmpty(sideInputViews)) {
    // isEmpty stops at the first element instead of counting the whole iterable.
    sideInputReader = executionContext.getSideInputReaderForViews(sideInputViews);
  } else {
    sideInputReader = NullSideInputReader.empty();
  }

  List<String> outputTags =
      new ArrayList<>(multiOutputInfos == null ? 1 : Math.max(1, multiOutputInfos.size()));
  if (multiOutputInfos != null) {
    for (MultiOutputInfo multiOutputInfo : multiOutputInfos) {
      outputTags.add(multiOutputInfo.getTag());
    }
  }
  if (outputTags.isEmpty()) {
    // Legacy support: assume there is a single output tag named "output".
    // (The output tag name is ignored for the main output.)
    outputTags.add("output");
  }
  if (numOutputs != outputTags.size()) {
    throw new AssertionError(
        "unexpected number of outputTags for DoFn: expected "
            + numOutputs
            + " but found "
            + outputTags.size()
            + " "
            + outputTags);
  }

  return NormalParDoFn.of(
      options,
      doFnInfo,
      sideInputReader,
      outputTags,
      stepName,
      transformName,
      executionContext,
      addCounterMutator,
      stateSampler);
}