in sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java [353:451]
/**
 * Emits the input key paired with all of its windowed values, reordered
 * so that they appear in ascending timestamp order.
 */
public void processElement(ProcessContext c) {
  KV<K, Iterable<WindowedValue<V>>> element = c.element();
  K key = element.getKey();
  Iterable<WindowedValue<V>> incoming = element.getValue();

  // Materialize the values so they can be reordered in place.
  List<WindowedValue<V>> ordered = new ArrayList<>();
  for (WindowedValue<V> windowedValue : incoming) {
    ordered.add(windowedValue);
  }

  // Orders values by their event timestamp. Collections.sort is stable,
  // so values with equal timestamps keep their incoming relative order.
  Comparator<WindowedValue<V>> byTimestamp =
      new Comparator<WindowedValue<V>>() {
        @Override
        public int compare(WindowedValue<V> left, WindowedValue<V> right) {
          return left.getTimestamp().compareTo(right.getTimestamp());
        }
      };
  Collections.sort(ordered, byTimestamp);

  c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, ordered));
}}))
.setCoder(input.getCoder());
}
}
/////////////////////////////////////////////////////////////////////////////
/**
 * Helper transform that, for each key, takes a timestamp-ordered
 * collection of windowed values, groups those values by window (merging
 * windows as the windowing strategy requires), and emits one
 * key/value-list pair per window per key. Each output element is
 * implicitly assigned to its window and carries a timestamp derived
 * from that window.
 */
public static class GroupAlsoByWindow<K, V>
    extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
                       PCollection<KV<K, Iterable<V>>>> {
  /** Strategy that determines how values are assigned to and merged across windows. */
  private final WindowingStrategy<?, ?> windowingStrategy;

  /**
   * Creates a transform that groups each key's values according to the
   * supplied windowing strategy.
   */
  public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
    this.windowingStrategy = windowingStrategy;
  }

  /**
   * Applies the per-window grouping DoFn to {@code input}.
   *
   * <p>The input coder is expected to have the shape
   * {@code KvCoder<K, IterableCoder<WindowedValueCoder<V>>>}. The casts
   * below fail with a {@code ClassCastException} if it does not. The
   * output coder is {@code KvCoder<K, IterableCoder<V>>}, built from the
   * same key coder and the inner value coder.
   */
  @Override
  @SuppressWarnings("unchecked")
  public PCollection<KV<K, Iterable<V>>> apply(
      PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
    // Peel the nested input coder apart, layer by layer.
    KvCoder<K, Iterable<WindowedValue<V>>> kvCoder =
        (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
    Coder<K> keyCoder = kvCoder.getKeyCoder();
    IterableCoder<WindowedValue<V>> iterableCoder =
        (IterableCoder<WindowedValue<V>>) kvCoder.getValueCoder();
    WindowedValueCoder<V> windowedCoder =
        (WindowedValueCoder<V>) iterableCoder.getElemCoder();
    Coder<V> valueCoder = windowedCoder.getValueCoder();

    // Output values are plain (no longer windowed) iterables of V.
    Coder<KV<K, Iterable<V>>> resultCoder =
        KvCoder.of(keyCoder, IterableCoder.of(valueCoder));

    GroupAlsoByWindowsDoFn<K, V, Iterable<V>, ?> groupingFn =
        GroupAlsoByWindowsDoFn.createForIterable(windowingStrategy, valueCoder);
    return input.apply(ParDo.of(groupingFn)).setCoder(resultCoder);
  }
}
/////////////////////////////////////////////////////////////////////////////
/**
* Primitive helper transform that groups by key only, ignoring any
* window assignments.
*/
public static class GroupByKeyOnly<K, V>
extends PTransform<PCollection<KV<K, V>>,
PCollection<KV<K, Iterable<V>>>> {
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
}
/**
* Returns the {@code Coder} of the input to this transform, which
* should be a {@code KvCoder}.
*/
@SuppressWarnings("unchecked")
KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
if (!(inputCoder instanceof KvCoder)) {