public void processElement()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java [353:451]


            public void processElement(ProcessContext c) {
              KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
              K key = kvs.getKey();
              Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
              List<WindowedValue<V>> sortedValues = new ArrayList<>();
              for (WindowedValue<V> value : unsortedValues) {
                sortedValues.add(value);
              }
              Collections.sort(sortedValues,
                               new Comparator<WindowedValue<V>>() {
                  @Override
                  public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
                    return e1.getTimestamp().compareTo(e2.getTimestamp());
                  }
                });
              c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
            }}))
          .setCoder(input.getCoder());
    }
  }


  /////////////////////////////////////////////////////////////////////////////

  /**
   * Helper transform that takes a collection of timestamp-ordered
   * values associated with each key, groups the values by window,
   * combines windows as needed, and for each window in each key,
   * outputs a collection of key/value-list pairs implicitly assigned
   * to the window and with the timestamp derived from that window.
   */
  public static class GroupAlsoByWindow<K, V>
      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
                         PCollection<KV<K, Iterable<V>>>> {
    private final WindowingStrategy<?, ?> windowingStrategy;

    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
      this.windowingStrategy = windowingStrategy;
    }

    @Override
    @SuppressWarnings("unchecked")
    public PCollection<KV<K, Iterable<V>>> apply(
        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
      @SuppressWarnings("unchecked")
      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();

      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
      Coder<Iterable<WindowedValue<V>>> inputValueCoder =
          inputKvCoder.getValueCoder();

      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
          (IterableCoder<WindowedValue<V>>) inputValueCoder;
      Coder<WindowedValue<V>> inputIterableElementCoder =
          inputIterableValueCoder.getElemCoder();
      WindowedValueCoder<V> inputIterableWindowedValueCoder =
          (WindowedValueCoder<V>) inputIterableElementCoder;

      Coder<V> inputIterableElementValueCoder =
          inputIterableWindowedValueCoder.getValueCoder();
      Coder<Iterable<V>> outputValueCoder =
          IterableCoder.of(inputIterableElementValueCoder);
      Coder<KV<K, Iterable<V>>> outputKvCoder =
          KvCoder.of(keyCoder, outputValueCoder);

      GroupAlsoByWindowsDoFn<K, V, Iterable<V>, ?> fn =
          GroupAlsoByWindowsDoFn.createForIterable(
              windowingStrategy, inputIterableElementValueCoder);

      return input.apply(ParDo.of(fn)).setCoder(outputKvCoder);
    }
  }


  /////////////////////////////////////////////////////////////////////////////

  /**
   * Primitive helper transform that groups by key only, ignoring any
   * window assignments.
   */
  public static class GroupByKeyOnly<K, V>
      extends PTransform<PCollection<KV<K, V>>,
                         PCollection<KV<K, Iterable<V>>>> {

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
    }

    /**
     * Returns the {@code Coder} of the input to this transform, which
     * should be a {@code KvCoder}.
     */
    @SuppressWarnings("unchecked")
    KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
      if (!(inputCoder instanceof KvCoder)) {