public void onTimer()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java [420:469]


  public void onTimer(TimerData timer) {
    if (!(timer.getNamespace() instanceof WindowNamespace)) {
      throw new IllegalArgumentException(
          "Expected WindowNamespace, but was " + timer.getNamespace());
    }

    @SuppressWarnings("unchecked")
    WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
    W window = windowNamespace.getWindow();
    if (!activeWindows.contains(window) && windowingStrategy.getWindowFn().isNonMerging()) {
      throw new IllegalStateException(
          "Internal Error: Received timer " + timer + " for inactive window: " + window);
    }

    ReduceFn<K, InputT, OutputT, W>.Context context = contextFactory.base(window);

    // If this timer firing is at the watermark, then it may cause a trigger firing of an
    // AfterWatermark trigger.
    boolean isAtWatermark = TimeDomain.EVENT_TIME == timer.getDomain()
        && !timer.getTimestamp().isBefore(window.maxTimestamp());

    if (shouldCleanup(timer, window)) {
      // We're going to cleanup the window. We want to treat any potential output from this as
      // the at-watermark firing if the current time is the at-watermark firing and there was a
      // trigger waiting for it.
      if (isAtWatermark) {
        TriggerResult timerResult = runTriggersForTimer(context, timer);
        isAtWatermark = (timerResult != null && timerResult.isFire());
      }

      // Do the actual cleanup
      try {
        doCleanup(context, isAtWatermark);
      } catch (Exception e) {
        Throwables.propagateIfInstanceOf(e, UserCodeException.class);
        throw new RuntimeException(
            "Exception while garbage collecting window " + windowNamespace.getWindow(), e);
      }
    } else {
      if (activeWindows.contains(window) && !triggerRunner.isClosed(context.state())) {
        handleTriggerResult(context, isAtWatermark, runTriggersForTimer(context, timer));
      }

      if (TimeDomain.EVENT_TIME == timer.getDomain()
          // If we processed an on-time firing, we should schedule the GC timer.
          && timer.getTimestamp().isEqual(window.maxTimestamp())) {
        scheduleCleanup(context);
      }
    }
  }