in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink.java [94:156]
/**
 * Derives the shuffle-mode flags from {@code shuffleKind} and unpacks the supplied
 * windowed element coder into the component coders this sink keeps as fields.
 *
 * <p>Coders that do not apply to the selected shuffle kind are set to {@code null}.
 *
 * @param coder the coder for the windowed elements; it is cast to
 *     {@code WindowedValueCoder<T>} without a prior type check
 * @throws Exception if the shuffle shards by key but the element coder is not a
 *     {@code KvCoder}, or if it sorts values but the value coder is not a {@code KvCoder}
 * @throws AssertionError if {@code shuffleKind} is not one of the handled kinds
 */
private void initCoder(Coder<WindowedValue<T>> coder) throws Exception {
  // Resolve the mode flags into locals first; the fields are written only once
  // the shuffle kind has been recognized.
  final boolean byKey;
  final boolean grouped;
  final boolean sorted;
  switch (shuffleKind) {
    case UNGROUPED:
      byKey = false;
      grouped = false;
      sorted = false;
      break;
    case PARTITION_KEYS:
      byKey = true;
      grouped = false;
      sorted = false;
      break;
    case GROUP_KEYS:
      byKey = true;
      grouped = true;
      sorted = false;
      break;
    case GROUP_KEYS_AND_SORT_VALUES:
      byKey = true;
      grouped = true;
      sorted = true;
      break;
    default:
      throw new AssertionError("unexpected shuffle kind");
  }
  this.shardByKey = byKey;
  this.groupValues = grouped;
  this.sortValues = sorted;

  this.windowedElemCoder = (WindowedValueCoder<T>) coder;
  this.elemCoder = windowedElemCoder.getValueCoder();

  // Without key sharding none of the key/value-level coders are used.
  if (!shardByKey) {
    this.keyCoder = null;
    this.valueCoder = null;
    this.sortKeyCoder = null;
    this.sortValueCoder = null;
    this.windowedValueCoder = null;
    return;
  }

  // Key-sharded shuffles need elements encoded as key/value pairs.
  if (!(elemCoder instanceof KvCoder)) {
    throw new Exception("unexpected kind of coder for elements written to "
        + "a key-grouping shuffle");
  }
  KvCoder<?, ?> pairCoder = (KvCoder<?, ?>) elemCoder;
  this.keyCoder = pairCoder.getKeyCoder();
  this.valueCoder = pairCoder.getValueCoder();

  if (!sortValues) {
    this.sortKeyCoder = null;
    this.sortValueCoder = null;
  } else {
    // TODO: Decide the representation of sort-keyed values.
    // For now, we'll just use KVs.
    if (!(valueCoder instanceof KvCoder)) {
      throw new Exception("unexpected kind of coder for values written to "
          + "a value-sorting shuffle");
    }
    KvCoder<?, ?> sortPairCoder = (KvCoder<?, ?>) valueCoder;
    this.sortKeyCoder = sortPairCoder.getKeyCoder();
    this.sortValueCoder = sortPairCoder.getValueCoder();
  }

  // A windowed coder for the bare values is built only when values are not grouped.
  if (!groupValues) {
    this.windowedValueCoder = this.windowedElemCoder.withValueCoder(this.valueCoder);
  } else {
    this.windowedValueCoder = null;
  }
}