func()

in apis/flinkcluster/v1beta1/flinkcluster_validate.go [131:160]


func (v *Validator) checkControlAnnotations(old *FlinkCluster, new *FlinkCluster) error {
	oldUserControl := old.Annotations[ControlAnnotation]
	newUserControl, ok := new.Annotations[ControlAnnotation]
	if ok {
		if oldUserControl != newUserControl && old.Status.Control != nil && old.Status.Control.State == ControlStateInProgress {
			return fmt.Errorf(ControlChangeWarnMsg, ControlAnnotation)
		}
		switch newUserControl {
		case ControlNameJobCancel:
			var job = old.Status.Components.Job
			if old.Spec.Job == nil {
				return fmt.Errorf(SessionClusterWarnMsg, ControlNameJobCancel, ControlAnnotation)
			} else if job == nil || job.IsTerminated(old.Spec.Job) {
				return errors.NewResourceExpired(fmt.Sprintf(InvalidJobStateForJobCancelMsg, ControlAnnotation))
			}
		case ControlNameSavepoint:
			var job = old.Status.Components.Job
			if old.Spec.Job == nil {
				return fmt.Errorf(SessionClusterWarnMsg, ControlNameSavepoint, ControlAnnotation)
			} else if old.Spec.Job.SavepointsDir == nil || *old.Spec.Job.SavepointsDir == "" {
				return fmt.Errorf(InvalidSavepointDirMsg, ControlAnnotation)
			} else if job == nil || job.IsStopped() {
				return fmt.Errorf(InvalidJobStateForSavepointMsg, ControlAnnotation)
			}
		default:
			return fmt.Errorf(InvalidControlAnnMsg, ControlAnnotation, newUserControl)
		}
	}
	return nil
}