in sdks/go/pkg/beam/core/runtime/graphx/coder.go [186:393]
// makeCoder converts the pipeline proto coder c, registered under id, into an
// SDK *coder.Coder. Component coders are resolved through b (b.Coder,
// b.Coders, b.WindowCoder and b.peek); length-prefix sub-coders are built by
// recursing into makeCoder directly.
//
// It returns an error when c has an unexpected number of components, when a
// payload cannot be decoded, or when c's URN is not recognized.
func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, error) {
	urn := c.GetSpec().GetUrn()
	components := c.GetComponentCoderIds()
	switch urn {
	case urnBytesCoder:
		return coder.NewBytes(), nil
	case urnBoolCoder:
		return coder.NewBool(), nil
	case urnVarIntCoder:
		return coder.NewVarInt(), nil
	case urnDoubleCoder:
		return coder.NewDouble(), nil
	case urnStringCoder:
		return coder.NewString(), nil
	case urnKVCoder:
		if len(components) != 2 {
			return nil, errors.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components))
		}
		key, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}
		// valueID starts as the KV's value component; if that value is an
		// iterable, it is redirected to the iterable's element coder below.
		valueID := components[1]
		kind := coder.KV
		root := typex.KVType
		elm, err := b.peek(valueID)
		if err != nil {
			return nil, err
		}
		switch elm.GetSpec().GetUrn() {
		case urnIterableCoder, urnStateBackedIterableCoder:
			// A KV whose value is an iterable is decoded as a grouped (CoGBK) coder.
			elmIDs := elm.GetComponentCoderIds()
			if len(elmIDs) == 0 {
				// Guard the index below: a malformed iterable would otherwise panic.
				return nil, errors.Errorf("could not unmarshal KV coder from %v, iterable value coder %v has no element component", c, elm)
			}
			valueID = elmIDs[0]
			kind = coder.CoGBK
			root = typex.CoGBKType
			// TODO(BEAM-490): If CoGBK with > 1 input, handle as special GBK. We expect
			// it to be encoded as CoGBK<K,LP<CoGBKList<V,W,..>>>. Remove this handling once
			// CoGBK has a first-class representation.
			if ids, ok := b.isCoGBKList(valueID); ok {
				// CoGBK<K,V,W,..>
				values, err := b.Coders(ids)
				if err != nil {
					return nil, err
				}
				t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...)
				return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
			}
		}
		value, err := b.Coder(valueID)
		if err != nil {
			return nil, err
		}
		t := typex.New(root, key.T, value.T)
		return &coder.Coder{Kind: kind, T: t, Components: []*coder.Coder{key, value}}, nil
	case urnLengthPrefixCoder:
		if len(components) != 1 {
			return nil, errors.Errorf("could not unmarshal length prefix coder from %v, want a single sub component but have %d", c, len(components))
		}
		sub, err := b.peek(components[0])
		if err != nil {
			return nil, err
		}
		// No payload means this coder was length prefixed by the runner
		// but is likely self describing - AKA a beam coder.
		if len(sub.GetSpec().GetPayload()) == 0 {
			return b.makeCoder(components[0], sub)
		}
		// TODO(lostluck) 2018/10/17: Make this strict again, once dataflow can use
		// the portable pipeline model directly (BEAM-2885)
		switch u := sub.GetSpec().GetUrn(); u {
		case "", urnCustomCoder:
			var ref v1pb.CustomCoder
			if err := protox.DecodeBase64(string(sub.GetSpec().GetPayload()), &ref); err != nil {
				return nil, errors.Wrapf(err, "could not unmarshal length prefix coder from %v, failed to decode custom coder payload", c)
			}
			custom, err := decodeCustomCoder(&ref)
			if err != nil {
				return nil, errors.Wrapf(err, "could not unmarshal length prefix coder from %v, failed to decode custom coder", c)
			}
			custom.ID = components[0]
			t := typex.New(custom.Type)
			cc := &coder.Coder{Kind: coder.Custom, T: t, Custom: custom}
			return cc, nil
		case urnBytesCoder, urnStringCoder: // implicitly length prefixed types.
			return b.makeCoder(components[0], sub)
		default:
			// Handle Length prefixing dictated by the runner.
			cc, err := b.makeCoder(components[0], sub)
			if err != nil {
				return nil, err
			}
			return &coder.Coder{Kind: coder.LP, T: cc.T, Components: []*coder.Coder{cc}}, nil
		}
	case urnWindowedValueCoder, urnParamWindowedValueCoder:
		if len(components) != 2 {
			return nil, errors.Errorf("could not unmarshal windowed value coder from %v, expected two components but got %d", c, len(components))
		}
		elm, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}
		w, err := b.WindowCoder(components[1])
		if err != nil {
			return nil, err
		}
		t := typex.New(typex.WindowedValueType, elm.T)
		wvc := &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}
		if urn == urnWindowedValueCoder {
			return wvc, nil
		}
		// Attach the payload to a copy rather than to w itself: w comes from
		// b.WindowCoder and may be a shared instance (NOTE(review): presumably
		// cached by id - confirm), so mutating it in place could leak this
		// payload into unrelated coders that reference the same window coder.
		pw := *w
		pw.Payload = string(c.GetSpec().GetPayload())
		wvc.Kind = coder.ParamWindowedValue
		wvc.Window = &pw
		return wvc, nil
	case streamType:
		return nil, errors.Errorf("could not unmarshal stream type coder from %v, stream must be pair value", c)
	case "":
		// TODO(herohde) 11/27/2017: we still see CoderRefs from Dataflow. Handle that
		// case here, for now, so that the harness can use this logic.
		payload := c.GetSpec().GetPayload()
		var ref CoderRef
		if err := json.Unmarshal(payload, &ref); err != nil {
			return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode urn-less coder's payload \"%v\"", c, string(payload))
		}
		// Use a distinct name so the error message below still reports the
		// proto coder c rather than the (nil) decode result.
		cc, err := DecodeCoderRef(&ref)
		if err != nil {
			return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode CoderRef \"%v\"", c, string(payload))
		}
		return cc, nil
	case urnIterableCoder:
		if len(components) != 1 {
			return nil, errors.Errorf("could not unmarshal iterable coder from %v, expected one component but got %d", c, len(components))
		}
		elm, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}
		return coder.NewI(elm), nil
	case urnTimerCoder:
		if len(components) != 2 {
			return nil, errors.Errorf("could not unmarshal timer coder from %v, expected two components but got %d", c, len(components))
		}
		elm, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}
		w, err := b.WindowCoder(components[1])
		if err != nil {
			return nil, err
		}
		return coder.NewT(elm, w), nil
	case urnRowCoder:
		var s pipepb.Schema
		if err := proto.Unmarshal(c.GetSpec().GetPayload(), &s); err != nil {
			return nil, errors.Wrapf(err, "could not unmarshal row coder from %v, failed to decode schema payload", c)
		}
		t, err := schema.ToType(&s)
		if err != nil {
			return nil, errors.Wrapf(err, "could not unmarshal row coder from %v, failed to convert schema to a Go type", c)
		}
		return coder.NewR(typex.New(t)), nil
	// Special handling for window coders so they can be treated as
	// a general coder. Generally window coders are not used outside of
	// specific contexts, but this enables improved testing.
	// Window types are not permitted to be fulltypes, so
	// we use assignably equivalent anonymous struct types.
	case urnIntervalWindow:
		w, err := b.WindowCoder(id)
		if err != nil {
			return nil, err
		}
		return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{ Start, End int64 })(nil)).Elem()), Window: w}, nil
	case urnGlobalWindow:
		w, err := b.WindowCoder(id)
		if err != nil {
			return nil, err
		}
		return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{})(nil)).Elem()), Window: w}, nil
	default:
		return nil, errors.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn)
	}
}