in apis/flinkcluster/v1beta1/flinkcluster_validate.go [454:497]
func (v *Validator) validateJob(jobSpec *JobSpec) error {
if jobSpec == nil {
return nil
}
fp := field.NewPath("spec.job")
if errors := validation.ValidateAnnotations(jobSpec.PodAnnotations, fp.Child("podAnnotations")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}
if errors := v1validation.ValidateLabels(jobSpec.PodLabels, fp.Child("podLabels")); len(errors) > 0 {
return fmt.Errorf(errors.ToAggregate().Error())
}
applicationMode := jobSpec.Mode != nil && *jobSpec.Mode == JobModeApplication
if !applicationMode && jobSpec.JarFile == nil && jobSpec.PyFile == nil && jobSpec.PyModule == nil {
return fmt.Errorf("job jarFile or pythonFile or pythonModule is unspecified")
}
if jobSpec.Parallelism != nil && *jobSpec.Parallelism < 1 {
return fmt.Errorf("job parallelism must be >= 1")
}
switch *jobSpec.RestartPolicy {
case JobRestartPolicyNever:
case JobRestartPolicyFromSavepointOnFailure:
if jobSpec.MaxStateAgeToRestoreSeconds == nil {
return fmt.Errorf("maxStateAgeToRestoreSeconds must be specified when restartPolicy is set as FromSavepointOnFailure")
}
default:
return fmt.Errorf("invalid job restartPolicy: %v", *jobSpec.RestartPolicy)
}
if jobSpec.TakeSavepointOnUpdate != nil && !*jobSpec.TakeSavepointOnUpdate &&
jobSpec.MaxStateAgeToRestoreSeconds == nil {
return fmt.Errorf("maxStateAgeToRestoreSeconds must be specified when takeSavepointOnUpdate is set as false")
}
if jobSpec.CancelRequested != nil && *jobSpec.CancelRequested {
return fmt.Errorf(
"property `cancelRequested` cannot be set to true for a new job")
}
return nil
}