public TriggerResult onElement()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java [298:349]


    public TriggerResult onElement(OnElementContext c) throws Exception {
      // An early sub-trigger is always present, even if it is one that never fires. It gets marked
      // as finished once the watermark has passed the end of the window, so its finished bit
      // tells us which sub-trigger is responsible for this element.
      final boolean earlyFinished = c.trigger().isFinished(EARLY_INDEX);

      if (earlyFinished && lateTrigger == null) {
        // With no late trigger, the root would have finished when the early trigger finished.
        throw new IllegalStateException(
            "Shouldn't receive elements after the watermark with no late trigger");
      }

      if (earlyFinished) {
        // Late phase: only the late sub-trigger is given the element.
        ExecutableTrigger<W> late = c.trigger().subTrigger(LATE_INDEX);
        if (!late.invokeElement(c).isFire()) {
          return TriggerResult.CONTINUE;
        }
        // The sub-triggers are OnceTriggers that need an implicit repeat around them, so after a
        // firing the late trigger is cleared and its finished bit is reset.
        late.invokeClear(c);
        c.trigger().setFinished(false, LATE_INDEX);
        return TriggerResult.FIRE;
      }

      // Early phase: hand the element to the early sub-trigger.
      ExecutableTrigger<W> early = c.trigger().subTrigger(EARLY_INDEX);
      final boolean fired = early.invokeElement(c).isFire();
      if (fired) {
        // Same implicit-repeat handling as above: instead of wrapping the sub-trigger in an
        // explicit repeat, clear it and reset its finished bit right here.
        early.invokeClear(c);
        c.trigger().setFinished(false, EARLY_INDEX);
      }

      // If the window function is merging, the late trigger must also be kept up to date while
      // the early trigger is running, so that merging data is available.
      if (lateTrigger != null && c.trigger().isMerging()) {
        ExecutableTrigger<W> shadowLate = c.trigger().subTrigger(LATE_INDEX);
        if (fired) {
          // The early trigger fired: clear whatever the late trigger has recorded so far.
          shadowLate.invokeClear(c);
        } else if (shadowLate.invokeElement(c).isFinish()) {
          // If the late trigger finishes, clear it out and keep going.
          shadowLate.invokeClear(c);
          c.trigger().setFinished(false, LATE_INDEX);
        }
      }

      return fired ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }