in sdks/go/pkg/beam/coder.go [163:256]
func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
switch t.Type() {
case reflectx.Int64:
// use the beam varint coder.
return &coder.Coder{Kind: coder.VarInt, T: t}, nil
case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32:
c, err := coderx.NewVarIntZ(t.Type())
if err != nil {
return nil, err
}
return coder.CoderFrom(c), nil
case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64:
c, err := coderx.NewVarUintZ(t.Type())
if err != nil {
return nil, err
}
return coder.CoderFrom(c), nil
case reflectx.Float32:
c, err := coderx.NewFloat(t.Type())
if err != nil {
return nil, err
}
return coder.CoderFrom(c), nil
case reflectx.Float64:
return &coder.Coder{Kind: coder.Double, T: t}, nil
case reflectx.String:
return &coder.Coder{Kind: coder.String, T: t}, nil
case reflectx.ByteSlice:
return &coder.Coder{Kind: coder.Bytes, T: t}, nil
case reflectx.Bool:
return &coder.Coder{Kind: coder.Bool, T: t}, nil
default:
et := t.Type()
if c := coder.LookupCustomCoder(et); c != nil {
return coder.CoderFrom(c), nil
}
if EnableSchemas {
switch et.Kind() {
case reflect.Ptr:
if et.Elem().Kind() != reflect.Struct {
break
}
fallthrough
case reflect.Struct:
return &coder.Coder{Kind: coder.Row, T: t}, nil
}
}
// Interface types that implement JSON marshalling can be handled by the default coder.
// otherwise, inference needs to fail here.
if et.Kind() == reflect.Interface && !et.Implements(jsonCoderType) {
return nil, errors.Errorf("inferCoder failed: interface type %v has no coder registered", et)
}
c, err := newJSONCoder(et)
if err != nil {
return nil, err
}
return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
}
case typex.Composite:
c, err := inferCoders(t.Components())
if err != nil {
return nil, err
}
switch t.Type() {
case typex.KVType:
return &coder.Coder{Kind: coder.KV, T: t, Components: c}, nil
case typex.CoGBKType:
return &coder.Coder{Kind: coder.CoGBK, T: t, Components: c}, nil
case typex.WindowedValueType:
// TODO(herohde) 4/15/2018: do we ever infer W types now that PCollections
// are non-windowed? We either need to know the windowing strategy or
// we should remove this case.
return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: c, Window: coder.NewGlobalWindow()}, nil
default:
panic(fmt.Sprintf("Unexpected composite type: %v", t))
}
default:
panic(fmt.Sprintf("Unexpected type: %v", t))
}
}