in pkg/webhooks/admission/jobs/validate/admit_job.go [110:218]
// validateJobCreate checks a Job submitted for creation and returns the
// accumulated validation messages. An empty string means no problem was found.
//
// Negative spec-level values (minAvailable, maxRetry, ttlSecondsAfterFinished)
// and an empty task list are rejected immediately with a single message. Every
// other check appends to one combined message so that several problems can be
// reported in a single response.
//
// reviewResponse.Allowed is set to false whenever a non-empty message is
// returned; this function never sets it to true.
func validateJobCreate(job *v1alpha1.Job, reviewResponse *admissionv1.AdmissionResponse) string {
	// Fail-fast guards: each one returns on its own without running later checks.
	if job.Spec.MinAvailable < 0 {
		reviewResponse.Allowed = false
		return "job 'minAvailable' must be >= 0."
	}
	if job.Spec.MaxRetry < 0 {
		reviewResponse.Allowed = false
		return "'maxRetry' cannot be less than zero."
	}
	if job.Spec.TTLSecondsAfterFinished != nil && *job.Spec.TTLSecondsAfterFinished < 0 {
		reviewResponse.Allowed = false
		return "'ttlSecondsAfterFinished' cannot be less than zero."
	}
	if len(job.Spec.Tasks) == 0 {
		reviewResponse.Allowed = false
		return "No task specified in job spec"
	}

	var (
		msg                         string
		totalReplicas               int32
		hasDependenciesBetweenTasks bool
	)
	// Set of task names seen so far, used only for duplicate detection.
	seenTaskNames := make(map[string]struct{}, len(job.Spec.Tasks))

	for index, task := range job.Spec.Tasks {
		if task.DependsOn != nil {
			hasDependenciesBetweenTasks = true
		}

		if task.Replicas < 0 {
			msg += fmt.Sprintf(" 'replicas' < 0 in task: %s;", task.Name)
		}
		if task.MinAvailable != nil && *task.MinAvailable > task.Replicas {
			msg += fmt.Sprintf(" 'minAvailable' is greater than 'replicas' in task: %s, job: %s", task.Name, job.Name)
		}

		// Count replicas of every task, duplicates included, so the job-level
		// minAvailable comparison below sees the full total.
		totalReplicas += task.Replicas

		// The task name must be a valid DNS-1123 label.
		if errMsgs := validation.IsDNS1123Label(task.Name); len(errMsgs) > 0 {
			msg += fmt.Sprintf(" %v;", errMsgs)
		}

		// Report a duplicated task name and skip the remaining per-task checks
		// for this entry only. The loop must keep going: stopping here would
		// leave later tasks uncounted in totalReplicas (producing a false
		// minAvailable error) and would hide their own validation errors.
		if _, found := seenTaskNames[task.Name]; found {
			msg += fmt.Sprintf(" duplicated task name %s;", task.Name)
			continue
		}
		seenTaskNames[task.Name] = struct{}{}

		if err := validatePolicies(task.Policies, field.NewPath("spec.tasks.policies")); err != nil {
			msg += err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
				getValidEvents(), getValidActions())
		}

		// The pod name is derived from the job name, the task name and the
		// task's index in the spec.
		podName := jobhelpers.MakePodName(job.Name, task.Name, index)
		msg += validateK8sPodNameLength(podName)
		msg += validateTaskTemplate(task, job, index)
	}

	msg += validateJobName(job)

	if totalReplicas < job.Spec.MinAvailable {
		msg += "job 'minAvailable' should not be greater than total replicas in tasks;"
	}

	if err := validatePolicies(job.Spec.Policies, field.NewPath("spec.policies")); err != nil {
		msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v;",
			getValidEvents(), getValidActions())
	}

	// Every plugin named in the spec must have a registered builder.
	// Ranging over an empty or nil map is a no-op, so no length guard is needed.
	for name := range job.Spec.Plugins {
		if _, found := plugins.GetPluginBuilder(name); !found {
			msg += fmt.Sprintf(" unable to find job plugin: %s", name)
		}
	}

	if err := validateIO(job.Spec.Volumes); err != nil {
		msg += err.Error()
	}

	// The target queue must be retrievable and in the Open state.
	queue, err := config.VolcanoClient.SchedulingV1beta1().Queues().Get(context.TODO(), job.Spec.Queue, metav1.GetOptions{})
	if err != nil {
		msg += fmt.Sprintf(" unable to find job queue: %v", err)
	} else if queue.Status.State != schedulingv1beta1.QueueStateOpen {
		msg += fmt.Sprintf("can only submit job to queue with state `Open`, "+
			"queue `%s` status is `%s`", queue.Name, queue.Status.State)
	}

	// Only run the topological sort when at least one task declares DependsOn.
	if hasDependenciesBetweenTasks {
		if _, isDag := topoSort(job); !isDag {
			msg += "job has dependencies between tasks, but doesn't form a directed acyclic graph(DAG)"
		}
	}

	if msg != "" {
		reviewResponse.Allowed = false
	}
	return msg
}