in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java [283:349]
/**
 * Processes one input value in every window it belongs to, after passing each of the value's
 * windows through {@code windowMapping}.
 *
 * <p>If the value is past the allowed lateness it is dropped for all of its windows and nothing
 * else happens. Otherwise, for each mapped window, the value is recorded, a watermark hold is
 * added, the value is handed to the {@code reduceFn}, and the trigger is run.
 *
 * @param windowMapping applied to each window carried by {@code value} to obtain the window the
 *     value is actually processed in
 * @param results trigger results collected so far, keyed by window. A window that already has an
 *     entry skips trigger prefetching, the closed-window check and trigger execution; a firing
 *     result produced here is added to the map
 * @param value the element to process, together with its timestamp and windows
 */
private void processElement(Function<W, W> windowMapping, Map<W, TriggerResult> results,
    WindowedValue<InputT> value) {
  Lateness elementLateness = getLateness(value.getTimestamp());
  if (elementLateness.isPastAllowedLateness) {
    // Beyond the allowed lateness: count one drop per assigned window and stop here.
    droppedDueToLateness.addValue((long) value.getWindows().size());
    return;
  }

  @SuppressWarnings("unchecked")
  Collection<W> assignedWindows = (Collection<W>) value.getWindows();
  // The transformed view is iterated twice below: once to prefetch, once to process.
  Iterable<W> mappedWindows = FluentIterable.from(assignedWindows).transform(windowMapping);

  // First pass: prefetch trigger state for every window whose trigger will be run.
  for (W target : mappedWindows) {
    if (results.containsKey(target)) {
      // This window already has a result recorded, so its trigger is not run again below.
      continue;
    }
    triggerRunner.prefetchForValue(
        contextFactory.forValue(target, value.getValue(), value.getTimestamp()).state());
  }

  // Second pass: do the actual per-window processing.
  for (W target : mappedWindows) {
    ReduceFn<K, InputT, OutputT, W>.ProcessValueContext valueContext =
        contextFactory.forValue(target, value.getValue(), value.getTimestamp());

    // A window without a recorded result that the trigger runner reports as closed takes no
    // more input; count the drop and move on to the next window.
    boolean windowClosed =
        !results.containsKey(target) && triggerRunner.isClosed(valueContext.state());
    if (windowClosed) {
      droppedDueToClosedWindow.addValue(1L);
      continue;
    }

    nonEmptyPanes.recordContent(valueContext);

    // Non-merging window functions don't track the active window set, so the cleanup timer is
    // scheduled unconditionally here. (For merging window functions the premerge is expected to
    // have done this already.)
    if (windowingStrategy.getWindowFn().isNonMerging()) {
      scheduleCleanup(valueContext);
    }

    // The value will be part of the next pane, so the watermark hold has to account for it.
    watermarkHold.addHold(valueContext, elementLateness.isLate);

    // Hand the value to the reduceFn, which buffers it as appropriate.
    try {
      reduceFn.processValue(valueContext);
    } catch (Exception ex) {
      throw wrapMaybeUserException(ex);
    }

    // Nothing more to do when a result is already recorded for this window.
    if (results.containsKey(target)) {
      continue;
    }

    // Otherwise run the trigger and remember the result if it fires.
    try {
      TriggerResult outcome = triggerRunner.processValue(valueContext);
      if (outcome.isFire()) {
        results.put(target, outcome);
      }
    } catch (Exception ex) {
      Throwables.propagateIfPossible(ex);
      throw new RuntimeException("Failed to run trigger", ex);
    }
  }
}