in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java [420:469]
public void onTimer(TimerData timer) {
if (!(timer.getNamespace() instanceof WindowNamespace)) {
throw new IllegalArgumentException(
"Expected WindowNamespace, but was " + timer.getNamespace());
}
@SuppressWarnings("unchecked")
WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
W window = windowNamespace.getWindow();
if (!activeWindows.contains(window) && windowingStrategy.getWindowFn().isNonMerging()) {
throw new IllegalStateException(
"Internal Error: Received timer " + timer + " for inactive window: " + window);
}
ReduceFn<K, InputT, OutputT, W>.Context context = contextFactory.base(window);
// If this timer firing is at the watermark, then it may cause a trigger firing of an
// AfterWatermark trigger.
boolean isAtWatermark = TimeDomain.EVENT_TIME == timer.getDomain()
&& !timer.getTimestamp().isBefore(window.maxTimestamp());
if (shouldCleanup(timer, window)) {
// We're going to cleanup the window. We want to treat any potential output from this as
// the at-watermark firing if the current time is the at-watermark firing and there was a
// trigger waiting for it.
if (isAtWatermark) {
TriggerResult timerResult = runTriggersForTimer(context, timer);
isAtWatermark = (timerResult != null && timerResult.isFire());
}
// Do the actual cleanup
try {
doCleanup(context, isAtWatermark);
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, UserCodeException.class);
throw new RuntimeException(
"Exception while garbage collecting window " + windowNamespace.getWindow(), e);
}
} else {
if (activeWindows.contains(window) && !triggerRunner.isClosed(context.state())) {
handleTriggerResult(context, isAtWatermark, runTriggersForTimer(context, timer));
}
if (TimeDomain.EVENT_TIME == timer.getDomain()
// If we processed an on-time firing, we should schedule the GC timer.
&& timer.getTimestamp().isEqual(window.maxTimestamp())) {
scheduleCleanup(context);
}
}
}