in apis/flinkcluster/v1beta1/flinkcluster_validate.go [242:284]
// validateJobUpdate checks whether going from old to new is an acceptable
// change as far as spec.job is concerned.
//
// It returns an error when:
//   - exactly one of old/new has spec.job set (cluster type would change);
//   - the old spec.job.savepointsDir is unset or empty;
//   - the new spec.job.savepointsDir is unset or empty (it would be removed);
//   - the high-availability setting differs between old and new;
//   - taking a savepoint would be skipped and the recorded job status is not
//     update-ready according to UpdateReady.
//
// It returns nil otherwise, including when neither spec has a job, when
// new.Spec.Job.FromSavepoint is not blank, and when no job status is recorded.
func (v *Validator) validateJobUpdate(old *FlinkCluster, new *FlinkCluster) error {
	switch {
	case old.Spec.Job == nil && new.Spec.Job == nil:
		// No job on either side: nothing job-related to validate.
		return nil
	case old.Spec.Job == nil || new.Spec.Job == nil:
		// The marshal errors are deliberately ignored: the JSON is only used to
		// enrich the error message and must not mask the real validation error.
		oldJobSpec, _ := json.Marshal(old.Spec.Job)
		newJobSpec, _ := json.Marshal(new.Spec.Job)
		return fmt.Errorf("you cannot change cluster type between session cluster and job cluster, old spec.job: %q, new spec.job: %q", oldJobSpec, newJobSpec)
	case old.Spec.Job.SavepointsDir == nil || *old.Spec.Job.SavepointsDir == "":
		return fmt.Errorf("updating job is not allowed when spec.job.savepointsDir was not provided")
	case new.Spec.Job.SavepointsDir == nil || *new.Spec.Job.SavepointsDir == "":
		// The previous case already guarantees the old savepointsDir is set and
		// non-empty, so reaching here means the update removes it.
		return fmt.Errorf("removing savepointsDir is not allowed")
	case old.IsHighAvailabilityEnabled() != new.IsHighAvailabilityEnabled():
		return fmt.Errorf("updating highAvailability settings is not allowed")
	case !isBlank(new.Spec.Job.FromSavepoint):
		// An explicit fromSavepoint is given; the freshness check below is skipped.
		return nil
	}

	// In the case of taking savepoint is skipped, check if the savepoint is up-to-date.
	oldJob := old.Status.Components.Job
	if oldJob == nil {
		// Without a recorded job status there is nothing to check. Bail out
		// before calling any method on oldJob so this function does not rely on
		// those methods tolerating a nil receiver.
		return nil
	}

	// An unset takeSavepointOnUpdate is treated as true.
	takeSavepointOnUpdate := new.Spec.Job.TakeSavepointOnUpdate == nil || *new.Spec.Job.TakeSavepointOnUpdate
	skipTakeSavepoint := !takeSavepointOnUpdate || oldJob.IsStopped()
	if !skipTakeSavepoint || oldJob.UpdateReady(new.Spec.Job, time.Now()) {
		return nil
	}

	// Best-effort diagnostics only; a marshal failure just yields an empty status.
	oldJobJSON, _ := json.Marshal(oldJob)

	takeSP := "nil"
	if new.Spec.Job.TakeSavepointOnUpdate != nil {
		takeSP = strconv.FormatBool(*new.Spec.Job.TakeSavepointOnUpdate)
	}
	maxStateAge := "nil"
	if new.Spec.Job.MaxStateAgeToRestoreSeconds != nil {
		maxStateAge = strconv.Itoa(int(*new.Spec.Job.MaxStateAgeToRestoreSeconds))
	}

	return fmt.Errorf("cannot update spec: taking savepoint is skipped but no up-to-date savepoint, "+
		"spec.job.takeSavepointOnUpdate: %v, spec.job.maxStateAgeToRestoreSeconds: %v, job status: %q",
		takeSP, maxStateAge, oldJobJSON)
}