func()

in pkg/flink/resources.go [277:301]


func (fc *FlinkCluster) updateFlinkProperties(config *Config, taskCtx FlinkTaskContext) error {

	props := MergeProperties(
		fc.Spec.FlinkProperties,
		taskCtx.Job.FlinkProperties,
		config.FlinkPropertiesOverride,
	)

	result := make(map[string]string)
	for k, v := range props {
		tmpl, err := flinkPropertiesTmpl.Parse(v)
		if err != nil {
			return err
		}
		var tpl bytes.Buffer
		ft := NewFlinkPropertiesTemplateData(taskCtx.Namespace, taskCtx.ClusterName, taskCtx.Labels)
		if err := tmpl.Execute(&tpl, ft); err != nil {
			return err
		}
		result[k] = tpl.String()
	}

	fc.Spec.FlinkProperties = result
	return nil
}