pkg/webhooks/admission/jobs/mutate/mutate_job.go (180 lines of code) (raw):
/*
Copyright 2018 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mutate
import (
"encoding/json"
"fmt"
"strconv"
admissionv1 "k8s.io/api/admission/v1"
whv1 "k8s.io/api/admissionregistration/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"
"volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/webhooks/router"
"volcano.sh/volcano/pkg/webhooks/schema"
"volcano.sh/volcano/pkg/webhooks/util"
)
// Exported defaults applied to a Job at admission time.
const (
	// DefaultQueue is the queue name written to /spec/queue when a Job does
	// not name one.
	DefaultQueue = "default"

	// DefaultMaxRetry is the job-level retry count written to /spec/maxRetry
	// when the Job leaves it at zero.
	DefaultMaxRetry = 3
)

// Package-private defaults applied to a Job at admission time.
const (
	// defaultSchedulerName is written to /spec/schedulerName when the Job
	// does not set a scheduler.
	defaultSchedulerName = "volcano"

	// defaultMaxRetry is the task-level retry count; it is typed int32 so it
	// can be assigned to a task's MaxRetry field directly.
	defaultMaxRetry int32 = 3
)
// init hands the package-level service value to router.RegisterAdmission when
// the package is loaded.
func init() {
router.RegisterAdmission(service)
}
// service describes the job mutating webhook: it pairs the "/jobs/mutate"
// path with the Jobs handler and carries a mutating webhook configuration
// named "mutatejob.volcano.sh".
var service = &router.AdmissionService{
Path: "/jobs/mutate",
Func: Jobs,
MutatingConfig: &whv1.MutatingWebhookConfiguration{
Webhooks: []whv1.MutatingWebhook{{
Name: "mutatejob.volcano.sh",
Rules: []whv1.RuleWithOperations{
{
// Only CREATE is listed; Jobs answers any other operation with an error.
Operations: []whv1.OperationType{whv1.Create},
// The rule selects the "jobs" resource of batch.volcano.sh/v1alpha1.
Rule: whv1.Rule{
APIGroups: []string{"batch.volcano.sh"},
APIVersions: []string{"v1alpha1"},
Resources: []string{"jobs"},
},
},
},
}},
},
}
// patchOperation is a single entry of the patch that Jobs returns with patch
// type JSONPatch. It marshals to an object with "op", "path" and "value" keys.
type patchOperation struct {
// Op is the patch verb; this file emits "add" and "replace".
Op string `json:"op"`
// Path is the location inside the Job that the operation targets, for
// example "/spec/queue".
Path string `json:"path"`
// Value is the payload of the operation. Because the field is an interface,
// omitempty drops the key only when Value is nil; a boxed zero such as
// int32(0) is still emitted.
Value interface{} `json:"value,omitempty"`
}
// Jobs is the admission handler registered for the job mutating webhook. It
// decodes the Job carried by the review request and, for CREATE operations,
// answers with a JSON patch that fills in default values.
//
// The response has Allowed set to true and PatchType set to JSONPatch when
// the patch was built successfully. If the object cannot be decoded, the
// operation is not CREATE, or the patch cannot be serialized, the error is
// converted into a response by util.ToAdmissionResponse instead.
func Jobs(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
	klog.V(3).Infof("mutating jobs")

	job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
	if err != nil {
		return util.ToAdmissionResponse(err)
	}

	var patchBytes []byte
	switch ar.Request.Operation {
	case admissionv1.Create:
		patchBytes, err = createPatch(job)
		if err != nil {
			// Surface the failure instead of answering Allowed with a nil
			// patch, which would let the job through without its defaults.
			return util.ToAdmissionResponse(fmt.Errorf("creating job patch: %w", err))
		}
	default:
		err = fmt.Errorf("expect operation to be 'CREATE' ")
		return util.ToAdmissionResponse(err)
	}

	klog.V(3).Infof("AdmissionResponse: patch=%v", string(patchBytes))

	// PatchType is a pointer field, so the constant needs an addressable copy.
	pt := admissionv1.PatchTypeJSONPatch
	reviewResponse := admissionv1.AdmissionResponse{
		Allowed:   true,
		Patch:     patchBytes,
		PatchType: &pt,
	}
	return &reviewResponse
}
// createPatch runs every defaulting helper against job and returns the
// collected operations marshalled as JSON.
//
// The helpers run in a fixed order. mutateSpec writes task-level defaults
// straight into job.Spec.Tasks, so patchDefaultMinAvailable, which runs after
// it, reads the already-defaulted tasks.
func createPatch(job *v1alpha1.Job) ([]byte, error) {
	var ops []patchOperation

	// collect keeps op only when the helper actually produced an operation.
	collect := func(op *patchOperation) {
		if op != nil {
			ops = append(ops, *op)
		}
	}

	collect(patchDefaultQueue(job))
	collect(patchDefaultScheduler(job))
	collect(patchDefaultMaxRetry(job))
	collect(mutateSpec(job.Spec.Tasks, "/spec/tasks"))
	collect(patchDefaultMinAvailable(job))
	// Add default plugins for some distributed-framework plugin cases.
	collect(patchDefaultPlugins(job))

	return json.Marshal(ops)
}
// patchDefaultQueue returns an "add" operation that sets /spec/queue to
// DefaultQueue when the Job names no queue, and nil otherwise.
func patchDefaultQueue(job *v1alpha1.Job) *patchOperation {
	if job.Spec.Queue != "" {
		// The Job already names a queue; leave it alone.
		return nil
	}
	op := patchOperation{
		Op:    "add",
		Path:  "/spec/queue",
		Value: DefaultQueue,
	}
	return &op
}
// patchDefaultScheduler returns an "add" operation that sets
// /spec/schedulerName to defaultSchedulerName when the Job leaves the
// scheduler name empty, and nil otherwise.
func patchDefaultScheduler(job *v1alpha1.Job) *patchOperation {
	if job.Spec.SchedulerName != "" {
		// A scheduler was chosen explicitly; nothing to default.
		return nil
	}
	op := patchOperation{
		Op:    "add",
		Path:  "/spec/schedulerName",
		Value: defaultSchedulerName,
	}
	return &op
}
// patchDefaultMaxRetry returns an "add" operation that sets /spec/maxRetry to
// DefaultMaxRetry when the Job's MaxRetry is zero, and nil otherwise.
func patchDefaultMaxRetry(job *v1alpha1.Job) *patchOperation {
	if job.Spec.MaxRetry != 0 {
		// A non-zero retry count is kept as given.
		return nil
	}
	op := patchOperation{
		Op:    "add",
		Path:  "/spec/maxRetry",
		Value: DefaultMaxRetry,
	}
	return &op
}
// patchDefaultMinAvailable returns an "add" operation for /spec/minAvailable
// when the Job's MinAvailable is zero, and nil otherwise.
//
// The value is the sum over all tasks of the task's own MinAvailable, or of
// its Replicas when the task has no MinAvailable set.
func patchDefaultMinAvailable(job *v1alpha1.Job) *patchOperation {
	if job.Spec.MinAvailable != 0 {
		return nil
	}

	var total int32
	for i := range job.Spec.Tasks {
		task := &job.Spec.Tasks[i]
		if task.MinAvailable == nil {
			total += task.Replicas
			continue
		}
		total += *task.MinAvailable
	}

	return &patchOperation{
		Op:    "add",
		Path:  "/spec/minAvailable",
		Value: total,
	}
}
// mutateSpec fills in per-task defaults directly on the elements of tasks and
// returns a "replace" operation at basePath carrying the updated slice. It
// returns nil when no task needed a change.
//
// Defaults applied to each task:
//   - an empty Name becomes v1alpha1.DefaultTaskSpec followed by the task index;
//   - an empty DNSPolicy on a host-network pod becomes DNSClusterFirstWithHostNet;
//   - a nil MinAvailable becomes a pointer to a copy of Replicas;
//   - a zero MaxRetry becomes defaultMaxRetry.
func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) *patchOperation {
	var changed bool

	for i := range tasks {
		// Work through a pointer so the writes land in the caller's slice.
		task := &tasks[i]

		if task.Name == "" {
			task.Name = v1alpha1.DefaultTaskSpec + strconv.Itoa(i)
			changed = true
		}

		podSpec := &task.Template.Spec
		if podSpec.HostNetwork && podSpec.DNSPolicy == "" {
			podSpec.DNSPolicy = v1.DNSClusterFirstWithHostNet
			changed = true
		}

		if task.MinAvailable == nil {
			// Fresh variable per task so every task gets its own pointer.
			replicas := task.Replicas
			task.MinAvailable = &replicas
			changed = true
		}

		if task.MaxRetry == 0 {
			task.MaxRetry = defaultMaxRetry
			changed = true
		}
	}

	if changed {
		return &patchOperation{Op: "replace", Path: basePath, Value: tasks}
	}
	return nil
}
// patchDefaultPlugins returns a "replace" operation for /spec/plugins holding
// a copy of the Job's plugin map, or nil when the Job has no plugin map.
//
// The copy gains an "svc" entry with an empty argument list when the Job
// declares "tensorflow" without "svc", because the tensorflow plugin depends
// on the svc plugin.
func patchDefaultPlugins(job *v1alpha1.Job) *patchOperation {
	declared := job.Spec.Plugins
	if declared == nil {
		return nil
	}

	// Build the patch value on a copy so the Job's own map is not modified.
	merged := map[string][]string{}
	for name, args := range declared {
		merged[name] = args
	}

	_, usesTensorflow := declared["tensorflow"]
	_, hasSvc := declared["svc"]
	if usesTensorflow && !hasSvc {
		// Non-nil empty slice so the entry marshals as [] rather than null.
		merged["svc"] = []string{}
	}

	return &patchOperation{
		Op:    "replace",
		Path:  "/spec/plugins",
		Value: merged,
	}
}