in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink.java [198:268]
/**
 * Encodes {@code windowedElem} as a {@code ShuffleEntry}, writes it through {@code writer},
 * and adds the entry's length to both per-dataset byte counters.
 *
 * <p>When {@code shardByKey} is set, the element must be a {@code KV} and its encoded key
 * becomes the shuffle key. Otherwise the whole windowed element is resharded under an
 * incrementing sequence-number key.
 *
 * @param windowedElem the element to write
 * @return the length of the written entry, as reported by {@code ShuffleEntry.length()}
 * @throws IOException if encoding any part of the entry, or writing it, fails
 * @throws AssertionError if a key-sharded element is not a {@code KV}, or if a value-sorting
 *     shuffle receives a {@code KV} whose value is not itself a {@code KV}
 */
public long add(WindowedValue<T> windowedElem) throws IOException {
  ShuffleEntry entry =
      shardByKey ? encodeKeyedEntry(windowedElem) : encodeReshardedEntry(windowedElem);
  writer.put(entry);
  long bytes = entry.length();
  perWorkerPerDatasetBytesCounter.addValue(bytes);
  perDatasetBytesCounter.addValue(bytes);
  return bytes;
}

/**
 * Builds the entry for a key-sharded shuffle: the shuffle key is the encoded KV key, while the
 * secondary key and value depend on whether this shuffle sorts values, groups values, or neither.
 */
private ShuffleEntry encodeKeyedEntry(WindowedValue<T> windowedElem) throws IOException {
  T elem = windowedElem.getValue();
  if (!(elem instanceof KV)) {
    throw new AssertionError("expecting the values written to a key-grouping shuffle "
        + "to be KVs, but got " + (elem == null ? "null" : elem.getClass().getName()));
  }
  // KV<?, ?> is reifiable, so this cast needs no raw type and produces no unchecked warning.
  KV<?, ?> kv = (KV<?, ?>) elem;
  Object key = kv.getKey();
  Object value = kv.getValue();
  byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);
  byte[] secondaryKeyBytes;
  byte[] valueBytes;
  if (sortValues) {
    if (!(value instanceof KV)) {
      throw new AssertionError("expecting the value parts of the KVs written to "
          + "a value-sorting shuffle to also be KVs, but got "
          + (value == null ? "null" : value.getClass().getName()));
    }
    KV<?, ?> kvValue = (KV<?, ?>) value;
    Object sortKey = kvValue.getKey();
    Object sortValue = kvValue.getValue();
    // TODO: Need to coordinate with the GroupingShuffleReader, to make sure it knows how to
    // reconstruct the value from the sortKeyBytes and sortValueBytes. Right now, it cannot
    // distinguish between sorting and non-sorting GBKs.
    secondaryKeyBytes = CoderUtils.encodeToByteArray(sortKeyCoder, sortKey);
    valueBytes = CoderUtils.encodeToByteArray(sortValueCoder, sortValue);
  } else if (groupValues) {
    // Sort values by timestamp so that GroupAlsoByWindows can run efficiently.
    if (windowedElem.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
      // Empty secondary keys sort before all other secondary keys, so we
      // can omit this common value here for efficiency.
      secondaryKeyBytes = null;
    } else {
      secondaryKeyBytes =
          CoderUtils.encodeToByteArray(InstantCoder.of(), windowedElem.getTimestamp());
    }
    valueBytes = CoderUtils.encodeToByteArray(valueCoder, value);
  } else {
    // No secondary ordering: keep the window/timestamp metadata alongside the bare value.
    secondaryKeyBytes = null;
    valueBytes = CoderUtils.encodeToByteArray(windowedValueCoder, windowedElem.withValue(value));
  }
  return new ShuffleEntry(keyBytes, secondaryKeyBytes, valueBytes);
}

/**
 * Builds the entry for a shuffle that only reshards values. The key is ignored except by the
 * shuffle splitter, so a sequence number is used: records can then be split anywhere, and a
 * single-sharded ordered PCollection keeps its input order through the shuffle.
 */
private ShuffleEntry encodeReshardedEntry(WindowedValue<T> windowedElem) throws IOException {
  // seqNum is advanced before the value is encoded, exactly as before the refactoring.
  byte[] keyBytes = CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), seqNum++);
  byte[] secondaryKeyBytes = null;
  byte[] valueBytes = CoderUtils.encodeToByteArray(windowedElemCoder, windowedElem);
  return new ShuffleEntry(keyBytes, secondaryKeyBytes, valueBytes);
}