in sdks/go/pkg/beam/core/runtime/graphx/dataflow.go [89:191]
func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
switch c.Kind {
case coder.Custom:
ref, err := encodeCustomCoder(c.Custom)
if err != nil {
return nil, err
}
data, err := protox.EncodeBase64(ref)
if err != nil {
return nil, err
}
return &CoderRef{
Type: lengthPrefixType,
Components: []*CoderRef{{Type: data, PipelineProtoCoderID: c.Custom.ID}},
}, nil
case coder.KV:
if len(c.Components) != 2 {
return nil, errors.Errorf("bad KV: %v", c)
}
key, err := EncodeCoderRef(c.Components[0])
if err != nil {
return nil, err
}
value, err := EncodeCoderRef(c.Components[1])
if err != nil {
return nil, err
}
return &CoderRef{Type: pairType, Components: []*CoderRef{key, value}, IsPairLike: true}, nil
case coder.CoGBK:
if len(c.Components) < 2 {
return nil, errors.Errorf("bad CoGBK: %v", c)
}
refs, err := EncodeCoderRefs(c.Components)
if err != nil {
return nil, err
}
value := refs[1]
if len(c.Components) > 2 {
// TODO(BEAM-490): don't inject union coder for CoGBK.
union := &CoderRef{Type: cogbklistType, Components: refs[1:]}
value = &CoderRef{Type: lengthPrefixType, Components: []*CoderRef{union}}
}
stream := &CoderRef{Type: streamType, Components: []*CoderRef{value}, IsStreamLike: true}
return &CoderRef{Type: pairType, Components: []*CoderRef{refs[0], stream}, IsPairLike: true}, nil
case coder.WindowedValue:
if len(c.Components) != 1 || c.Window == nil {
return nil, errors.Errorf("bad windowed value: %v", c)
}
elm, err := EncodeCoderRef(c.Components[0])
if err != nil {
return nil, err
}
w, err := encodeWindowCoder(c.Window)
if err != nil {
return nil, err
}
return &CoderRef{Type: windowedValueType, Components: []*CoderRef{elm, w}, IsWrapper: true}, nil
case coder.Bytes:
return &CoderRef{Type: bytesType}, nil
case coder.Bool:
return &CoderRef{Type: boolType}, nil
case coder.VarInt:
return &CoderRef{Type: varIntType}, nil
case coder.Double:
return &CoderRef{Type: doubleType}, nil
case coder.String:
return &CoderRef{
Type: lengthPrefixType,
Components: []*CoderRef{{Type: stringType, PipelineProtoCoderID: c.ID}},
}, nil
case coder.Row:
schm, err := schema.FromType(c.T.Type())
if err != nil {
return nil, err
}
data, err := protox.EncodeBase64(schm)
if err != nil {
return nil, err
}
return &CoderRef{
Type: rowType,
Components: []*CoderRef{{Type: data}},
}, nil
default:
return nil, errors.Errorf("bad coder kind: %v", c.Kind)
}
}