in pkg/flink/utils.go [36:67]
// literalToFlinkJobArg converts a literal into the list of string arguments
// passed to a Flink job for the given key.
//
// A scalar literal is converted via scalarToFlinkJobArg and yields at most one
// argument. A collection literal is flattened: every element is converted
// recursively with the same key and the resulting arguments are concatenated
// in element order. Any other literal kind (including an unset value) is
// rejected with an error naming the unsupported type.
//
// The returned slice is never nil when err is nil.
func literalToFlinkJobArg(key string, literal *core.Literal) ([]string, error) {
	switch v := literal.GetValue().(type) {
	case *core.Literal_Scalar:
		arg, err := scalarToFlinkJobArg(key, v.Scalar)
		if err != nil {
			return nil, err
		}
		// An empty string means the scalar contributes no argument
		// (e.g. a boolean false value), so emit nothing for it.
		if len(arg) == 0 {
			return []string{}, nil
		}
		return []string{arg}, nil
	case *core.Literal_Collection:
		elements := v.Collection.GetLiterals()
		// Pre-size for the common one-argument-per-element case; the slice
		// stays non-nil even when the collection is empty.
		args := make([]string, 0, len(elements))
		for _, element := range elements {
			elementArgs, err := literalToFlinkJobArg(key, element)
			if err != nil {
				return nil, err
			}
			args = append(args, elementArgs...)
		}
		return args, nil
	default:
		// %T reports the dynamic type of the oneof wrapper; %s would either
		// dump the value's contents or print "%!s(<nil>)" for an unset value.
		return nil, fmt.Errorf("not supported type: %T", v)
	}
}