in pkg/flink/resources.go [277:301]
// updateFlinkProperties merges the Flink properties taken from the cluster
// spec, the task's job and the config override (via MergeProperties), renders
// every merged value as a template against data built from the task's
// namespace, cluster name and labels, and stores the rendered map in
// fc.Spec.FlinkProperties.
//
// It returns the first clone, parse or execution error encountered. On error
// this function does not assign fc.Spec.FlinkProperties.
func (fc *FlinkCluster) updateFlinkProperties(config *Config, taskCtx FlinkTaskContext) error {
	props := MergeProperties(
		fc.Spec.FlinkProperties,
		taskCtx.Job.FlinkProperties,
		config.FlinkPropertiesOverride,
	)

	// The template data is derived only from taskCtx, so it is built once and
	// shared by every property.
	// NOTE(review): assumes NewFlinkPropertiesTemplateData is a plain
	// constructor with no per-call side effects — confirm.
	data := NewFlinkPropertiesTemplateData(taskCtx.Namespace, taskCtx.ClusterName, taskCtx.Labels)

	result := make(map[string]string, len(props))
	var buf bytes.Buffer
	for k, v := range props {
		// Parse each value into its own clone rather than into the shared
		// package-level template. Parsing into the shared template mutates
		// it: definitions from one value leak into later ones, and a value
		// that is empty (or only whitespace/comments) does not replace the
		// previously parsed body, so it would render another property's text.
		// NOTE(review): assumes flinkPropertiesTmpl is a *text/template.Template
		// (or html/template equivalent) — confirm against its declaration.
		base, err := flinkPropertiesTmpl.Clone()
		if err != nil {
			return err
		}
		tmpl, err := base.Parse(v)
		if err != nil {
			return err
		}

		buf.Reset()
		if err := tmpl.Execute(&buf, data); err != nil {
			return err
		}
		result[k] = buf.String()
	}

	fc.Spec.FlinkProperties = result
	return nil
}