in pkg/scheduler/util/scheduler_helper.go [70:132]
func PrioritizeNodes(task *api.TaskInfo, nodes []*api.NodeInfo, batchFn api.BatchNodeOrderFn, mapFn api.NodeOrderMapFn, reduceFn api.NodeOrderReduceFn) map[float64][]*api.NodeInfo {
pluginNodeScoreMap := map[string]k8sframework.NodeScoreList{}
nodeOrderScoreMap := map[string]float64{}
nodeScores := map[float64][]*api.NodeInfo{}
var workerLock sync.Mutex
scoreNode := func(index int) {
node := nodes[index]
mapScores, orderScore, err := mapFn(task, node)
if err != nil {
klog.Errorf("Error in Calculating Priority for the node:%v", err)
return
}
workerLock.Lock()
for plugin, score := range mapScores {
nodeScoreMap, ok := pluginNodeScoreMap[plugin]
if !ok {
nodeScoreMap = k8sframework.NodeScoreList{}
}
hp := k8sframework.NodeScore{}
hp.Name = node.Name
hp.Score = int64(math.Floor(score))
pluginNodeScoreMap[plugin] = append(nodeScoreMap, hp)
}
nodeOrderScoreMap[node.Name] = orderScore
workerLock.Unlock()
}
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), scoreNode)
reduceScores, err := reduceFn(task, pluginNodeScoreMap)
if err != nil {
klog.Errorf("Error in Calculating Priority for the node:%v", err)
return nodeScores
}
batchNodeScore, err := batchFn(task, nodes)
if err != nil {
klog.Errorf("Error in Calculating batch Priority for the node, err %v", err)
return nodeScores
}
for _, node := range nodes {
if score, found := reduceScores[node.Name]; found {
if orderScore, ok := nodeOrderScoreMap[node.Name]; ok {
score += orderScore
}
if batchScore, ok := batchNodeScore[node.Name]; ok {
score += batchScore
}
nodeScores[score] = append(nodeScores[score], node)
} else {
// If no plugin is applied to this node, the default is 0.0
score = 0.0
if orderScore, ok := nodeOrderScoreMap[node.Name]; ok {
score += orderScore
}
if batchScore, ok := batchNodeScore[node.Name]; ok {
score += batchScore
}
nodeScores[score] = append(nodeScores[score], node)
}
}
return nodeScores
}