func primitiveToFlinkJobArg()

in pkg/flink/utils.go [78:99]


func primitiveToFlinkJobArg(arg string, primitive *core.Primitive) (string, error) {
	switch p := primitive.GetValue().(type) {
	case *core.Primitive_Integer:
		return fmt.Sprintf("--%s=%d", arg, p.Integer), nil
	case *core.Primitive_FloatValue:
		return fmt.Sprintf("--%s=%f", arg, p.FloatValue), nil
	case *core.Primitive_Boolean:
		if p.Boolean {
			return fmt.Sprintf("--%s", arg), nil
		}

		return "", nil
	case *core.Primitive_StringValue:
		return fmt.Sprintf("--%s=%s", arg, p.StringValue), nil
	case *core.Primitive_Datetime:
		return fmt.Sprintf("--%s=%s", arg, p.Datetime.String()), nil
	case *core.Primitive_Duration:
		return fmt.Sprintf("--%s=%s", arg, p.Duration.String()), nil
	default:
		return "", fmt.Errorf("not supported type: %s", p)
	}
}