in sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java [298:349]
// Routes an incoming element to the early or late sub-trigger, depending on whether the early
// sub-trigger has already been marked finished.
//
// The sub-triggers are OnceTriggers that are implicitly repeated: rather than wrapping them
// explicitly, this method clears a sub-trigger and resets its finished bit right after it fires.
//
// Returns FIRE when the active sub-trigger fires for this element, CONTINUE otherwise.
// Throws IllegalStateException when the early sub-trigger is finished and no late trigger is
// configured.
public TriggerResult onElement(OnElementContext c) throws Exception {
  // There is always an early trigger, even if it is the one that never fires. It is marked as
  // finished once the watermark has passed the end of the window.
  boolean earlyFinished = c.trigger().isFinished(EARLY_INDEX);

  if (earlyFinished) {
    // Past the early phase: only the late trigger can be running. Without one the root would
    // have finished when the early trigger finished, so an element here is unexpected.
    if (lateTrigger == null) {
      throw new IllegalStateException(
          "Shouldn't receive elements after the watermark with no late trigger");
    }

    ExecutableTrigger<W> late = c.trigger().subTrigger(LATE_INDEX);
    if (!late.invokeElement(c).isFire()) {
      return TriggerResult.CONTINUE;
    }

    // Implicit repeat: reset the late trigger after it fires.
    late.invokeClear(c);
    c.trigger().setFinished(false, LATE_INDEX);
    return TriggerResult.FIRE;
  }

  // Early phase: the early trigger sees the element first.
  ExecutableTrigger<W> early = c.trigger().subTrigger(EARLY_INDEX);
  if (early.invokeElement(c).isFire()) {
    // Implicit repeat: reset the early trigger after it fires.
    early.invokeClear(c);
    c.trigger().setFinished(false, EARLY_INDEX);

    // With a merging window function the late trigger is also tracking state; clear it too.
    if (lateTrigger != null && c.trigger().isMerging()) {
      c.trigger().subTrigger(LATE_INDEX).invokeClear(c);
    }
    return TriggerResult.FIRE;
  }

  // The early trigger did not fire. If the window function is merging, the element is also
  // passed to the late trigger so that merging data is available.
  if (lateTrigger != null && c.trigger().isMerging()) {
    ExecutableTrigger<W> late = c.trigger().subTrigger(LATE_INDEX);
    if (late.invokeElement(c).isFinish()) {
      // If the late trigger finishes, clear it out and keep going.
      late.invokeClear(c);
      c.trigger().setFinished(false, LATE_INDEX);
    }
  }
  return TriggerResult.CONTINUE;
}