in sdks/go/pkg/beam/core/runtime/graphx/serialize.go [377:503]
func encodeType(t reflect.Type) (*v1pb.Type, error) {
if s, ok := tryEncodeSpecial(t); ok {
return &v1pb.Type{Kind: v1pb.Type_SPECIAL, Special: s}, nil
}
if k, ok := runtime.TypeKey(t); ok {
if _, present := runtime.LookupType(k); present {
// External type. Serialize by key and lookup in registry
// on decoding side.
return &v1pb.Type{Kind: v1pb.Type_EXTERNAL, ExternalKey: k}, nil
}
}
// The supplied type isn't special, so apply the standard encodings.
switch t.Kind() {
case reflect.Bool:
return &v1pb.Type{Kind: v1pb.Type_BOOL}, nil
case reflect.Int:
return &v1pb.Type{Kind: v1pb.Type_INT}, nil
case reflect.Int8:
return &v1pb.Type{Kind: v1pb.Type_INT8}, nil
case reflect.Int16:
return &v1pb.Type{Kind: v1pb.Type_INT16}, nil
case reflect.Int32:
return &v1pb.Type{Kind: v1pb.Type_INT32}, nil
case reflect.Int64:
return &v1pb.Type{Kind: v1pb.Type_INT64}, nil
case reflect.Uint:
return &v1pb.Type{Kind: v1pb.Type_UINT}, nil
case reflect.Uint8:
return &v1pb.Type{Kind: v1pb.Type_UINT8}, nil
case reflect.Uint16:
return &v1pb.Type{Kind: v1pb.Type_UINT16}, nil
case reflect.Uint32:
return &v1pb.Type{Kind: v1pb.Type_UINT32}, nil
case reflect.Uint64:
return &v1pb.Type{Kind: v1pb.Type_UINT64}, nil
case reflect.Float32:
return &v1pb.Type{Kind: v1pb.Type_FLOAT32}, nil
case reflect.Float64:
return &v1pb.Type{Kind: v1pb.Type_FLOAT64}, nil
case reflect.String:
return &v1pb.Type{Kind: v1pb.Type_STRING}, nil
case reflect.Slice:
elm, err := encodeType(t.Elem())
if err != nil {
wrapped := errors.Wrap(err, "bad element type")
return nil, errors.WithContextf(wrapped, "encoding slice %v", t)
}
return &v1pb.Type{Kind: v1pb.Type_SLICE, Element: elm}, nil
case reflect.Struct:
var fields []*v1pb.Type_StructField
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
if f.PkgPath != "" {
wrapped := errors.Errorf("type has unexported field: %v", f.Name)
return nil, errors.WithContextf(wrapped, "encoding struct %v", t)
}
fType, err := encodeType(f.Type)
if err != nil {
wrapped := errors.Wrap(err, "bad field type")
return nil, errors.WithContextf(wrapped, "encoding struct %v", t)
}
field := &v1pb.Type_StructField{
Name: f.Name,
PkgPath: f.PkgPath,
Type: fType,
Tag: string(f.Tag),
Offset: int64(f.Offset),
Index: encodeInts(f.Index),
Anonymous: f.Anonymous,
}
fields = append(fields, field)
}
return &v1pb.Type{Kind: v1pb.Type_STRUCT, Fields: fields}, nil
case reflect.Func:
var in []*v1pb.Type
for i := 0; i < t.NumIn(); i++ {
param, err := encodeType(t.In(i))
if err != nil {
wrapped := errors.Wrap(err, "bad parameter type")
return nil, errors.WithContextf(wrapped, "encoding function %v", t)
}
in = append(in, param)
}
var out []*v1pb.Type
for i := 0; i < t.NumOut(); i++ {
ret, err := encodeType(t.Out(i))
if err != nil {
wrapped := errors.Wrap(err, "bad return type")
return nil, errors.WithContextf(wrapped, "encoding function %v", t)
}
out = append(out, ret)
}
return &v1pb.Type{Kind: v1pb.Type_FUNC, ParameterTypes: in, ReturnTypes: out, IsVariadic: t.IsVariadic()}, nil
case reflect.Chan:
elm, err := encodeType(t.Elem())
if err != nil {
wrapped := errors.Wrap(err, "bad element type")
return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
}
dir, err := encodeChanDir(t.ChanDir())
if err != nil {
wrapped := errors.Wrap(err, "bad channel direction")
return nil, errors.WithContextf(wrapped, "encoding channel %v", t)
}
return &v1pb.Type{Kind: v1pb.Type_CHAN, Element: elm, ChanDir: dir}, nil
case reflect.Ptr:
elm, err := encodeType(t.Elem())
if err != nil {
wrapped := errors.Wrap(err, "bad base type")
return nil, errors.WithContextf(wrapped, "encoding pointer %v", t)
}
return &v1pb.Type{Kind: v1pb.Type_PTR, Element: elm}, nil
default:
return nil, errors.Errorf("unencodable type %v", t)
}
}