private void initCoder()

in sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/ShuffleSink.java [94:156]


  /**
   * Derives the sharding, grouping and sorting flags from {@code shuffleKind}, then pulls the
   * component coders needed for that configuration out of the given element coder.
   *
   * <p>When keys are not sharded, every key/value/sort coder field is set to {@code null}.
   * When values are grouped, {@code windowedValueCoder} is {@code null}; otherwise it is the
   * windowed element coder re-targeted at the value coder.
   *
   * @param coder coder for the windowed elements; it is cast to {@code WindowedValueCoder}
   * @throws Exception if keys are sharded but the element coder is not a {@code KvCoder}, or
   *     if values are sorted but the value coder is not a {@code KvCoder}
   */
  private void initCoder(Coder<WindowedValue<T>> coder) throws Exception {
    // Begin with the GROUP_KEYS configuration; each case only adjusts what differs from it.
    boolean byKey = true;
    boolean grouped = true;
    boolean sorted = false;
    switch (shuffleKind) {
      case UNGROUPED:
        byKey = false;
        grouped = false;
        break;
      case PARTITION_KEYS:
        grouped = false;
        break;
      case GROUP_KEYS:
        break;
      case GROUP_KEYS_AND_SORT_VALUES:
        sorted = true;
        break;
      default:
        throw new AssertionError("unexpected shuffle kind");
    }
    this.shardByKey = byKey;
    this.groupValues = grouped;
    this.sortValues = sorted;

    this.windowedElemCoder = (WindowedValueCoder<T>) coder;
    this.elemCoder = this.windowedElemCoder.getValueCoder();

    // Ungrouped shuffles need none of the key/value coders.
    if (!this.shardByKey) {
      this.keyCoder = null;
      this.valueCoder = null;
      this.sortKeyCoder = null;
      this.sortValueCoder = null;
      this.windowedValueCoder = null;
      return;
    }

    // Key-sharded shuffles require KV elements so the key and value coders can be split out.
    if (!(this.elemCoder instanceof KvCoder)) {
      throw new Exception("unexpected kind of coder for elements written to "
          + "a key-grouping shuffle");
    }
    KvCoder<?, ?> keyedElemCoder = (KvCoder<?, ?>) this.elemCoder;
    this.keyCoder = keyedElemCoder.getKeyCoder();
    this.valueCoder = keyedElemCoder.getValueCoder();

    if (!this.sortValues) {
      this.sortKeyCoder = null;
      this.sortValueCoder = null;
    } else {
      // TODO: Decide the representation of sort-keyed values.
      // For now, the values are themselves KVs of (sort key, sort value).
      if (!(this.valueCoder instanceof KvCoder)) {
        throw new Exception("unexpected kind of coder for values written to "
            + "a value-sorting shuffle");
      }
      KvCoder<?, ?> sortPairCoder = (KvCoder<?, ?>) this.valueCoder;
      this.sortKeyCoder = sortPairCoder.getKeyCoder();
      this.sortValueCoder = sortPairCoder.getValueCoder();
    }

    if (!this.groupValues) {
      this.windowedValueCoder = this.windowedElemCoder.withValueCoder(this.valueCoder);
    } else {
      this.windowedValueCoder = null;
    }
  }