func EncodeCoderRef()

in sdks/go/pkg/beam/core/runtime/graphx/dataflow.go [89:191]


// EncodeCoderRef translates the coder c into its CoderRef form, dispatching
// on c.Kind:
//
//   - Custom: a length-prefixed wrapper around a single component whose Type
//     is the base64-encoded custom coder and whose PipelineProtoCoderID is
//     c.Custom.ID.
//   - KV: a pair of the encoded key and value components.
//   - CoGBK: a pair of the encoded key and a stream of values. With more than
//     two components, the values are grouped into a length-prefixed union.
//   - WindowedValue: a wrapper holding the encoded element and window coders.
//   - Bytes, Bool, VarInt, Double: a bare CoderRef of the matching type.
//   - String: a length-prefixed wrapper around the string type, carrying c.ID.
//   - Row: a row-typed CoderRef whose single component carries the
//     base64-encoded schema derived from c.T.Type().
//
// It returns an error if c is nil, if c has an unexpected number of
// components for its kind, if a required sub-coder is missing, if c.Kind is
// not handled, or if any nested encoding step fails.
func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
	// Guard against a nil coder (including nil entries reached through the
	// recursive calls below) so callers get an error rather than a panic.
	if c == nil {
		return nil, errors.Errorf("bad coder: nil")
	}

	switch c.Kind {
	case coder.Custom:
		// A Custom coder without its payload cannot be encoded; c.Custom.ID
		// below would otherwise dereference nil.
		if c.Custom == nil {
			return nil, errors.Errorf("bad custom coder: %v", c)
		}

		ref, err := encodeCustomCoder(c.Custom)
		if err != nil {
			return nil, err
		}
		data, err := protox.EncodeBase64(ref)
		if err != nil {
			return nil, err
		}
		return &CoderRef{
			Type:       lengthPrefixType,
			Components: []*CoderRef{{Type: data, PipelineProtoCoderID: c.Custom.ID}},
		}, nil

	case coder.KV:
		if len(c.Components) != 2 {
			return nil, errors.Errorf("bad KV: %v", c)
		}

		key, err := EncodeCoderRef(c.Components[0])
		if err != nil {
			return nil, err
		}
		value, err := EncodeCoderRef(c.Components[1])
		if err != nil {
			return nil, err
		}
		return &CoderRef{Type: pairType, Components: []*CoderRef{key, value}, IsPairLike: true}, nil

	case coder.CoGBK:
		if len(c.Components) < 2 {
			return nil, errors.Errorf("bad CoGBK: %v", c)
		}

		// NOTE(review): the indexing below assumes EncodeCoderRefs returns one
		// ref per input component, in order — confirm against its definition.
		refs, err := EncodeCoderRefs(c.Components)
		if err != nil {
			return nil, err
		}

		// With a single value component it is used directly; with several,
		// they are grouped into one length-prefixed union value.
		value := refs[1]
		if len(c.Components) > 2 {
			// TODO(BEAM-490): don't inject union coder for CoGBK.

			union := &CoderRef{Type: cogbklistType, Components: refs[1:]}
			value = &CoderRef{Type: lengthPrefixType, Components: []*CoderRef{union}}
		}

		stream := &CoderRef{Type: streamType, Components: []*CoderRef{value}, IsStreamLike: true}
		return &CoderRef{Type: pairType, Components: []*CoderRef{refs[0], stream}, IsPairLike: true}, nil

	case coder.WindowedValue:
		if len(c.Components) != 1 || c.Window == nil {
			return nil, errors.Errorf("bad windowed value: %v", c)
		}

		elm, err := EncodeCoderRef(c.Components[0])
		if err != nil {
			return nil, err
		}
		w, err := encodeWindowCoder(c.Window)
		if err != nil {
			return nil, err
		}
		return &CoderRef{Type: windowedValueType, Components: []*CoderRef{elm, w}, IsWrapper: true}, nil

	case coder.Bytes:
		return &CoderRef{Type: bytesType}, nil

	case coder.Bool:
		return &CoderRef{Type: boolType}, nil

	case coder.VarInt:
		return &CoderRef{Type: varIntType}, nil

	case coder.Double:
		return &CoderRef{Type: doubleType}, nil

	case coder.String:
		return &CoderRef{
			Type:       lengthPrefixType,
			Components: []*CoderRef{{Type: stringType, PipelineProtoCoderID: c.ID}},
		}, nil

	case coder.Row:
		// The row schema is derived from the coder's Go type and shipped as a
		// base64-encoded component.
		schm, err := schema.FromType(c.T.Type())
		if err != nil {
			return nil, err
		}
		data, err := protox.EncodeBase64(schm)
		if err != nil {
			return nil, err
		}
		return &CoderRef{
			Type:       rowType,
			Components: []*CoderRef{{Type: data}},
		}, nil

	default:
		return nil, errors.Errorf("bad coder kind: %v", c.Kind)
	}
}