pkg/webhooks/admission/jobs/validate/util.go (200 lines of code) (raw):
/*
Copyright 2018 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validate
import (
"fmt"
"github.com/hashicorp/go-multierror"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/apis/core/validation"
batchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
)
// policyEventMap defines all policy events and whether to allow external use.
var policyEventMap = map[busv1alpha1.Event]bool{
busv1alpha1.AnyEvent: true,
busv1alpha1.PodFailedEvent: true,
busv1alpha1.PodEvictedEvent: true,
busv1alpha1.JobUnknownEvent: true,
busv1alpha1.TaskCompletedEvent: true,
busv1alpha1.TaskFailedEvent: true,
busv1alpha1.OutOfSyncEvent: false,
busv1alpha1.CommandIssuedEvent: false,
busv1alpha1.JobUpdatedEvent: true,
}
// policyActionMap defines all policy actions and whether to allow external use.
var policyActionMap = map[busv1alpha1.Action]bool{
busv1alpha1.AbortJobAction: true,
busv1alpha1.RestartJobAction: true,
busv1alpha1.RestartTaskAction: true,
busv1alpha1.TerminateJobAction: true,
busv1alpha1.CompleteJobAction: true,
busv1alpha1.ResumeJobAction: true,
busv1alpha1.SyncJobAction: false,
busv1alpha1.EnqueueAction: false,
busv1alpha1.SyncQueueAction: false,
busv1alpha1.OpenQueueAction: false,
busv1alpha1.CloseQueueAction: false,
}
func validatePolicies(policies []batchv1alpha1.LifecyclePolicy, fldPath *field.Path) error {
var err error
policyEvents := map[busv1alpha1.Event]struct{}{}
exitCodes := map[int32]struct{}{}
for _, policy := range policies {
if (policy.Event != "" || len(policy.Events) != 0) && policy.ExitCode != nil {
err = multierror.Append(err, fmt.Errorf("must not specify event and exitCode simultaneously"))
break
}
if policy.Event == "" && len(policy.Events) == 0 && policy.ExitCode == nil {
err = multierror.Append(err, fmt.Errorf("either event and exitCode should be specified"))
break
}
if len(policy.Event) != 0 || len(policy.Events) != 0 {
bFlag := false
policyEventsList := getEventList(policy)
for _, event := range policyEventsList {
if allow, ok := policyEventMap[event]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, event, "invalid policy event"))
bFlag = true
break
}
if allow, ok := policyActionMap[policy.Action]; !ok || !allow {
err = multierror.Append(err, field.Invalid(fldPath, policy.Action, "invalid policy action"))
bFlag = true
break
}
if _, found := policyEvents[event]; found {
err = multierror.Append(err, fmt.Errorf("duplicate event %v across different policy", event))
bFlag = true
break
} else {
policyEvents[event] = struct{}{}
}
}
if bFlag {
break
}
} else {
if *policy.ExitCode == 0 {
err = multierror.Append(err, fmt.Errorf("0 is not a valid error code"))
break
}
if _, found := exitCodes[*policy.ExitCode]; found {
err = multierror.Append(err, fmt.Errorf("duplicate exitCode %v", *policy.ExitCode))
break
} else {
exitCodes[*policy.ExitCode] = struct{}{}
}
}
}
if _, found := policyEvents[busv1alpha1.AnyEvent]; found && len(policyEvents) > 1 {
err = multierror.Append(err, fmt.Errorf("if there's * here, no other policy should be here"))
}
return err
}
func getEventList(policy batchv1alpha1.LifecyclePolicy) []busv1alpha1.Event {
policyEventsList := policy.Events
if len(policy.Event) > 0 {
policyEventsList = append(policyEventsList, policy.Event)
}
uniquePolicyEventlist := removeDuplicates(policyEventsList)
return uniquePolicyEventlist
}
func removeDuplicates(eventList []busv1alpha1.Event) []busv1alpha1.Event {
keys := make(map[busv1alpha1.Event]bool)
list := []busv1alpha1.Event{}
for _, val := range eventList {
if _, value := keys[val]; !value {
keys[val] = true
list = append(list, val)
}
}
return list
}
func getValidEvents() []busv1alpha1.Event {
var events []busv1alpha1.Event
for e, allow := range policyEventMap {
if allow {
events = append(events, e)
}
}
return events
}
func getValidActions() []busv1alpha1.Action {
var actions []busv1alpha1.Action
for a, allow := range policyActionMap {
if allow {
actions = append(actions, a)
}
}
return actions
}
// validateIO validates IO configuration.
func validateIO(volumes []batchv1alpha1.VolumeSpec) error {
volumeMap := map[string]bool{}
for _, volume := range volumes {
if len(volume.MountPath) == 0 {
return fmt.Errorf(" mountPath is required;")
}
if _, found := volumeMap[volume.MountPath]; found {
return fmt.Errorf(" duplicated mountPath: %s;", volume.MountPath)
}
if volume.VolumeClaim == nil && volume.VolumeClaimName == "" {
return fmt.Errorf(" either VolumeClaim or VolumeClaimName must be specified;")
}
if len(volume.VolumeClaimName) != 0 {
if volume.VolumeClaim != nil {
return fmt.Errorf("conflict: If you want to use an existing PVC, just specify VolumeClaimName." +
"If you want to create a new PVC, you do not need to specify VolumeClaimName")
}
if errMsgs := validation.ValidatePersistentVolumeName(volume.VolumeClaimName, false); len(errMsgs) > 0 {
return fmt.Errorf("invalid VolumeClaimName %s : %v", volume.VolumeClaimName, errMsgs)
}
}
volumeMap[volume.MountPath] = true
}
return nil
}
// topoSort uses topo sort to sort job tasks based on dependsOn field
// it will return an array contains all sorted task names and a bool which indicates whether it's a valid dag
func topoSort(job *batchv1alpha1.Job) ([]string, bool) {
graph, inDegree, taskList := makeGraph(job)
var taskStack []string
for task, degree := range inDegree {
if degree == 0 {
taskStack = append(taskStack, task)
}
}
sortedTasks := make([]string, 0)
for len(taskStack) > 0 {
length := len(taskStack)
var out string
out, taskStack = taskStack[length-1], taskStack[:length-1]
sortedTasks = append(sortedTasks, out)
for in, connected := range graph[out] {
if connected {
graph[out][in] = false
inDegree[in]--
if inDegree[in] == 0 {
taskStack = append(taskStack, in)
}
}
}
}
isDag := len(sortedTasks) == len(taskList)
if !isDag {
return nil, false
}
return sortedTasks, isDag
}
func makeGraph(job *batchv1alpha1.Job) (map[string]map[string]bool, map[string]int, []string) {
graph := make(map[string]map[string]bool)
inDegree := make(map[string]int)
taskList := make([]string, 0)
for _, task := range job.Spec.Tasks {
taskList = append(taskList, task.Name)
inDegree[task.Name] = 0
if task.DependsOn != nil {
for _, dependOnTask := range task.DependsOn.Name {
if graph[dependOnTask] == nil {
graph[dependOnTask] = make(map[string]bool)
}
graph[dependOnTask][task.Name] = true
inDegree[task.Name]++
}
}
}
return graph, inDegree, taskList
}