pkg/scheduler/plugins/predicates/predicates.go (262 lines of code) (raw):

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package predicates

import (
	"context"
	"fmt"
	"strings"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/klog"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"

	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
	"volcano.sh/volcano/pkg/scheduler/plugins/util"
	"volcano.sh/volcano/pkg/scheduler/plugins/util/k8s"
)

const (
	// PluginName indicates name of volcano scheduler plugin.
	PluginName = "predicates"

	// GPUSharingPredicate is the key for enabling GPU Sharing Predicate in YAML
	GPUSharingPredicate = "predicate.GPUSharingEnable"

	// CachePredicate controls the cache predicate feature
	CachePredicate = "predicate.CacheEnable"

	// ProportionalPredicate is the key for enabling Proportional Predicate in YAML
	ProportionalPredicate = "predicate.ProportionalEnable"
	// ProportionalResource is the key for additional resource key name
	ProportionalResource = "predicate.resources"
	// ProportionalResourcesPrefix is the key prefix for additional resource key name
	ProportionalResourcesPrefix = ProportionalResource + "."
)

type predicatesPlugin struct {
	// Arguments given for the plugin
	pluginArguments framework.Arguments
}

// New returns the predicates plugin.
func New(arguments framework.Arguments) framework.Plugin {
	return &predicatesPlugin{pluginArguments: arguments}
}

func (pp *predicatesPlugin) Name() string {
	return PluginName
}

type baseResource struct {
	CPU    float64
	Memory float64
}

type predicateEnable struct {
	gpuSharingEnable   bool
	cacheEnable        bool
	proportionalEnable bool
	proportional       map[v1.ResourceName]baseResource
}

func enablePredicate(args framework.Arguments) predicateEnable {
	/*
	   Users should give predicatesEnable in this format (predicate.GPUSharingEnable).
	   Currently, only the GPUSharing predicate check is supported.

	   actions: "reclaim, allocate, backfill, preempt"
	   tiers:
	   - plugins:
	     - name: priority
	     - name: gang
	     - name: conformance
	   - plugins:
	     - name: drf
	     - name: predicates
	       arguments:
	         predicate.GPUSharingEnable: true
	         predicate.CacheEnable: true
	         predicate.ProportionalEnable: true
	         predicate.resources: nvidia.com/gpu
	         predicate.resources.nvidia.com/gpu.cpu: 4
	         predicate.resources.nvidia.com/gpu.memory: 8
	     - name: proportion
	     - name: nodeorder
	*/

	predicate := predicateEnable{
		gpuSharingEnable:   false,
		cacheEnable:        false,
		proportionalEnable: false,
	}

	// Checks whether predicate.GPUSharingEnable is provided or not; if given, modifies the value in the predicateEnable struct.
	args.GetBool(&predicate.gpuSharingEnable, GPUSharingPredicate)
	args.GetBool(&predicate.cacheEnable, CachePredicate)
	// Checks whether predicate.ProportionalEnable is provided or not; if given, modifies the value in the predicateEnable struct.
	args.GetBool(&predicate.proportionalEnable, ProportionalPredicate)
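
	// Parse the proportional-resource arguments: "predicate.resources" is a
	// comma-separated list of extended resource names, and for each name an
	// optional "predicate.resources.<name>.cpu" / "predicate.resources.<name>.memory"
	// ratio may be given; missing or negative ratios default to 1.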
	resourcesProportional := make(map[v1.ResourceName]baseResource)
	resourcesStr, ok := args[ProportionalResource].(string)
	if !ok {
		resourcesStr = ""
	}
	resources := strings.Split(resourcesStr, ",")
	for _, resource := range resources {
		resource = strings.TrimSpace(resource)
		if resource == "" {
			continue
		}
		// proportional.resources.[ResourceName]
		cpuResourceKey := ProportionalResourcesPrefix + resource + ".cpu"
		cpuResourceRate := 1.0
		args.GetFloat64(&cpuResourceRate, cpuResourceKey)
		if cpuResourceRate < 0 {
			cpuResourceRate = 1.0
		}
		memoryResourceKey := ProportionalResourcesPrefix + resource + ".memory"
		memoryResourceRate := 1.0
		args.GetFloat64(&memoryResourceRate, memoryResourceKey)
		if memoryResourceRate < 0 {
			memoryResourceRate = 1.0
		}
		r := baseResource{
			CPU:    cpuResourceRate,
			Memory: memoryResourceRate,
		}
		resourcesProportional[v1.ResourceName(resource)] = r
	}
	predicate.proportional = resourcesProportional

	return predicate
}

func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
	pl := util.NewPodListerFromNode(ssn)
	nodeMap := util.GenerateNodeMapAndSlice(ssn.Nodes)

	pCache := predicateCacheNew()
	predicate := enablePredicate(pp.pluginArguments)

	kubeClient := ssn.KubeClient()
	// Register event handlers to update task info in PodLister & nodeMap
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, event.Task.NodeName)

			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Errorf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}

			if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 {
				nodeInfo, ok := ssn.Nodes[nodeName]
				if !ok {
					klog.Errorf("Failed to get node %s info from cache", nodeName)
					return
				}

				id := predicateGPU(pod, nodeInfo)
				if id < 0 {
					klog.Errorf("The node %s can't place the pod %s in ns %s", pod.Spec.NodeName, pod.Name, pod.Namespace)
					return
				}
				dev, ok := nodeInfo.GPUDevices[id]
				if !ok {
					klog.Errorf("Failed to get GPU %d from node %s", id, nodeName)
					return
				}
				patch := api.AddGPUIndexPatch(id)
				pod, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{})
				if err != nil {
					klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err)
					return
				}
				dev.PodMap[string(pod.UID)] = pod
				klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
			}

			node.AddPod(pod)
			klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
		},
		DeallocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, "")

			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Errorf("predicates, update pod %s/%s allocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}

			if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 {
				// deallocate pod gpu id
				id := api.GetGPUIndex(pod)
				patch := api.RemoveGPUIndexPatch()
				_, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{})
				if err != nil {
					klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err)
					return
				}

				nodeInfo, ok := ssn.Nodes[nodeName]
				if !ok {
					klog.Errorf("Failed to get node %s info from cache", nodeName)
					return
				}
				if dev, ok := nodeInfo.GPUDevices[id]; ok {
					delete(dev.PodMap, string(pod.UID))
				}

				klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
			}

			err := node.RemovePod(pod)
			if err != nil {
				klog.Errorf("predicates, remove pod %s/%s from node [%s] error: %v", pod.Namespace, pod.Name, nodeName, err)
				return
			}
			klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
		},
	})

	// Initialize k8s plugins
	// TODO: Add more predicates, k8s.io/kubernetes/pkg/scheduler/framework/plugins/legacy_registry.go
	handle := k8s.NewFrameworkHandle(nodeMap, ssn.KubeClient(), ssn.InformerFactory())
	// 1. NodeUnschedulable
	plugin, _ := nodeunschedulable.New(nil, handle)
	nodeUnscheduleFilter := plugin.(*nodeunschedulable.NodeUnschedulable)
	// 2. NodeAffinity
	nodeAffinityArgs := config.NodeAffinityArgs{
		AddedAffinity: &v1.NodeAffinity{},
	}
	plugin, _ = nodeaffinity.New(&nodeAffinityArgs, handle)
	nodeAffinityFilter := plugin.(*nodeaffinity.NodeAffinity)
	// 3. NodePorts
	plugin, _ = nodeports.New(nil, handle)
	nodePortFilter := plugin.(*nodeports.NodePorts)
	// 4. TaintToleration
	plugin, _ = tainttoleration.New(nil, handle)
	tolerationFilter := plugin.(*tainttoleration.TaintToleration)
	// 5. InterPodAffinity
	plArgs := &config.InterPodAffinityArgs{}
	features := feature.Features{}
	plugin, _ = interpodaffinity.New(plArgs, handle, features)
	podAffinityFilter := plugin.(*interpodaffinity.InterPodAffinity)

	ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
		nodeInfo, found := nodeMap[node.Name]
		if !found {
			return fmt.Errorf("failed to predicates, node info for %s not found", node.Name)
		}

		if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods) {
			klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed", task.Namespace, task.Name, node.Name)
			return api.NewFitError(task, node, api.NodePodNumberExceeded)
		}

		state := k8sframework.NewCycleState()
		predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) (bool, error) {
			// CheckNodeUnschedulable
			status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", nodeunschedulable.Name, status.Message())
			}

			// Check NodeAffinity
			status = nodeAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message())
			}

			// PodToleratesNodeTaints: TaintToleration
			status = tolerationFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message())
			}

			return true, nil
		}

		// Check PredicateWithCache
		{
			var err error
			var fit bool
			if predicate.cacheEnable {
				fit, err = pCache.PredicateWithCache(node.Name, task.Pod)
				if err != nil {
					fit, err = predicateByStablefilter(task.Pod, nodeInfo)
					pCache.UpdateCache(node.Name, task.Pod, fit)
				} else {
					if !fit {
						err = fmt.Errorf("plugin equivalence cache predicates failed")
					}
				}
			} else {
				fit, err = predicateByStablefilter(task.Pod, nodeInfo)
			}

			if !fit {
				return err
			}
		}
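
		// NodePorts relies on its PreFilter step to record the host ports requested
		// by the pod in the cycle state; Filter then checks those ports against the
		// ones already in use on the candidate node.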
		// Check NodePorts
		nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
		status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message())
		}

		// InterPodAffinity Predicate
		status = podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
		}

		status = podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message())
		}

		if predicate.gpuSharingEnable {
			// CheckGPUSharingPredicate
			fit, err := checkNodeGPUSharingPredicate(task.Pod, node)
			if err != nil {
				return err
			}

			klog.V(4).Infof("checkNodeGPUSharingPredicate predicates Task <%s/%s> on Node <%s>: fit %v", task.Namespace, task.Name, node.Name, fit)
		}
		if predicate.proportionalEnable {
			// Check ProportionalPredicate
			fit, err := checkNodeResourceIsProportional(task, node, predicate.proportional)
			if err != nil {
				return err
			}

			klog.V(4).Infof("checkNodeResourceIsProportional predicates Task <%s/%s> on Node <%s>: fit %v", task.Namespace, task.Name, node.Name, fit)
		}
		return nil
	})
}

func (pp *predicatesPlugin) OnSessionClose(ssn *framework.Session) {}
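
For context, a minimal sketch of how a plugin like this is typically made visible to the Volcano scheduler. The file name, package, and the init-based call to framework.RegisterPluginBuilder are assumptions about the plugin factory pattern, not part of this file:

// factory.go (hypothetical sketch)
package plugins

import (
	"volcano.sh/volcano/pkg/scheduler/framework"
	"volcano.sh/volcano/pkg/scheduler/plugins/predicates"
)

func init() {
	// Register the builder under PluginName ("predicates") so that a scheduler
	// configuration referencing this plugin, such as the YAML shown in the
	// enablePredicate comment above, can instantiate it with its arguments.
	framework.RegisterPluginBuilder(predicates.PluginName, predicates.New)
}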