public long add()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink.java [198:268]


    /**
     * Encodes one windowed element as a {@link ShuffleEntry} and writes it to {@code writer}.
     *
     * <p>The layout of the entry depends on the sink's configuration flags:
     * <ul>
     *   <li>{@code shardByKey && sortValues}: the element must be a {@code KV} whose value is
     *       itself a {@code KV}. The outer key is encoded with {@code keyCoder}, the inner key
     *       becomes the secondary key (via {@code sortKeyCoder}) and the inner value becomes the
     *       value (via {@code sortValueCoder}).
     *   <li>{@code shardByKey && groupValues}: the element must be a {@code KV}. The secondary
     *       key is the element's timestamp encoded with {@link InstantCoder}, or {@code null}
     *       when the timestamp equals {@link BoundedWindow#TIMESTAMP_MIN_VALUE}; the value part
     *       is encoded with {@code valueCoder}.
     *   <li>{@code shardByKey} otherwise: the element must be a {@code KV}. There is no
     *       secondary key; the value part, re-wrapped via {@code windowedElem.withValue(value)},
     *       is encoded with {@code windowedValueCoder}.
     *   <li>not {@code shardByKey}: the key is the current {@code seqNum} (post-incremented)
     *       encoded with {@link BigEndianLongCoder}, there is no secondary key, and the whole
     *       windowed element is encoded with {@code windowedElemCoder}.
     * </ul>
     *
     * <p>The value returned by {@code entry.length()} is added to both
     * {@code perWorkerPerDatasetBytesCounter} and {@code perDatasetBytesCounter}.
     *
     * @param windowedElem the element to encode and write
     * @return {@code entry.length()} of the entry that was written
     * @throws IOException propagated from encoding or writing the entry
     * @throws AssertionError if {@code shardByKey} is set and the element is not a {@code KV},
     *     or if {@code sortValues} is also set and the element's value is not a {@code KV}
     */
    public long add(WindowedValue<T> windowedElem) throws IOException {
      byte[] keyBytes;
      byte[] secondaryKeyBytes;
      byte[] valueBytes;
      T elem = windowedElem.getValue();
      if (shardByKey) {
        if (!(elem instanceof KV)) {
          throw new AssertionError("expecting the values written to a key-grouping shuffle "
              + "to be KVs");
        }
        // Cast to a wildcard-parameterized KV rather than the raw type: only Object views of
        // the key and value are needed here, since they are handed straight to the coders.
        KV<?, ?> kv = (KV<?, ?>) elem;
        Object key = kv.getKey();
        Object value = kv.getValue();

        keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);

        if (sortValues) {
          if (!(value instanceof KV)) {
            throw new AssertionError("expecting the value parts of the KVs written to "
                + "a value-sorting shuffle to also be KVs");
          }
          KV<?, ?> kvValue = (KV<?, ?>) value;
          Object sortKey = kvValue.getKey();
          Object sortValue = kvValue.getValue();

          // TODO: Need to coordinate with the
          // GroupingShuffleReader, to make sure it knows how to
          // reconstruct the value from the sortKeyBytes and
          // sortValueBytes.  Right now, it doesn't know between
          // sorting and non-sorting GBKs.
          // NOTE(review): only sortKey/sortValue are encoded in this branch; the element's
          // timestamp and windows are not written to the entry.
          secondaryKeyBytes = CoderUtils.encodeToByteArray(sortKeyCoder, sortKey);
          valueBytes = CoderUtils.encodeToByteArray(sortValueCoder, sortValue);

        } else if (groupValues) {
          // Sort values by timestamp so that GroupAlsoByWindows can run efficiently.
          if (windowedElem.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
            // Empty secondary keys sort before all other secondary keys, so we
            // can omit this common value here for efficiency.
            secondaryKeyBytes = null;
          } else {
            secondaryKeyBytes =
                CoderUtils.encodeToByteArray(InstantCoder.of(), windowedElem.getTimestamp());
          }
          valueBytes = CoderUtils.encodeToByteArray(valueCoder, value);
        } else {
          // Plain key-sharded write: keep the element's windowing information by re-wrapping
          // just the value part before encoding it.
          secondaryKeyBytes = null;
          valueBytes = CoderUtils.encodeToByteArray(
              windowedValueCoder,
              windowedElem.withValue(value));
        }

      } else {
        // Not partitioning or grouping by key, just resharding values.
        // <key> is ignored, except by the shuffle splitter.  Use a seq#
        // as the key, so we can split records anywhere.  This also works
        // for writing a single-sharded ordered PCollection through a
        // shuffle, since the order of elements in the input will be
        // preserved in the output.
        keyBytes = CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), seqNum++);

        secondaryKeyBytes = null;
        valueBytes = CoderUtils.encodeToByteArray(windowedElemCoder, windowedElem);
      }

      ShuffleEntry entry = new ShuffleEntry(keyBytes, secondaryKeyBytes, valueBytes);
      writer.put(entry);
      long bytes = entry.length();
      // Record the written size in both the per-worker and the per-dataset counters.
      perWorkerPerDatasetBytesCounter.addValue(bytes);
      perDatasetBytesCounter.addValue(bytes);
      return bytes;
    }