func newRevisionDataPatch()

in controllers/flinkcluster/flinkcluster_util.go [197:231]


// newRevisionDataPatch serializes the spec of the given cluster into a JSON
// patch document of the form {"spec": {..., "$patch": "replace"}}.
//
// When the cluster has a job spec, the job fields CleanupPolicy,
// RestartPolicy, CancelRequested and SavepointGeneration are cleared on a
// deep copy before encoding, so they do not contribute to the patch and the
// caller's object is never modified.
//
// It returns the marshaled patch, or the error produced while encoding the
// cluster, decoding the encoded JSON, or marshaling the patch.
func newRevisionDataPatch(cluster *v1beta1.FlinkCluster) ([]byte, error) {
	// Ignore fields not related to rendering job resource. The input is only
	// copied when something has to be cleared; otherwise it is encoded as is.
	c := cluster
	if cluster.Spec.Job != nil {
		c = cluster.DeepCopy()
		c.Spec.Job.CleanupPolicy = nil
		c.Spec.Job.RestartPolicy = nil
		c.Spec.Job.CancelRequested = nil
		c.Spec.Job.SavepointGeneration = 0
	}

	var buf bytes.Buffer
	if err := unstructured.UnstructuredJSONScheme.Encode(c, &buf); err != nil {
		return nil, err
	}

	// Only the spec is needed. Decoding into a typed field makes a malformed
	// document or a non-object spec surface as a returned error instead of a
	// panic on a failed type assertion.
	var raw struct {
		Spec map[string]interface{} `json:"spec"`
	}
	if err := json.Unmarshal(buf.Bytes(), &raw); err != nil {
		return nil, err
	}

	spec := raw.Spec
	if spec == nil {
		// An absent or null spec decodes to a nil map; writing to it would
		// panic, so fall back to an empty spec.
		spec = make(map[string]interface{})
	}
	// Strategic-merge-patch directive: replace the whole spec, do not merge.
	spec["$patch"] = "replace"

	// backward compatibility fix: the job section always carries an explicit
	// null restartPolicy in the patch.
	if c.Spec.Job != nil {
		if job, ok := spec["job"].(map[string]interface{}); ok {
			job["restartPolicy"] = nil
		}
	}

	return json.Marshal(map[string]interface{}{"spec": spec})
}