func tryBindInbound()

in sdks/go/pkg/beam/core/graph/bind.go [170:301]


// tryBindInbound attempts to bind the inbound type t to the function
// parameters args and returns the full type derived from those parameters
// together with the kind of binding that was chosen.
//
// The accepted parameter forms depend on the class of t:
//
//   - Concrete or Container: when isMain is true the first parameter is bound
//     directly (kind Main). Otherwise the first parameter is treated as a side
//     input and bound as a Singleton, Slice, Iter or ReIter.
//   - KV: when isMain is true the first two parameters must be plain values
//     and are bound as key and value (kind Main). Otherwise the first
//     parameter must be an Iter or ReIter that unfolds to exactly two types.
//   - CoGBK: the first parameter must be a plain value (the key) and every
//     following parameter must be an Iter or ReIter that unfolds to exactly
//     one type. isMain is not consulted and the kind stays Main.
//
// An error is returned if args holds fewer parameters than the selected form
// reads, if a parameter has an unsupported kind or shape, if the class or
// composite type of t is not handled, or if t is not structurally assignable
// to the derived type. The returned type is nil on error and the returned
// kind should not be relied upon.
func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.FullType, InputKind, error) {
	kind := Main
	var other typex.FullType

	// tooFew builds the error reported when args is shorter than the binding
	// form requires. Checking up front turns what would otherwise be an
	// index-out-of-range panic into a regular bind failure.
	tooFew := func(want int) error {
		return errors.Errorf("%v cannot bind to %d params: want at least %d", t, len(args), want)
	}

	switch t.Class() {
	case typex.Concrete, typex.Container:
		if len(args) < 1 {
			return nil, kind, tooFew(1)
		}
		arg := args[0]

		if isMain {
			other = typex.New(arg.T)
		} else {
			// We accept various forms for side input. We have to disambiguate
			// []string into a Singleton of type []string or a Slice of type
			// string by matching up the incoming type and the param type.

			switch arg.Kind {
			case funcx.FnValue:
				if arg.T.Kind() == reflect.Slice && t.Type() == arg.T.Elem() {
					// TODO(herohde) 6/29/2017: we do not allow universal slices, for now.

					kind = Slice
					other = typex.New(arg.T.Elem())
				} else {
					kind = Singleton
					other = typex.New(arg.T)
				}

			case funcx.FnIter:
				// The ok result is deliberately ignored: the length check on
				// the trimmed values rejects anything that is not exactly one
				// usable type.
				values, _ := funcx.UnfoldIter(arg.T)
				trimmed := trimIllegal(values)
				if len(trimmed) != 1 {
					return nil, kind, errors.Errorf("%v cannot bind to %v", t, arg)
				}

				kind = Iter
				other = typex.New(trimmed[0])

			case funcx.FnReIter:
				values, _ := funcx.UnfoldReIter(arg.T)
				trimmed := trimIllegal(values)
				if len(trimmed) != 1 {
					return nil, kind, errors.Errorf("%v cannot bind to %v", t, arg)
				}

				kind = ReIter
				other = typex.New(trimmed[0])

			default:
				return nil, kind, errors.Errorf("unexpected param kind: %v", arg)
			}
		}

	case typex.Composite:
		switch t.Type() {
		case typex.KVType:
			if isMain {
				// Main KV input consumes two value parameters: key and value.
				if len(args) < 2 {
					return nil, kind, tooFew(2)
				}
				if args[0].Kind != funcx.FnValue {
					return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0])
				}
				if args[1].Kind != funcx.FnValue {
					return nil, kind, errors.Errorf("value of %v cannot bind to %v", t, args[1])
				}
				other = typex.NewKV(typex.New(args[0].T), typex.New(args[1].T))
			} else {
				// TODO(herohde) 6/29/2017: side input map form.

				if len(args) < 1 {
					return nil, kind, tooFew(1)
				}
				arg := args[0]

				// A KV side input must unfold to exactly a key and a value type.
				switch arg.Kind {
				case funcx.FnIter:
					values, _ := funcx.UnfoldIter(arg.T)
					trimmed := trimIllegal(values)
					if len(trimmed) != 2 {
						return nil, kind, errors.Errorf("%v cannot bind to %v", t, arg)
					}

					kind = Iter
					other = typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))

				case funcx.FnReIter:
					values, _ := funcx.UnfoldReIter(arg.T)
					trimmed := trimIllegal(values)
					if len(trimmed) != 2 {
						return nil, kind, errors.Errorf("%v cannot bind to %v", t, arg)
					}

					kind = ReIter
					other = typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))

				default:
					return nil, kind, errors.Errorf("%v cannot bind to %v", t, arg)
				}
			}

		case typex.CoGBKType:
			if len(args) < 1 {
				return nil, kind, tooFew(1)
			}
			if args[0].Kind != funcx.FnValue {
				return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0])
			}

			// The first component is the key; each remaining parameter
			// contributes one value component.
			components := []typex.FullType{typex.New(args[0].T)}

			for _, arg := range args[1:] {
				switch arg.Kind {
				case funcx.FnIter:
					values, _ := funcx.UnfoldIter(arg.T)
					trimmed := trimIllegal(values)
					if len(trimmed) != 1 {
						return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, arg)
					}
					components = append(components, typex.New(trimmed[0]))

				case funcx.FnReIter:
					values, _ := funcx.UnfoldReIter(arg.T)
					trimmed := trimIllegal(values)
					if len(trimmed) != 1 {
						return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, arg)
					}
					components = append(components, typex.New(trimmed[0]))

				default:
					return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, arg)
				}
			}
			other = typex.NewCoGBK(components...)

		default:
			return nil, kind, errors.Errorf("unexpected inbound type: %v", t.Type())
		}

	default:
		return nil, kind, errors.Errorf("unexpected inbound class: %v", t.Class())
	}

	// The type derived from the parameters must be able to receive t.
	if !typex.IsStructurallyAssignable(t, other) {
		return nil, kind, errors.Errorf("%v is not assignable to %v", t, other)
	}
	return other, kind, nil
}