pkg/scheduler/plugins/numaaware/numaaware.go (214 lines of code) (raw):

/*
Copyright 2021 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package numaaware

import (
	"context"
	"fmt"
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"

	nodeinfov1alpha1 "volcano.sh/apis/pkg/apis/nodeinfo/v1alpha1"
	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
	"volcano.sh/volcano/pkg/scheduler/plugins/numaaware/policy"
	"volcano.sh/volcano/pkg/scheduler/plugins/numaaware/provider/cpumanager"
	"volcano.sh/volcano/pkg/scheduler/plugins/util"
)

const (
	// PluginName indicates name of volcano scheduler plugin.
	PluginName = "numa-aware"
	// NumaTopoWeight is the plugin-argument key under which the weight of the
	// numa-aware plugin is looked up.
	NumaTopoWeight = "weight"
)

// numaPlugin holds the per-plugin state used while a scheduling session is
// open: the configured arguments, the registered hint providers and the
// bookkeeping maps described on the fields below.
type numaPlugin struct {
	// Mutex is taken by the predicate function while it writes to assignRes.
	sync.Mutex

	// pluginArguments are the arguments given for the plugin.
	pluginArguments framework.Arguments

	// hintProviders are the providers consulted when accumulating hints and
	// allocating resources for a container.
	hintProviders []policy.HintProvider

	// assignRes records, per task and per node, the resource sets the
	// predicate function computed for that task on that node:
	// map[taskUID]map[nodename][resourceName]cpuset.CPUSet
	assignRes map[api.TaskID]map[string]api.ResNumaSets

	// nodeResSets holds the resource sets of every node, generated from the
	// session's nodes when the session opens:
	// map[nodename][resourceName]cpuset.CPUSet
	nodeResSets map[string]api.ResNumaSets

	// taskBindNodeMap maps a task UID to the node name recorded by the
	// allocate event handler; the deallocate handler removes the entry.
	taskBindNodeMap map[api.TaskID]string
}

// New function returns prioritize plugin object.
func New(arguments framework.Arguments) framework.Plugin { plugin := &numaPlugin{ pluginArguments: arguments, assignRes: make(map[api.TaskID]map[string]api.ResNumaSets), taskBindNodeMap: make(map[api.TaskID]string), } plugin.hintProviders = append(plugin.hintProviders, cpumanager.NewProvider()) return plugin } func (pp *numaPlugin) Name() string { return PluginName } func calculateWeight(args framework.Arguments) int { weight := 1 args.GetInt(&weight, NumaTopoWeight) return weight } func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) { weight := calculateWeight(pp.pluginArguments) numaNodes := api.GenerateNumaNodes(ssn.Nodes) pp.nodeResSets = api.GenerateNodeResNumaSets(ssn.Nodes) ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { node := pp.nodeResSets[event.Task.NodeName] if _, ok := pp.assignRes[event.Task.UID]; !ok { return } resNumaSets, ok := pp.assignRes[event.Task.UID][event.Task.NodeName] if !ok { return } node.Allocate(resNumaSets) pp.taskBindNodeMap[event.Task.UID] = event.Task.NodeName }, DeallocateFunc: func(event *framework.Event) { node := pp.nodeResSets[event.Task.NodeName] if _, ok := pp.assignRes[event.Task.UID]; !ok { return } resNumaSets, ok := pp.assignRes[event.Task.UID][event.Task.NodeName] if !ok { return } delete(pp.taskBindNodeMap, event.Task.UID) node.Release(resNumaSets) }, }) predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { if v1qos.GetPodQOS(task.Pod) != v1.PodQOSGuaranteed { klog.V(3).Infof("task %s isn't Guaranteed pod", task.Name) return nil } if fit, err := filterNodeByPolicy(task, node, pp.nodeResSets); !fit { return err } resNumaSets := pp.nodeResSets[node.Name].Clone() taskPolicy := policy.GetPolicy(node, numaNodes[node.Name]) allResAssignMap := make(map[string]cpuset.CPUSet) for _, container := range task.Pod.Spec.Containers { providersHints := policy.AccumulateProvidersHints(&container, node.NumaSchedulerInfo, resNumaSets, pp.hintProviders) hit, admit := 
taskPolicy.Predicate(providersHints) if !admit { return fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s", pp.Name(), task.Name, container.Name, node.Name) } klog.V(4).Infof("[numaaware] hits for task %s container '%v': %v on node %s, besthit: %v", task.Name, container.Name, providersHints, node.Name, hit) resAssignMap := policy.Allocate(&container, &hit, node.NumaSchedulerInfo, resNumaSets, pp.hintProviders) for resName, assign := range resAssignMap { allResAssignMap[resName] = allResAssignMap[resName].Union(assign) resNumaSets[resName] = resNumaSets[resName].Difference(assign) } } pp.Lock() defer pp.Unlock() if _, ok := pp.assignRes[task.UID]; !ok { pp.assignRes[task.UID] = make(map[string]api.ResNumaSets) } pp.assignRes[task.UID][node.Name] = allResAssignMap klog.V(4).Infof(" task %s's on node<%s> resAssignMap: %v", task.Name, node.Name, pp.assignRes[task.UID][node.Name]) return nil } ssn.AddPredicateFn(pp.Name(), predicateFn) batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) { nodeScores := make(map[string]float64, len(nodeInfo)) if task.NumaInfo == nil || task.NumaInfo.Policy == "" || task.NumaInfo.Policy == "none" { return nodeScores, nil } if _, found := pp.assignRes[task.UID]; !found { return nodeScores, nil } scoreList := getNodeNumaNumForTask(nodeInfo, pp.assignRes[task.UID]) util.NormalizeScore(api.DefaultMaxNodeScore, true, scoreList) for idx, scoreNode := range scoreList { scoreNode.Score *= int64(weight) nodeName := nodeInfo[idx].Name nodeScores[nodeName] = float64(scoreNode.Score) } klog.V(4).Infof("numa-aware plugin Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores) return nodeScores, nil } ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn) } func filterNodeByPolicy(task *api.TaskInfo, node *api.NodeInfo, nodeResSets map[string]api.ResNumaSets) (fit bool, err error) { if !(task.NumaInfo == nil || task.NumaInfo.Policy == "" || task.NumaInfo.Policy == 
"none") { if node.NumaSchedulerInfo == nil { return false, fmt.Errorf("numa info is empty") } if node.NumaSchedulerInfo.Policies[nodeinfov1alpha1.CPUManagerPolicy] != "static" { return false, fmt.Errorf("cpu manager policy isn't static") } if task.NumaInfo.Policy != node.NumaSchedulerInfo.Policies[nodeinfov1alpha1.TopologyManagerPolicy] { return false, fmt.Errorf("task topology polocy[%s] is different with node[%s]", task.NumaInfo.Policy, node.NumaSchedulerInfo.Policies[nodeinfov1alpha1.TopologyManagerPolicy]) } if _, ok := nodeResSets[node.Name]; !ok { return false, fmt.Errorf("no topo information") } if nodeResSets[node.Name][string(v1.ResourceCPU)].Size() == 0 { return false, fmt.Errorf("cpu allocatable map is empty") } } else { if node.NumaSchedulerInfo == nil { return false, nil } if node.NumaSchedulerInfo.Policies[nodeinfov1alpha1.CPUManagerPolicy] != "static" { return false, nil } if (node.NumaSchedulerInfo.Policies[nodeinfov1alpha1.TopologyManagerPolicy] == "none") || (node.NumaSchedulerInfo.Policies[nodeinfov1alpha1.TopologyManagerPolicy] == "") { return false, nil } } return true, nil } func getNodeNumaNumForTask(nodeInfo []*api.NodeInfo, resAssignMap map[string]api.ResNumaSets) []api.ScoredNode { nodeNumaCnts := make([]api.ScoredNode, len(nodeInfo)) workqueue.ParallelizeUntil(context.TODO(), 16, len(nodeInfo), func(index int) { node := nodeInfo[index] assignCpus := resAssignMap[node.Name][string(v1.ResourceCPU)] nodeNumaCnts[index] = api.ScoredNode{ NodeName: node.Name, Score: int64(getNumaNodeCntForCPUID(assignCpus, node.NumaSchedulerInfo.CPUDetail)), } }) return nodeNumaCnts } func getNumaNodeCntForCPUID(cpus cpuset.CPUSet, cpuDetails topology.CPUDetails) int { mask, _ := bitmask.NewBitMask() s := cpus.ToSlice() for _, cpuID := range s { mask.Add(cpuDetails[cpuID].NUMANodeID) } return mask.Count() } func (pp *numaPlugin) OnSessionClose(ssn *framework.Session) { if len(pp.taskBindNodeMap) == 0 { return } allocatedResSet := 
make(map[string]api.ResNumaSets) for taskID, nodeName := range pp.taskBindNodeMap { if _, existed := pp.assignRes[taskID]; !existed { continue } if _, existed := pp.assignRes[taskID][nodeName]; !existed { continue } if _, existed := allocatedResSet[nodeName]; !existed { allocatedResSet[nodeName] = make(api.ResNumaSets) } resSet := pp.assignRes[taskID][nodeName] for resName, set := range resSet { if _, existed := allocatedResSet[nodeName][resName]; !existed { allocatedResSet[nodeName][resName] = cpuset.NewCPUSet() } allocatedResSet[nodeName][resName] = allocatedResSet[nodeName][resName].Union(set) } } klog.V(4).Infof("[numaPlugin]allocatedResSet: %v", allocatedResSet) ssn.UpdateSchedulerNumaInfo(allocatedResSet) }