pkg/webhooks/admission/jobs/validate/admit_job.go (286 lines of code) (raw):

/* Copyright 2018 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package validate import ( "context" "fmt" "strings" admissionv1 "k8s.io/api/admission/v1" whv1 "k8s.io/api/admissionregistration/v1" v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/klog" k8score "k8s.io/kubernetes/pkg/apis/core" k8scorev1 "k8s.io/kubernetes/pkg/apis/core/v1" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" k8scorevalid "k8s.io/kubernetes/pkg/apis/core/validation" "volcano.sh/apis/pkg/apis/batch/v1alpha1" schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers" "volcano.sh/volcano/pkg/controllers/job/plugins" "volcano.sh/volcano/pkg/webhooks/router" "volcano.sh/volcano/pkg/webhooks/schema" "volcano.sh/volcano/pkg/webhooks/util" ) func init() { router.RegisterAdmission(service) } var service = &router.AdmissionService{ Path: "/jobs/validate", Func: AdmitJobs, Config: config, ValidatingConfig: &whv1.ValidatingWebhookConfiguration{ Webhooks: []whv1.ValidatingWebhook{{ Name: "validatejob.volcano.sh", Rules: []whv1.RuleWithOperations{ { Operations: []whv1.OperationType{whv1.Create, whv1.Update}, Rule: whv1.Rule{ APIGroups: []string{"batch.volcano.sh"}, APIVersions: []string{"v1alpha1"}, Resources: []string{"jobs"}, }, }, }, }}, }, } var config = &router.AdmissionServiceConfig{} // AdmitJobs is to admit jobs and return response. func AdmitJobs(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse { klog.V(3).Infof("admitting jobs -- %s", ar.Request.Operation) job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource) if err != nil { return util.ToAdmissionResponse(err) } var msg string reviewResponse := admissionv1.AdmissionResponse{} reviewResponse.Allowed = true switch ar.Request.Operation { case admissionv1.Create: msg = validateJobCreate(job, &reviewResponse) case admissionv1.Update: oldJob, err := schema.DecodeJob(ar.Request.OldObject, ar.Request.Resource) if err != nil { return util.ToAdmissionResponse(err) } err = validateJobUpdate(oldJob, job) if err != nil { return util.ToAdmissionResponse(err) } default: err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'") return util.ToAdmissionResponse(err) } if !reviewResponse.Allowed { reviewResponse.Result = &metav1.Status{Message: strings.TrimSpace(msg)} } return &reviewResponse } func validateJobCreate(job *v1alpha1.Job, reviewResponse *admissionv1.AdmissionResponse) string { var msg string taskNames := map[string]string{} var totalReplicas int32 if job.Spec.MinAvailable < 0 { reviewResponse.Allowed = false return "job 'minAvailable' must be >= 0." } if job.Spec.MaxRetry < 0 { reviewResponse.Allowed = false return "'maxRetry' cannot be less than zero." } if job.Spec.TTLSecondsAfterFinished != nil && *job.Spec.TTLSecondsAfterFinished < 0 { reviewResponse.Allowed = false return "'ttlSecondsAfterFinished' cannot be less than zero." } if len(job.Spec.Tasks) == 0 { reviewResponse.Allowed = false return "No task specified in job spec" } hasDependenciesBetweenTasks := false for index, task := range job.Spec.Tasks { if task.DependsOn != nil { hasDependenciesBetweenTasks = true } if task.Replicas < 0 { msg += fmt.Sprintf(" 'replicas' < 0 in task: %s;", task.Name) } if task.MinAvailable != nil && *task.MinAvailable > task.Replicas { msg += fmt.Sprintf(" 'minAvailable' is greater than 'replicas' in task: %s, job: %s", task.Name, job.Name) } // count replicas totalReplicas += task.Replicas // validate task name if errMsgs := validation.IsDNS1123Label(task.Name); len(errMsgs) > 0 { msg += fmt.Sprintf(" %v;", errMsgs) } // duplicate task name if _, found := taskNames[task.Name]; found { msg += fmt.Sprintf(" duplicated task name %s;", task.Name) break } else { taskNames[task.Name] = task.Name } if err := validatePolicies(task.Policies, field.NewPath("spec.tasks.policies")); err != nil { msg += err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v", getValidEvents(), getValidActions()) } podName := jobhelpers.MakePodName(job.Name, task.Name, index) msg += validateK8sPodNameLength(podName) msg += validateTaskTemplate(task, job, index) } msg += validateJobName(job) if totalReplicas < job.Spec.MinAvailable { msg += "job 'minAvailable' should not be greater than total replicas in tasks;" } if err := validatePolicies(job.Spec.Policies, field.NewPath("spec.policies")); err != nil { msg = msg + err.Error() + fmt.Sprintf(" valid events are %v, valid actions are %v;", getValidEvents(), getValidActions()) } // invalid job plugins if len(job.Spec.Plugins) != 0 { for name := range job.Spec.Plugins { if _, found := plugins.GetPluginBuilder(name); !found { msg += fmt.Sprintf(" unable to find job plugin: %s", name) } } } if err := validateIO(job.Spec.Volumes); err != nil { msg += err.Error() } queue, err := config.VolcanoClient.SchedulingV1beta1().Queues().Get(context.TODO(), job.Spec.Queue, metav1.GetOptions{}) if err != nil { msg += fmt.Sprintf(" unable to find job queue: %v", err) } else if queue.Status.State != schedulingv1beta1.QueueStateOpen { msg += fmt.Sprintf("can only submit job to queue with state `Open`, "+ "queue `%s` status is `%s`", queue.Name, queue.Status.State) } if hasDependenciesBetweenTasks { _, isDag := topoSort(job) if !isDag { msg += "job has dependencies between tasks, but doesn't form a directed acyclic graph(DAG)" } } if msg != "" { reviewResponse.Allowed = false } return msg } func validateJobUpdate(old, new *v1alpha1.Job) error { var totalReplicas int32 for _, task := range new.Spec.Tasks { if task.Replicas < 0 { return fmt.Errorf("'replicas' must be >= 0 in task: %s", task.Name) } if task.MinAvailable != nil && *task.MinAvailable > task.Replicas { return fmt.Errorf("'minAvailable' must be <= 'replicas' in task: %s;", task.Name) } // count replicas totalReplicas += task.Replicas } if new.Spec.MinAvailable > totalReplicas { return fmt.Errorf("job 'minAvailable' must not be greater than total replicas") } if new.Spec.MinAvailable < 0 { return fmt.Errorf("job 'minAvailable' must be >= 0") } if len(old.Spec.Tasks) != len(new.Spec.Tasks) { return fmt.Errorf("job updates may not add or remove tasks") } // other fields under spec are not allowed to mutate new.Spec.MinAvailable = old.Spec.MinAvailable new.Spec.PriorityClassName = old.Spec.PriorityClassName for i := range new.Spec.Tasks { new.Spec.Tasks[i].Replicas = old.Spec.Tasks[i].Replicas new.Spec.Tasks[i].MinAvailable = old.Spec.Tasks[i].MinAvailable } // job controller will update the pvc name if not provided for i := range new.Spec.Volumes { if new.Spec.Volumes[i].VolumeClaim != nil { new.Spec.Volumes[i].VolumeClaimName = "" } } for i := range old.Spec.Volumes { if old.Spec.Volumes[i].VolumeClaim != nil { old.Spec.Volumes[i].VolumeClaimName = "" } } if !apiequality.Semantic.DeepEqual(new.Spec, old.Spec) { return fmt.Errorf("job updates may not change fields other than `minAvailable`, `tasks[*].replicas under spec`") } return nil } func validateTaskTemplate(task v1alpha1.TaskSpec, job *v1alpha1.Job, index int) string { var v1PodTemplate v1.PodTemplate v1PodTemplate.Template = *task.Template.DeepCopy() k8scorev1.SetObjectDefaults_PodTemplate(&v1PodTemplate) var coreTemplateSpec k8score.PodTemplateSpec k8scorev1.Convert_v1_PodTemplateSpec_To_core_PodTemplateSpec(&v1PodTemplate.Template, &coreTemplateSpec, nil) // Skip verify container SecurityContex.Privileged as it depends on // the kube-apiserver `allow-privileged` flag. for i, container := range coreTemplateSpec.Spec.Containers { if container.SecurityContext != nil && container.SecurityContext.Privileged != nil { coreTemplateSpec.Spec.Containers[i].SecurityContext.Privileged = nil } } corePodTemplate := k8score.PodTemplate{ ObjectMeta: metav1.ObjectMeta{ Name: task.Name, Namespace: job.Namespace, }, Template: coreTemplateSpec, } opts := k8scorevalid.PodValidationOptions{} if allErrs := k8scorevalid.ValidatePodTemplate(&corePodTemplate, opts); len(allErrs) > 0 { msg := fmt.Sprintf("spec.task[%d].", index) for index := range allErrs { msg += allErrs[index].Error() + ". " } return msg } msg := validateTaskTopoPolicy(task, index) if msg != "" { return msg } return "" } func validateK8sPodNameLength(podName string) string { if errMsgs := validation.IsQualifiedName(podName); len(errMsgs) > 0 { return fmt.Sprintf("create pod with name %s validate failed %v;", podName, errMsgs) } return "" } func validateJobName(job *v1alpha1.Job) string { if errMsgs := validation.IsQualifiedName(job.Name); len(errMsgs) > 0 { return fmt.Sprintf("create job with name %s validate failed %v", job.Name, errMsgs) } return "" } func validateTaskTopoPolicy(task v1alpha1.TaskSpec, index int) string { if task.TopologyPolicy == "" || task.TopologyPolicy == v1alpha1.None { return "" } template := task.Template.DeepCopy() for id, container := range template.Spec.Containers { if len(container.Resources.Requests) == 0 { template.Spec.Containers[id].Resources.Requests = container.Resources.Limits.DeepCopy() } } for id, container := range template.Spec.InitContainers { if len(container.Resources.Requests) == 0 { template.Spec.InitContainers[id].Resources.Requests = container.Resources.Limits.DeepCopy() } } pod := &v1.Pod{ Spec: template.Spec, } if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed { return fmt.Sprintf("spec.task[%d] isn't Guaranteed pod, kind=%v", index, v1qos.GetPodQOS(pod)) } for id, container := range append(template.Spec.Containers, template.Spec.InitContainers...) { requestNum := guaranteedCPUs(container) if requestNum == 0 { return fmt.Sprintf("the cpu request isn't an integer in spec.task[%d] container[%d].", index, id) } } return "" } func guaranteedCPUs(container v1.Container) int { cpuQuantity := container.Resources.Requests[v1.ResourceCPU] if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() { return 0 } return int(cpuQuantity.Value()) }