public void processElement()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsViaIteratorsDoFn.java [80:133]


  public void processElement(ProcessContext c) throws Exception {
    K key = c.element().getKey();
    Iterable<WindowedValue<V>> value = c.element().getValue();
    PeekingReiterator<WindowedValue<V>> iterator;

    if (value instanceof Collection) {
      iterator = new PeekingReiterator<>(new ListReiterator<WindowedValue<V>>(
          new ArrayList<WindowedValue<V>>((Collection<WindowedValue<V>>) value), 0));
    } else if (value instanceof Reiterable) {
      iterator = new PeekingReiterator<>(((Reiterable<WindowedValue<V>>) value).iterator());
    } else {
      throw new IllegalArgumentException(
          "Input to GroupAlsoByWindowsDoFn must be a Collection or Reiterable");
    }

    // This ListMultimap is a map of window maxTimestamps to the list of active
    // windows with that maxTimestamp.
    ListMultimap<Instant, BoundedWindow> windows = ArrayListMultimap.create();

    while (iterator.hasNext()) {
      WindowedValue<V> e = iterator.peek();
      for (BoundedWindow window : e.getWindows()) {
        // If this window is not already in the active set, emit a new WindowReiterable
        // corresponding to this window, starting at this element in the input Reiterable.
        if (!windows.containsEntry(window.maxTimestamp(), window)) {
          // This window was produced by strategy.getWindowFn()
          @SuppressWarnings("unchecked")
          W typedWindow = (W) window;
          // Iterating through the WindowReiterable may advance iterator as an optimization
          // for as long as it detects that there are no new windows.
          windows.put(window.maxTimestamp(), window);
          c.windowingInternals().outputWindowedValue(
              KV.of(key, (Iterable<V>) new WindowReiterable<V>(iterator, window)),
              strategy.getWindowFn().getOutputTime(e.getTimestamp(), typedWindow),
              Arrays.asList(window),
              PaneInfo.ON_TIME_AND_ONLY_FIRING);
        }
      }
      // Copy the iterator in case the next DoFn cached its version of the iterator instead
      // of immediately iterating through it.
      // And, only advance the iterator if the consuming operation hasn't done so.
      iterator = iterator.copy();
      if (iterator.hasNext() && iterator.peek() == e) {
        iterator.next();
      }

      // Remove all windows with maxTimestamp behind the current timestamp.
      Iterator<Instant> windowIterator = windows.keys().iterator();
      while (windowIterator.hasNext()
          && windowIterator.next().isBefore(e.getTimestamp())) {
        windowIterator.remove();
      }
    }
  }