in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/GroupAlsoByWindowsParDoFn.java [83:163]
public ParDoFn create(
PipelineOptions options,
CloudObject cloudUserFn,
String stepName,
String transformName,
@Nullable List<SideInputInfo> sideInputInfos,
@Nullable List<MultiOutputInfo> multiOutputInfos,
int numOutputs,
DataflowExecutionContext executionContext,
CounterSet.AddCounterMutator addCounterMutator,
StateSampler stateSampler)
throws Exception {
Object windowingStrategyObj;
byte[] encodedWindowingStrategy = getBytes(cloudUserFn, PropertyNames.SERIALIZED_FN);
if (encodedWindowingStrategy.length == 0) {
windowingStrategyObj = WindowingStrategy.globalDefault();
} else {
windowingStrategyObj =
SerializableUtils.deserializeFromByteArray(
encodedWindowingStrategy, "serialized windowing strategy");
Preconditions.checkArgument(
windowingStrategyObj instanceof WindowingStrategy,
"unexpected kind of WindowingStrategy: " + windowingStrategyObj.getClass().getName());
}
@SuppressWarnings({"rawtypes", "unchecked"})
WindowingStrategy windowingStrategy = (WindowingStrategy) windowingStrategyObj;
byte[] serializedCombineFn = getBytes(cloudUserFn, PropertyNames.COMBINE_FN, null);
AppliedCombineFn<?, ?, ?, ?> combineFn = null;
if (serializedCombineFn != null) {
Object combineFnObj = SerializableUtils.deserializeFromByteArray(
serializedCombineFn, "serialized combine fn");
Preconditions.checkArgument(
combineFnObj instanceof AppliedCombineFn,
"unexpected kind of AppliedCombineFn: " + combineFnObj.getClass().getName());
combineFn = (AppliedCombineFn<?, ?, ?, ?>) combineFnObj;
}
Map<String, Object> inputCoderObject = getObject(cloudUserFn, PropertyNames.INPUT_CODER);
Coder<?> inputCoder = Serializer.deserialize(inputCoderObject, Coder.class);
Preconditions.checkArgument(
inputCoder instanceof WindowedValueCoder,
"Expected WindowedValueCoder for inputCoder, got: " + inputCoder.getClass().getName());
@SuppressWarnings("unchecked")
WindowedValueCoder<?> windowedValueCoder = (WindowedValueCoder<?>) inputCoder;
Coder<?> elemCoder = windowedValueCoder.getValueCoder();
Preconditions.checkArgument(
elemCoder instanceof KvCoder,
"Expected KvCoder for inputCoder, got: " + elemCoder.getClass().getName());
@SuppressWarnings("unchecked")
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) elemCoder;
boolean isStreamingPipeline = options.as(StreamingOptions.class).isStreaming();
@Nullable AppliedCombineFn<?, ?, ?, ?> maybeMergingCombineFn = null;
if (combineFn != null) {
String phase = getString(cloudUserFn, PropertyNames.PHASE, CombinePhase.ALL);
Preconditions.checkArgument(
phase.equals(CombinePhase.ALL) || phase.equals(CombinePhase.MERGE),
"Unexpected phase: " + phase);
if (phase.equals(CombinePhase.MERGE)) {
maybeMergingCombineFn = makeAppliedMergingFunction(combineFn);
} else {
maybeMergingCombineFn = combineFn;
}
}
DoFn<?, ?> groupAlsoByWindowsDoFn = getGroupAlsoByWindowsDoFn(
isStreamingPipeline, windowingStrategy, kvCoder, maybeMergingCombineFn);
return GroupAlsoByWindowsParDoFn.of(
options,
groupAlsoByWindowsDoFn,
stepName,
transformName,
executionContext,
addCounterMutator,
stateSampler);
}