func validateJobCreate()

in pkg/webhooks/admission/jobs/validate/admit_job.go [110:218]


func validateJobCreate(job *v1alpha1.Job, reviewResponse *admissionv1.AdmissionResponse) string {
	var msg string
	taskNames := map[string]string{}
	var totalReplicas int32

	if job.Spec.MinAvailable < 0 {
		reviewResponse.Allowed = false
		return "job 'minAvailable' must be >= 0."
	}

	if job.Spec.MaxRetry < 0 {
		reviewResponse.Allowed = false
		return "'maxRetry' cannot be less than zero."
	}

	if job.Spec.TTLSecondsAfterFinished != nil && *job.Spec.TTLSecondsAfterFinished < 0 {
		reviewResponse.Allowed = false
		return "'ttlSecondsAfterFinished' cannot be less than zero."
	}

	if len(job.Spec.Tasks) == 0 {
		reviewResponse.Allowed = false
		return "No task specified in job spec"
	}

	hasDependenciesBetweenTasks := false
	for index, task := range job.Spec.Tasks {
		if task.DependsOn != nil {
			hasDependenciesBetweenTasks = true
		}

		if task.Replicas < 0 {
			msg += fmt.Sprintf(" 'replicas' < 0 in task: %s;", task.Name)
		}

		if task.MinAvailable != nil && *task.MinAvailable > task.Replicas {
			msg += fmt.Sprintf(" 'minAvailable' is greater than 'replicas' in task: %s, job: %s", task.Name, job.Name)
		}

		// count replicas
		totalReplicas += task.Replicas

		// validate task name
		if errMsgs := validation.IsDNS1123Label(task.Name); len(errMsgs) > 0 {
			msg += fmt.Sprintf(" %v;", errMsgs)
		}

		// duplicate task name
		if _, found := taskNames[task.Name]; found {
			msg += fmt.Sprintf(" duplicated task name %s;", task.Name)
			break
		} else {
			taskNames[task.Name] = task.Name
		}

		if err := validatePolicies(task.Policies, field.NewPath("spec.tasks.policies")); err != nil {
			msg += err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v",
				getValidEvents(), getValidActions())
		}
		podName := jobhelpers.MakePodName(job.Name, task.Name, index)
		msg += validateK8sPodNameLength(podName)
		msg += validateTaskTemplate(task, job, index)
	}

	msg += validateJobName(job)

	if totalReplicas < job.Spec.MinAvailable {
		msg += "job 'minAvailable' should not be greater than total replicas in tasks;"
	}

	if err := validatePolicies(job.Spec.Policies, field.NewPath("spec.policies")); err != nil {
		msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v;",
			getValidEvents(), getValidActions())
	}

	// invalid job plugins
	if len(job.Spec.Plugins) != 0 {
		for name := range job.Spec.Plugins {
			if _, found := plugins.GetPluginBuilder(name); !found {
				msg += fmt.Sprintf(" unable to find job plugin: %s", name)
			}
		}
	}

	if err := validateIO(job.Spec.Volumes); err != nil {
		msg += err.Error()
	}

	queue, err := config.VolcanoClient.SchedulingV1beta1().Queues().Get(context.TODO(), job.Spec.Queue, metav1.GetOptions{})
	if err != nil {
		msg += fmt.Sprintf(" unable to find job queue: %v", err)
	} else if queue.Status.State != schedulingv1beta1.QueueStateOpen {
		msg += fmt.Sprintf("can only submit job to queue with state `Open`, "+
			"queue `%s` status is `%s`", queue.Name, queue.Status.State)
	}

	if hasDependenciesBetweenTasks {
		_, isDag := topoSort(job)
		if !isDag {
			msg += "job has dependencies between tasks, but doesn't form a directed acyclic graph(DAG)"
		}
	}

	if msg != "" {
		reviewResponse.Allowed = false
	}

	return msg
}