func literalToFlinkJobArg()

in pkg/flink/utils.go [36:67]


// literalToFlinkJobArg converts a literal into the list of Flink job
// arguments produced for key.
//
// A scalar literal is rendered through scalarToFlinkJobArg and yields a
// single-element slice, or an empty (non-nil) slice when the rendered
// argument is the empty string. A collection literal is expanded
// recursively: every element is converted under the same key and the
// results are concatenated in order. Any other kind of value, including an
// unset one, is rejected with an error that names the offending Go type.
func literalToFlinkJobArg(key string, literal *core.Literal) ([]string, error) {
	switch v := literal.GetValue().(type) {
	case *core.Literal_Scalar:
		arg, err := scalarToFlinkJobArg(key, v.Scalar)
		if err != nil {
			return nil, err
		}

		// An empty arg means the scalar contributes nothing to the command
		// line (noted in the original code as the boolean false case).
		if arg == "" {
			return []string{}, nil
		}

		return []string{arg}, nil
	case *core.Literal_Collection:
		items := v.Collection.GetLiterals()
		// Pre-sized for the common one-arg-per-item case; nested collections
		// may still grow it.
		args := make([]string, 0, len(items))

		for _, item := range items {
			itemArgs, err := literalToFlinkJobArg(key, item)
			if err != nil {
				return nil, err
			}

			args = append(args, itemArgs...)
		}

		return args, nil
	default:
		// %T reports the dynamic type of the unsupported value; %s would try
		// to format the value itself as a string.
		return nil, fmt.Errorf("not supported type: %T", v)
	}
}