in apis/flinkcluster/v1beta1/flinkcluster_validate.go [131:160]
// checkControlAnnotations validates the user control annotation carried by an
// update from old to new.
//
// It returns nil when new has no ControlAnnotation entry. Otherwise it returns
// an error when any of the following holds:
//   - the annotation value differs from the one on old while
//     old.Status.Control is non-nil and in ControlStateInProgress;
//   - the value is ControlNameJobCancel and old has no job spec, no job status,
//     or a job status whose IsTerminated reports true for the job spec;
//   - the value is ControlNameSavepoint and old has no job spec, a nil or empty
//     SavepointsDir, no job status, or a job status whose IsStopped reports
//     true;
//   - the value is none of the recognized control names.
//
// All checks on job state are made against old, not new.
func (v *Validator) checkControlAnnotations(old *FlinkCluster, new *FlinkCluster) error {
	previous := old.Annotations[ControlAnnotation]
	requested, present := new.Annotations[ControlAnnotation]
	if !present {
		// No control requested; nothing to validate.
		return nil
	}

	// Refuse to switch controls while the previous one is still in progress.
	if control := old.Status.Control; previous != requested && control != nil && control.State == ControlStateInProgress {
		return fmt.Errorf(ControlChangeWarnMsg, ControlAnnotation)
	}

	switch requested {
	case ControlNameJobCancel:
		jobStatus := old.Status.Components.Job
		jobSpec := old.Spec.Job
		if jobSpec == nil {
			return fmt.Errorf(SessionClusterWarnMsg, ControlNameJobCancel, ControlAnnotation)
		}
		if jobStatus == nil || jobStatus.IsTerminated(jobSpec) {
			return errors.NewResourceExpired(fmt.Sprintf(InvalidJobStateForJobCancelMsg, ControlAnnotation))
		}

	case ControlNameSavepoint:
		jobStatus := old.Status.Components.Job
		jobSpec := old.Spec.Job
		if jobSpec == nil {
			return fmt.Errorf(SessionClusterWarnMsg, ControlNameSavepoint, ControlAnnotation)
		}
		if dir := jobSpec.SavepointsDir; dir == nil || *dir == "" {
			return fmt.Errorf(InvalidSavepointDirMsg, ControlAnnotation)
		}
		if jobStatus == nil || jobStatus.IsStopped() {
			return fmt.Errorf(InvalidJobStateForSavepointMsg, ControlAnnotation)
		}

	default:
		return fmt.Errorf(InvalidControlAnnMsg, ControlAnnotation, requested)
	}

	return nil
}