private void processElement()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java [283:349]


  /**
   * Processes one input value in every window it maps to.
   *
   * <p>A value that is past the allowed lateness is dropped outright, and
   * {@code droppedDueToLateness} is incremented by the number of windows assigned to the value.
   * Otherwise each assigned window is passed through {@code windowMapping}; for every resulting
   * window the value is recorded, held, handed to the {@code reduceFn} and finally offered to the
   * trigger.
   *
   * @param windowMapping function applied to each window assigned to {@code value} to obtain the
   *     window in which the value is actually processed
   * @param results trigger results keyed by window. A window that already has an entry is not
   *     prefetched, is not checked for being closed, and does not have its trigger run; a firing
   *     result produced for this value is added under its window
   * @param value the windowed value to process
   */
  private void processElement(Function<W, W> windowMapping, Map<W, TriggerResult> results,
      WindowedValue<InputT> value) {
    Lateness elementLateness = getLateness(value.getTimestamp());
    if (elementLateness.isPastAllowedLateness) {
      // Too late to be accepted anywhere: count one drop per assigned window and stop.
      droppedDueToLateness.addValue((long) value.getWindows().size());
      return;
    }

    @SuppressWarnings("unchecked")
    Collection<W> assignedWindows = (Collection<W>) value.getWindows();
    // View over the assigned windows with the mapping applied; it is iterated once per pass below.
    Iterable<W> targetWindows = FluentIterable.from(assignedWindows).transform(windowMapping);

    // First pass: prefetch trigger state for every window whose trigger will be consulted.
    for (W target : targetWindows) {
      if (results.containsKey(target)) {
        continue;
      }
      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext prefetchContext =
          contextFactory.forValue(target, value.getValue(), value.getTimestamp());
      triggerRunner.prefetchForValue(prefetchContext.state());
    }

    // Second pass: actually process the value in each window.
    for (W target : targetWindows) {
      ReduceFn<K, InputT, OutputT, W>.ProcessValueContext valueContext =
          contextFactory.forValue(target, value.getValue(), value.getTimestamp());

      // A window without a pending result that the triggerRunner reports as closed takes no
      // more input; count the drop and move on to the next window.
      boolean closedForValue =
          !results.containsKey(target) && triggerRunner.isClosed(valueContext.state());
      if (closedForValue) {
        droppedDueToClosedWindow.addValue(1L);
        continue;
      }

      nonEmptyPanes.recordContent(valueContext);

      // Non-merging window functions don't track the active window set, so the cleanup timer is
      // always scheduled here for them; otherwise the premerge is relied on to have done it.
      if (windowingStrategy.getWindowFn().isNonMerging()) {
        scheduleCleanup(valueContext);
      }

      // The value will be part of the next pane, so update the watermark hold accordingly.
      watermarkHold.addHold(valueContext, elementLateness.isLate);

      // Let the reduceFn buffer the value as appropriate.
      try {
        reduceFn.processValue(valueContext);
      } catch (Exception e) {
        throw wrapMaybeUserException(e);
      }

      // The trigger is only run for windows that do not already have a result.
      if (results.containsKey(target)) {
        continue;
      }
      try {
        TriggerResult fired = triggerRunner.processValue(valueContext);
        if (fired.isFire()) {
          results.put(target, fired);
        }
      } catch (Exception e) {
        Throwables.propagateIfPossible(e);
        throw new RuntimeException("Failed to run trigger", e);
      }
    }
  }