in sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsAndCombineDoFn.java [82:160]
public void processElement(ProcessContext c) throws Exception {
  // Combines all values for one key, window by window: every element is folded into a
  // per-window accumulator, the WindowFn is given a chance to merge windows after each
  // element, and windows are handed to closeWindow() once their max timestamp lies before
  // the timestamp of the element just processed.
  // NOTE(review): closing windows this early presumably relies on the input values being
  // iterated in timestamp order -- confirm against the caller.
  final K key = c.element().getKey();
  Iterator<WindowedValue<InputT>> iterator = c.element().getValue().iterator();

  // Windows that currently hold an accumulator, ordered by ascending max timestamp so the
  // window that ends first is always at the head of the queue.
  final PriorityQueue<W> liveWindows =
      new PriorityQueue<>(11, new Comparator<BoundedWindow>() {
        @Override
        public int compare(BoundedWindow w1, BoundedWindow w2) {
          // Long.compare instead of signum(a - b): the subtraction can overflow for
          // timestamps that are far apart and would then report the wrong order.
          return Long.compare(
              w1.maxTimestamp().getMillis(), w2.maxTimestamp().getMillis());
        }
      });

  // Per-window combine state and the earliest element timestamp seen in each window.
  final Map<W, AccumT> accumulators = Maps.newHashMap();
  final Map<W, Instant> minTimestamps = Maps.newHashMap();

  WindowFn<Object, W>.MergeContext mergeContext = new CombiningMergeContext() {
    @Override
    public Collection<W> windows() {
      return liveWindows;
    }

    /**
     * Replaces the windows in {@code toBeMerged} by {@code mergeResult}: their accumulators
     * are merged through the combine function and the earliest of their recorded minimum
     * timestamps becomes the minimum timestamp of the merged window.
     */
    @Override
    public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
      List<AccumT> accumsToBeMerged = new ArrayList<>(toBeMerged.size());
      Instant minTimestamp = null;
      for (W window : toBeMerged) {
        accumsToBeMerged.add(accumulators.remove(window));
        Instant timestampToBeMerged = minTimestamps.remove(window);
        // Keep the earliest non-null timestamp among the windows being merged.
        if (minTimestamp == null
            || (timestampToBeMerged != null && timestampToBeMerged.isBefore(minTimestamp))) {
          minTimestamp = timestampToBeMerged;
        }
      }
      // Drop the source windows before adding the result, since mergeResult may itself be
      // one of the windows in toBeMerged.
      liveWindows.removeAll(toBeMerged);
      minTimestamps.put(mergeResult, minTimestamp);
      liveWindows.add(mergeResult);
      accumulators.put(mergeResult, combineFn.mergeAccumulators(key, accumsToBeMerged));
    }
  };

  while (iterator.hasNext()) {
    WindowedValue<InputT> e = iterator.next();
    @SuppressWarnings("unchecked")
    Collection<W> windows = (Collection<W>) e.getWindows();
    for (W w : windows) {
      // Track the earliest element timestamp per window; an existing entry is only
      // overwritten when this element is strictly earlier.
      Instant timestamp = minTimestamps.get(w);
      if (timestamp == null || timestamp.compareTo(e.getTimestamp()) > 0) {
        minTimestamps.put(w, e.getTimestamp());
      }

      AccumT accum = accumulators.get(w);
      // A window has a minimum timestamp exactly when it has an accumulator.
      checkState((timestamp == null && accum == null) || (timestamp != null && accum != null));
      if (accum == null) {
        // First element for this window: start a fresh accumulator and mark it live.
        accum = combineFn.createAccumulator(key);
        liveWindows.add(w);
      }
      accum = combineFn.addInput(key, accum, e.getValue());
      accumulators.put(w, accum);
    }

    // Let the WindowFn merge live windows; merges are applied via mergeContext.merge().
    windowFn.mergeWindows(mergeContext);

    // Close every live window whose max timestamp is before the current element.
    while (!liveWindows.isEmpty()
        && liveWindows.peek().maxTimestamp().isBefore(e.getTimestamp())) {
      closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
    }
  }

  // To have gotten here, we've either not had any elements added, or we've only run merge
  // and then closed windows. We don't need to retry merging, so close whatever is still
  // live, in order of ascending max timestamp.
  while (!liveWindows.isEmpty()) {
    closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
  }
}