in sdks/go/pkg/beam/core/graph/bind.go [170:301]
// tryBindInbound tries to bind the inbound type t to the leading parameters in
// args. On success it returns the full type derived from the parameters and
// the InputKind chosen for the binding.
//
// When isMain is true the kind is always Main: a Concrete or Container type is
// bound to args[0], and a KV is bound to args[0] (key) and args[1] (value).
// When isMain is false the input is treated as a side input and the kind
// (Singleton, Slice, Iter or ReIter) is selected from the form of args[0].
// A CoGBK binds args[0] as the key and every remaining parameter as an
// iterator or re-iterator over one value type.
//
// An error is returned if there are too few parameters for t, if a parameter
// has a form that cannot represent t, or if t is not structurally assignable
// to the type derived from the parameters. On error the returned type is nil.
func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) (typex.FullType, InputKind, error) {
	kind := Main
	var other typex.FullType

	switch t.Class() {
	case typex.Concrete, typex.Container:
		// Guard the args[0] accesses below: report a binding error rather
		// than panicking when the function has no parameter left to bind.
		if len(args) == 0 {
			return nil, kind, errors.Errorf("not enough parameters to bind to %v", t)
		}
		arg := args[0]

		if isMain {
			other = typex.New(arg.T)
		} else {
			// We accept various forms for side input. We have to disambiguate
			// []string into a Singleton of type []string or a Slice of type
			// string by matching up the incoming type and the param type.
			switch arg.Kind {
			case funcx.FnValue:
				if arg.T.Kind() == reflect.Slice && t.Type() == arg.T.Elem() {
					// TODO(herohde) 6/29/2017: we do not allow universal slices, for now.
					kind = Slice
					other = typex.New(arg.T.Elem())
				} else {
					kind = Singleton
					other = typex.New(arg.T)
				}
			case funcx.FnIter:
				// The second result is deliberately ignored; the parameter
				// kind has already identified this as an iterator.
				values, _ := funcx.UnfoldIter(arg.T)
				trimmed := trimIllegal(values)
				if len(trimmed) != 1 {
					return nil, kind, errors.Errorf("%v cannot bind to %v", t, arg)
				}
				kind = Iter
				other = typex.New(trimmed[0])
			case funcx.FnReIter:
				values, _ := funcx.UnfoldReIter(arg.T)
				trimmed := trimIllegal(values)
				if len(trimmed) != 1 {
					return nil, kind, errors.Errorf("%v cannot bind to %v", t, arg)
				}
				kind = ReIter
				other = typex.New(trimmed[0])
			default:
				return nil, kind, errors.Errorf("unexpected param kind: %v", arg)
			}
		}

	case typex.Composite:
		switch t.Type() {
		case typex.KVType:
			// Both the main and the side input form read args[0].
			if len(args) == 0 {
				return nil, kind, errors.Errorf("not enough parameters to bind to %v", t)
			}
			if isMain {
				if args[0].Kind != funcx.FnValue {
					return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0])
				}
				// A main KV needs a second parameter for the value. Checked
				// after the key so the key error keeps its precedence.
				if len(args) < 2 {
					return nil, kind, errors.Errorf("not enough parameters to bind to %v", t)
				}
				if args[1].Kind != funcx.FnValue {
					return nil, kind, errors.Errorf("value of %v cannot bind to %v", t, args[1])
				}
				other = typex.NewKV(typex.New(args[0].T), typex.New(args[1].T))
			} else {
				// TODO(herohde) 6/29/2017: side input map form.
				switch args[0].Kind {
				case funcx.FnIter:
					values, _ := funcx.UnfoldIter(args[0].T)
					trimmed := trimIllegal(values)
					// A KV side input iterator must yield exactly a key and a value.
					if len(trimmed) != 2 {
						return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
					}
					kind = Iter
					other = typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
				case funcx.FnReIter:
					values, _ := funcx.UnfoldReIter(args[0].T)
					trimmed := trimIllegal(values)
					if len(trimmed) != 2 {
						return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
					}
					kind = ReIter
					other = typex.NewKV(typex.New(trimmed[0]), typex.New(trimmed[1]))
				default:
					return nil, kind, errors.Errorf("%v cannot bind to %v", t, args[0])
				}
			}

		case typex.CoGBKType:
			// The key is read from args[0]; guard against an empty list.
			if len(args) == 0 {
				return nil, kind, errors.Errorf("not enough parameters to bind to %v", t)
			}
			if args[0].Kind != funcx.FnValue {
				return nil, kind, errors.Errorf("key of %v cannot bind to %v", t, args[0])
			}

			// The first component is the key; each remaining parameter
			// contributes one value component.
			components := []typex.FullType{typex.New(args[0].T)}
			for i := 1; i < len(args); i++ {
				switch args[i].Kind {
				case funcx.FnIter:
					values, _ := funcx.UnfoldIter(args[i].T)
					trimmed := trimIllegal(values)
					if len(trimmed) != 1 {
						return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i])
					}
					components = append(components, typex.New(trimmed[0]))
				case funcx.FnReIter:
					values, _ := funcx.UnfoldReIter(args[i].T)
					trimmed := trimIllegal(values)
					if len(trimmed) != 1 {
						return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i])
					}
					components = append(components, typex.New(trimmed[0]))
				default:
					return nil, kind, errors.Errorf("values of %v cannot bind to %v", t, args[i])
				}
			}
			other = typex.NewCoGBK(components...)

		default:
			return nil, kind, errors.Errorf("unexpected inbound type: %v", t.Type())
		}

	default:
		return nil, kind, errors.Errorf("unexpected inbound class: %v", t.Class())
	}

	// The parameter-derived type must be able to receive values of the
	// inbound type.
	if !typex.IsStructurallyAssignable(t, other) {
		return nil, kind, errors.Errorf("%v is not assignable to %v", t, other)
	}
	return other, kind, nil
}