in controllers/flinkcluster/flinkcluster_util.go [197:231]
func newRevisionDataPatch(cluster *v1beta1.FlinkCluster) ([]byte, error) {
// Ignore fields not related to rendering job resource.
var c *v1beta1.FlinkCluster
if cluster.Spec.Job != nil {
c = cluster.DeepCopy()
c.Spec.Job.CleanupPolicy = nil
c.Spec.Job.RestartPolicy = nil
c.Spec.Job.CancelRequested = nil
c.Spec.Job.SavepointGeneration = 0
} else {
c = cluster
}
str := &bytes.Buffer{}
err := unstructured.UnstructuredJSONScheme.Encode(c, str)
if err != nil {
return nil, err
}
var raw map[string]interface{}
json.Unmarshal([]byte(str.Bytes()), &raw)
objCopy := make(map[string]interface{})
spec := raw["spec"].(map[string]interface{})
objCopy["spec"] = spec
spec["$patch"] = "replace"
// backward compatibility fix
if c.Spec.Job != nil {
job := spec["job"].(map[string]interface{})
job["restartPolicy"] = nil
}
patch, err := json.Marshal(objCopy)
return patch, err
}