func()

in sdks/go/pkg/beam/core/runtime/graphx/coder.go [186:393]


func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder, error) {
	urn := c.GetSpec().GetUrn()
	components := c.GetComponentCoderIds()

	switch urn {
	case urnBytesCoder:
		return coder.NewBytes(), nil

	case urnBoolCoder:
		return coder.NewBool(), nil

	case urnVarIntCoder:
		return coder.NewVarInt(), nil

	case urnDoubleCoder:
		return coder.NewDouble(), nil

	case urnStringCoder:
		return coder.NewString(), nil

	case urnKVCoder:
		if len(components) != 2 {
			return nil, errors.Errorf("could not unmarshal KV coder from %v, want exactly 2 components but have %d", c, len(components))
		}

		key, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}

		id := components[1]
		kind := coder.KV
		root := typex.KVType

		elm, err := b.peek(id)
		if err != nil {
			return nil, err
		}

		switch elm.GetSpec().GetUrn() {
		case urnIterableCoder, urnStateBackedIterableCoder:
			id = elm.GetComponentCoderIds()[0]
			kind = coder.CoGBK
			root = typex.CoGBKType

			// TODO(BEAM-490): If CoGBK with > 1 input, handle as special GBK. We expect
			// it to be encoded as CoGBK<K,LP<CoGBKList<V,W,..>>>. Remove this handling once
			// CoGBK has a first-class representation.

			if ids, ok := b.isCoGBKList(id); ok {
				// CoGBK<K,V,W,..>

				values, err := b.Coders(ids)
				if err != nil {
					return nil, err
				}

				t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...)
				return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
			}
		}

		value, err := b.Coder(id)
		if err != nil {
			return nil, err
		}

		t := typex.New(root, key.T, value.T)
		return &coder.Coder{Kind: kind, T: t, Components: []*coder.Coder{key, value}}, nil

	case urnLengthPrefixCoder:
		if len(components) != 1 {
			return nil, errors.Errorf("could not unmarshal length prefix coder from %v, want a single sub component but have %d", c, len(components))
		}

		sub, err := b.peek(components[0])
		if err != nil {
			return nil, err
		}

		// No payload means this coder was length prefixed by the runner
		// but is likely self describing - AKA a beam coder.
		if len(sub.GetSpec().GetPayload()) == 0 {
			return b.makeCoder(components[0], sub)
		}
		// TODO(lostluck) 2018/10/17: Make this strict again, once dataflow can use
		// the portable pipeline model directly (BEAM-2885)
		switch u := sub.GetSpec().GetUrn(); u {
		case "", urnCustomCoder:
			var ref v1pb.CustomCoder
			if err := protox.DecodeBase64(string(sub.GetSpec().GetPayload()), &ref); err != nil {
				return nil, err
			}
			custom, err := decodeCustomCoder(&ref)
			if err != nil {
				return nil, err
			}
			custom.ID = components[0]
			t := typex.New(custom.Type)
			cc := &coder.Coder{Kind: coder.Custom, T: t, Custom: custom}
			return cc, nil
		case urnBytesCoder, urnStringCoder: // implicitly length prefixed types.
			return b.makeCoder(components[0], sub)
		default:
			// Handle Length prefixing dictated by the runner.
			cc, err := b.makeCoder(components[0], sub)
			if err != nil {
				return nil, err
			}
			return &coder.Coder{Kind: coder.LP, T: cc.T, Components: []*coder.Coder{cc}}, nil
		}

	case urnWindowedValueCoder, urnParamWindowedValueCoder:
		if len(components) != 2 {
			return nil, errors.Errorf("could not unmarshal windowed value coder from %v, expected two components but got %d", c, len(components))
		}

		elm, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}
		w, err := b.WindowCoder(components[1])
		if err != nil {
			return nil, err
		}
		t := typex.New(typex.WindowedValueType, elm.T)
		wvc := &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}
		if urn == urnWindowedValueCoder {
			return wvc, nil
		}
		wvc.Kind = coder.ParamWindowedValue
		wvc.Window.Payload = string(c.GetSpec().GetPayload())
		return wvc, nil

	case streamType:
		return nil, errors.Errorf("could not unmarshal stream type coder from %v, stream must be pair value", c)

	case "":
		// TODO(herohde) 11/27/2017: we still see CoderRefs from Dataflow. Handle that
		// case here, for now, so that the harness can use this logic.

		payload := c.GetSpec().GetPayload()

		var ref CoderRef
		if err := json.Unmarshal(payload, &ref); err != nil {
			return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode urn-less coder's payload \"%v\"", c, string(payload))
		}
		c, err := DecodeCoderRef(&ref)
		if err != nil {
			return nil, errors.Wrapf(err, "could not unmarshal CoderRef from %v, failed to decode CoderRef \"%v\"", c, string(payload))
		}
		return c, nil

	case urnIterableCoder:
		if len(components) != 1 {
			return nil, errors.Errorf("could not unmarshal iterable coder from %v, expected one component but got %d", c, len(components))
		}
		elm, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}
		return coder.NewI(elm), nil
	case urnTimerCoder:
		if len(components) != 2 {
			return nil, errors.Errorf("could not unmarshal timer coder from %v, expected two component but got %d", c, len(components))
		}
		elm, err := b.Coder(components[0])
		if err != nil {
			return nil, err
		}
		w, err := b.WindowCoder(components[1])
		if err != nil {
			return nil, err
		}
		return coder.NewT(elm, w), nil
	case urnRowCoder:
		var s pipepb.Schema
		if err := proto.Unmarshal(c.GetSpec().GetPayload(), &s); err != nil {
			return nil, err
		}
		t, err := schema.ToType(&s)
		if err != nil {
			return nil, err
		}
		return coder.NewR(typex.New(t)), nil

		// Special handling for window coders so they can be treated as
		// a general coder. Generally window coders are not used outside of
		// specific contexts, but this enables improved testing.
		// Window types are not permitted to be fulltypes, so
		// we use assignably equivalent anonymous struct types.
	case urnIntervalWindow:
		w, err := b.WindowCoder(id)
		if err != nil {
			return nil, err
		}
		return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{ Start, End int64 })(nil)).Elem()), Window: w}, nil
	case urnGlobalWindow:
		w, err := b.WindowCoder(id)
		if err != nil {
			return nil, err
		}
		return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{})(nil)).Elem()), Window: w}, nil

	default:
		return nil, errors.Errorf("could not unmarshal coder from %v, unknown URN %v", c, urn)
	}
}