in sdks/go/pkg/beam/core/runtime/graphx/serialize.go [545:665]
// decodeType reconstructs a reflect.Type from its serialized form t.
//
// Basic kinds map directly to the corresponding reflectx type. Composite kinds
// (slice, struct, func, chan, ptr) are rebuilt recursively with the reflect
// type constructors. Special kinds are resolved by decodeSpecial and external
// kinds are looked up by key via runtime.LookupType.
//
// It returns an error if t is nil, if its kind is not handled, if any nested
// type fails to decode, if an external key is not registered, or if a function
// type is marked variadic without a trailing slice parameter.
func decodeType(t *v1pb.Type) (reflect.Type, error) {
	if t == nil {
		err := errors.New("empty type")
		return nil, errors.WithContextf(err, "decoding type %v", t)
	}

	switch t.Kind {
	case v1pb.Type_BOOL:
		return reflectx.Bool, nil
	case v1pb.Type_INT:
		return reflectx.Int, nil
	case v1pb.Type_INT8:
		return reflectx.Int8, nil
	case v1pb.Type_INT16:
		return reflectx.Int16, nil
	case v1pb.Type_INT32:
		return reflectx.Int32, nil
	case v1pb.Type_INT64:
		return reflectx.Int64, nil
	case v1pb.Type_UINT:
		return reflectx.Uint, nil
	case v1pb.Type_UINT8:
		return reflectx.Uint8, nil
	case v1pb.Type_UINT16:
		return reflectx.Uint16, nil
	case v1pb.Type_UINT32:
		return reflectx.Uint32, nil
	case v1pb.Type_UINT64:
		return reflectx.Uint64, nil
	case v1pb.Type_FLOAT32:
		return reflectx.Float32, nil
	case v1pb.Type_FLOAT64:
		return reflectx.Float64, nil
	case v1pb.Type_STRING:
		return reflectx.String, nil

	case v1pb.Type_SLICE:
		elm, err := decodeType(t.GetElement())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad element", t)
		}
		return reflect.SliceOf(elm), nil

	case v1pb.Type_STRUCT:
		fields := make([]reflect.StructField, 0, len(t.Fields))
		for _, f := range t.Fields {
			// Use the nil-safe getter so that a nil field entry surfaces as an
			// "empty type" decode error rather than a nil pointer dereference.
			fType, err := decodeType(f.GetType())
			if err != nil {
				wrapped := errors.Wrap(err, "bad field type")
				return nil, errors.WithContextf(wrapped, "failed to decode type %v, bad field type", t)
			}

			// reflect.StructOf documents that it ignores Offset and Index and
			// recomputes them; they are still populated to mirror the encoding.
			fields = append(fields, reflect.StructField{
				Name:      f.GetName(),
				PkgPath:   f.GetPkgPath(),
				Type:      fType,
				Tag:       reflect.StructTag(f.GetTag()),
				Offset:    uintptr(f.GetOffset()),
				Index:     decodeInts(f.GetIndex()),
				Anonymous: f.GetAnonymous(),
			})
		}
		// NOTE(review): reflect.StructOf can still panic on field sets it
		// rejects; confirm whether encoded input can ever trigger that.
		return reflect.StructOf(fields), nil

	case v1pb.Type_FUNC:
		in, err := decodeTypes(t.GetParameterTypes())
		if err != nil {
			wrapped := errors.Wrap(err, "bad parameter type")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		out, err := decodeTypes(t.GetReturnTypes())
		if err != nil {
			wrapped := errors.Wrap(err, "bad return type")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		variadic := t.GetIsVariadic()
		// reflect.FuncOf panics when variadic is set but the final parameter
		// is not a slice; report malformed input as an error instead.
		if variadic && (len(in) == 0 || in[len(in)-1].Kind() != reflect.Slice) {
			err := errors.New("variadic function must have a slice as its last parameter")
			return nil, errors.WithContextf(err, "decoding type %v", t)
		}
		return reflect.FuncOf(in, out, variadic), nil

	case v1pb.Type_CHAN:
		elm, err := decodeType(t.GetElement())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		dir, err := decodeChanDir(t.GetChanDir())
		if err != nil {
			wrapped := errors.Wrap(err, "bad channel direction")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		return reflect.ChanOf(dir, elm), nil

	case v1pb.Type_PTR:
		elm, err := decodeType(t.GetElement())
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		return reflect.PtrTo(elm), nil

	case v1pb.Type_SPECIAL:
		ret, err := decodeSpecial(t.Special)
		if err != nil {
			wrapped := errors.Wrap(err, "bad element")
			return nil, errors.WithContextf(wrapped, "decoding type %v", t)
		}
		return ret, nil

	case v1pb.Type_EXTERNAL:
		ret, ok := runtime.LookupType(t.ExternalKey)
		if !ok {
			err := errors.Errorf("external key not found %v", t.ExternalKey)
			return nil, errors.WithContextf(err, "decoding type %v", t)
		}
		return ret, nil

	default:
		err := errors.Errorf("unexpected type kind %v", t.Kind)
		return nil, errors.WithContextf(err, "failed to decode type %v", t)
	}
}