func inferCoder()

in sdks/go/pkg/beam/coder.go [163:256]


func inferCoder(t FullType) (*coder.Coder, error) {
	switch t.Class() {
	case typex.Concrete, typex.Container:
		switch t.Type() {
		case reflectx.Int64:
			// use the beam varint coder.
			return &coder.Coder{Kind: coder.VarInt, T: t}, nil
		case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32:
			c, err := coderx.NewVarIntZ(t.Type())
			if err != nil {
				return nil, err
			}
			return coder.CoderFrom(c), nil
		case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64:
			c, err := coderx.NewVarUintZ(t.Type())
			if err != nil {
				return nil, err
			}
			return coder.CoderFrom(c), nil

		case reflectx.Float32:
			c, err := coderx.NewFloat(t.Type())
			if err != nil {
				return nil, err
			}
			return coder.CoderFrom(c), nil

		case reflectx.Float64:
			return &coder.Coder{Kind: coder.Double, T: t}, nil

		case reflectx.String:
			return &coder.Coder{Kind: coder.String, T: t}, nil

		case reflectx.ByteSlice:
			return &coder.Coder{Kind: coder.Bytes, T: t}, nil

		case reflectx.Bool:
			return &coder.Coder{Kind: coder.Bool, T: t}, nil

		default:
			et := t.Type()
			if c := coder.LookupCustomCoder(et); c != nil {
				return coder.CoderFrom(c), nil
			}

			if EnableSchemas {
				switch et.Kind() {
				case reflect.Ptr:
					if et.Elem().Kind() != reflect.Struct {
						break
					}
					fallthrough
				case reflect.Struct:
					return &coder.Coder{Kind: coder.Row, T: t}, nil
				}
			}

			// Interface types that implement JSON marshalling can be handled by the default coder.
			// otherwise, inference needs to fail here.
			if et.Kind() == reflect.Interface && !et.Implements(jsonCoderType) {
				return nil, errors.Errorf("inferCoder failed: interface type %v has no coder registered", et)
			}

			c, err := newJSONCoder(et)
			if err != nil {
				return nil, err
			}
			return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
		}

	case typex.Composite:
		c, err := inferCoders(t.Components())
		if err != nil {
			return nil, err
		}

		switch t.Type() {
		case typex.KVType:
			return &coder.Coder{Kind: coder.KV, T: t, Components: c}, nil
		case typex.CoGBKType:
			return &coder.Coder{Kind: coder.CoGBK, T: t, Components: c}, nil
		case typex.WindowedValueType:
			// TODO(herohde) 4/15/2018: do we ever infer W types now that PCollections
			// are non-windowed? We either need to know the windowing strategy or
			// we should remove this case.
			return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: c, Window: coder.NewGlobalWindow()}, nil

		default:
			panic(fmt.Sprintf("Unexpected composite type: %v", t))
		}
	default:
		panic(fmt.Sprintf("Unexpected type: %v", t))
	}
}