func()

in apis/flinkcluster/v1beta1/flinkcluster_validate.go [454:497]


// validateJob checks the job section of a spec and returns the first
// violation it finds, or nil when every check passes.
//
// A nil jobSpec is accepted without further checks. Otherwise the checks run
// in this order:
//   - pod annotations and pod labels must pass the Kubernetes validators;
//   - outside application mode, at least one of JarFile, PyFile or PyModule
//     must be set;
//   - Parallelism, when set, must be >= 1;
//   - RestartPolicy must be set to a known policy, and
//     FromSavepointOnFailure additionally requires
//     MaxStateAgeToRestoreSeconds;
//   - TakeSavepointOnUpdate explicitly set to false requires
//     MaxStateAgeToRestoreSeconds;
//   - CancelRequested must not be true.
func (v *Validator) validateJob(jobSpec *JobSpec) error {
	if jobSpec == nil {
		return nil
	}

	fp := field.NewPath("spec.job")
	// The aggregated message may echo user-supplied keys/values, so it is
	// passed as an argument to a constant format string rather than used as the
	// format itself; otherwise a literal '%' in the input would be
	// misinterpreted as a formatting verb.
	if errs := validation.ValidateAnnotations(jobSpec.PodAnnotations, fp.Child("podAnnotations")); len(errs) > 0 {
		return fmt.Errorf("%s", errs.ToAggregate().Error())
	}
	if errs := v1validation.ValidateLabels(jobSpec.PodLabels, fp.Child("podLabels")); len(errs) > 0 {
		return fmt.Errorf("%s", errs.ToAggregate().Error())
	}

	applicationMode := jobSpec.Mode != nil && *jobSpec.Mode == JobModeApplication
	if !applicationMode && jobSpec.JarFile == nil && jobSpec.PyFile == nil && jobSpec.PyModule == nil {
		return fmt.Errorf("job jarFile or pythonFile or pythonModule is unspecified")
	}

	if jobSpec.Parallelism != nil && *jobSpec.Parallelism < 1 {
		return fmt.Errorf("job parallelism must be >= 1")
	}

	// RestartPolicy is a pointer like the other optional fields; guard it so
	// a missing value yields a validation error instead of a nil-pointer
	// panic in the switch below.
	if jobSpec.RestartPolicy == nil {
		return fmt.Errorf("job restartPolicy is unspecified")
	}
	switch *jobSpec.RestartPolicy {
	case JobRestartPolicyNever:
		// No additional requirements.
	case JobRestartPolicyFromSavepointOnFailure:
		if jobSpec.MaxStateAgeToRestoreSeconds == nil {
			return fmt.Errorf("maxStateAgeToRestoreSeconds must be specified when restartPolicy is set as FromSavepointOnFailure")
		}
	default:
		return fmt.Errorf("invalid job restartPolicy: %v", *jobSpec.RestartPolicy)
	}

	if jobSpec.TakeSavepointOnUpdate != nil && !*jobSpec.TakeSavepointOnUpdate &&
		jobSpec.MaxStateAgeToRestoreSeconds == nil {
		return fmt.Errorf("maxStateAgeToRestoreSeconds must be specified when takeSavepointOnUpdate is set as false")
	}

	if jobSpec.CancelRequested != nil && *jobSpec.CancelRequested {
		return fmt.Errorf(
			"property `cancelRequested` cannot be set to true for a new job")
	}

	return nil
}