func()

in apis/flinkcluster/v1beta1/flinkcluster_validate.go [242:284]


func (v *Validator) validateJobUpdate(old *FlinkCluster, new *FlinkCluster) error {
	switch {
	case old.Spec.Job == nil && new.Spec.Job == nil:
		return nil
	case old.Spec.Job == nil || new.Spec.Job == nil:
		oldJobSpec, _ := json.Marshal(old.Spec.Job)
		newJobSpec, _ := json.Marshal(new.Spec.Job)
		return fmt.Errorf("you cannot change cluster type between session cluster and job cluster, old spec.job: %q, new spec.job: %q", oldJobSpec, newJobSpec)
	case old.Spec.Job.SavepointsDir == nil || *old.Spec.Job.SavepointsDir == "":
		return fmt.Errorf("updating job is not allowed when spec.job.savepointsDir was not provided")
	case old.Spec.Job.SavepointsDir != nil && *old.Spec.Job.SavepointsDir != "" &&
		(new.Spec.Job.SavepointsDir == nil || *new.Spec.Job.SavepointsDir == ""):
		return fmt.Errorf("removing savepointsDir is not allowed")
	case old.IsHighAvailabilityEnabled() != new.IsHighAvailabilityEnabled():
		return fmt.Errorf("updating highAvailability settings is not allowed")
	case !isBlank(new.Spec.Job.FromSavepoint):
		return nil
	default:
		// In the case of taking savepoint is skipped, check if the savepoint is up-to-date.
		var oldJob = old.Status.Components.Job
		var takeSavepointOnUpdate = new.Spec.Job.TakeSavepointOnUpdate == nil || *new.Spec.Job.TakeSavepointOnUpdate
		var skipTakeSavepoint = !takeSavepointOnUpdate || oldJob.IsStopped()
		var now = time.Now()
		if skipTakeSavepoint && oldJob != nil && !oldJob.UpdateReady(new.Spec.Job, now) {
			oldJobJson, _ := json.Marshal(oldJob)
			var takeSP, maxStateAge string
			if new.Spec.Job.TakeSavepointOnUpdate == nil {
				takeSP = "nil"
			} else {
				takeSP = strconv.FormatBool(*new.Spec.Job.TakeSavepointOnUpdate)
			}
			if new.Spec.Job.MaxStateAgeToRestoreSeconds == nil {
				maxStateAge = "nil"
			} else {
				maxStateAge = strconv.Itoa(int(*new.Spec.Job.MaxStateAgeToRestoreSeconds))
			}
			return fmt.Errorf("cannot update spec: taking savepoint is skipped but no up-to-date savepoint, "+
				"spec.job.takeSavepointOnUpdate: %v, spec.job.maxStateAgeToRestoreSeconds: %v, job status: %q",
				takeSP, maxStateAge, oldJobJson)
		}
	}
	return nil
}