in pkg/scheduler/plugins/numaaware/numaaware.go [79:187]
// OnSessionOpen hooks the plugin into the scheduling session. It regenerates
// the per-node resource sets from ssn.Nodes and then registers, in order, an
// allocate/deallocate event handler, a predicate function and a batch
// node-order function.
func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) {
	weight := calculateWeight(pp.pluginArguments)
	numaNodes := api.GenerateNumaNodes(ssn.Nodes)
	pp.nodeResSets = api.GenerateNodeResNumaSets(ssn.Nodes)

	// onAllocate hands the sets recorded for the task on its node to the
	// node's Allocate and remembers which node the task was bound to. Tasks
	// without a recorded assignment for that node are ignored.
	onAllocate := func(event *framework.Event) {
		task := event.Task
		nodeSets := pp.nodeResSets[task.NodeName]
		// Indexing a missing task entry yields a nil inner map, so a single
		// two-level lookup covers both "unknown task" and "unknown node".
		assigned, ok := pp.assignRes[task.UID][task.NodeName]
		if !ok {
			return
		}
		nodeSets.Allocate(assigned)
		pp.taskBindNodeMap[task.UID] = task.NodeName
	}

	// onDeallocate is the inverse of onAllocate: it forgets the task's bound
	// node and hands the recorded sets to the node's Release.
	onDeallocate := func(event *framework.Event) {
		task := event.Task
		nodeSets := pp.nodeResSets[task.NodeName]
		assigned, ok := pp.assignRes[task.UID][task.NodeName]
		if !ok {
			return
		}
		delete(pp.taskBindNodeMap, task.UID)
		nodeSets.Release(assigned)
	}

	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc:   onAllocate,
		DeallocateFunc: onDeallocate,
	})

	// predicateFn admits non-Guaranteed pods unconditionally. For Guaranteed
	// pods it runs the node's policy against every container of the pod and,
	// when all containers are admitted, records the combined assignment for
	// (task, node) in pp.assignRes. It returns an error when any container is
	// rejected, or whatever filterNodeByPolicy returned when the node does
	// not fit.
	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
		if v1qos.GetPodQOS(task.Pod) != v1.PodQOSGuaranteed {
			klog.V(3).Infof("task %s isn't Guaranteed pod", task.Name)
			return nil
		}

		fit, err := filterNodeByPolicy(task, node, pp.nodeResSets)
		if !fit {
			return err
		}

		// Work on a clone so that trial assignments made here never touch the
		// plugin's per-node sets.
		remaining := pp.nodeResSets[node.Name].Clone()
		taskPolicy := policy.GetPolicy(node, numaNodes[node.Name])
		taskAssign := make(map[string]cpuset.CPUSet)

		for _, ctr := range task.Pod.Spec.Containers {
			hints := policy.AccumulateProvidersHints(&ctr, node.NumaSchedulerInfo, remaining, pp.hintProviders)
			bestHint, admitted := taskPolicy.Predicate(hints)
			if !admitted {
				return fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s",
					pp.Name(), task.Name, ctr.Name, node.Name)
			}

			klog.V(4).Infof("[numaaware] hits for task %s container '%v': %v on node %s, besthit: %v",
				task.Name, ctr.Name, hints, node.Name, bestHint)

			ctrAssign := policy.Allocate(&ctr, &bestHint, node.NumaSchedulerInfo, remaining, pp.hintProviders)
			for resName, picked := range ctrAssign {
				// Accumulate the pod-wide assignment and remove what this
				// container took from the pool seen by later containers.
				taskAssign[resName] = taskAssign[resName].Union(picked)
				remaining[resName] = remaining[resName].Difference(picked)
			}
		}

		pp.Lock()
		defer pp.Unlock()

		perNode, known := pp.assignRes[task.UID]
		if !known {
			perNode = make(map[string]api.ResNumaSets)
			pp.assignRes[task.UID] = perNode
		}
		perNode[node.Name] = taskAssign

		klog.V(4).Infof(" task %s's on node<%s> resAssignMap: %v",
			task.Name, node.Name, perNode[node.Name])
		return nil
	}
	ssn.AddPredicateFn(pp.Name(), predicateFn)

	// batchNodeOrderFn scores the candidate nodes for a task. It returns an
	// empty score map when the task has no NUMA info, an empty or "none"
	// policy, or no recorded assignment; otherwise each node's normalized
	// score is multiplied by the plugin weight.
	batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
		nodeScores := make(map[string]float64, len(nodeInfo))

		if info := task.NumaInfo; info == nil || info.Policy == "" || info.Policy == "none" {
			return nodeScores, nil
		}

		assigned, found := pp.assignRes[task.UID]
		if !found {
			return nodeScores, nil
		}

		scoreList := getNodeNumaNumForTask(nodeInfo, assigned)
		util.NormalizeScore(api.DefaultMaxNodeScore, true, scoreList)

		// scoreList is indexed in step with nodeInfo, so the node name is
		// taken from the matching nodeInfo entry.
		for idx, scored := range scoreList {
			scored.Score *= int64(weight)
			nodeScores[nodeInfo[idx].Name] = float64(scored.Score)
		}

		klog.V(4).Infof("numa-aware plugin Score for task %s/%s is: %v",
			task.Namespace, task.Name, nodeScores)
		return nodeScores, nil
	}
	ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn)
}