in pkg/scheduler/plugins/nodeorder/nodeorder.go [151:346]
// OnSessionOpen wires the node-order plugin into the scheduling session.
//
// It performs three steps:
//  1. Registers allocate/deallocate event handlers that mirror task placement
//     changes into the local pod lister and node map used by the k8s plugins.
//  2. Builds the per-node k8s scoring plugins (image locality, least/most
//     allocated, balanced allocation, node affinity) and registers a node
//     order function that sums their weighted scores.
//  3. Builds the InterPodAffinity and TaintToleration plugins and registers a
//     batch node order function that sums their scores per node.
//
// If any k8s plugin cannot be constructed, the error is logged and the method
// returns without registering the order function(s) that depend on it.
func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
	weight := calculateWeight(pp.pluginArguments)
	pl := util.NewPodListerFromNode(ssn)
	nodeMap := util.GenerateNodeMapAndSlice(ssn.Nodes)

	// Register event handlers to update task info in PodLister & nodeMap
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, event.Task.NodeName)

			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Warningf("node order, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
			} else {
				node.AddPod(pod)
				klog.V(4).Infof("node order, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
			}
		},
		DeallocateFunc: func(event *framework.Event) {
			// The pod is recorded with an empty node name; the node it is
			// removed from is still taken from the task itself.
			pod := pl.UpdateTask(event.Task, "")

			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Warningf("node order, update pod %s/%s deallocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
			} else {
				err := node.RemovePod(pod)
				if err != nil {
					klog.Errorf("Failed to update pod %s/%s and deallocate from node [%s]: %s", pod.Namespace, pod.Name, nodeName, err.Error())
				} else {
					klog.V(4).Infof("node order, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
				}
			}
		},
	})

	fts := feature.Features{
		EnablePodAffinityNamespaceSelector: utilFeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
		EnablePodDisruptionBudget:          utilFeature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
		EnablePodOverhead:                  utilFeature.DefaultFeatureGate.Enabled(features.PodOverhead),
		EnableReadWriteOncePod:             utilFeature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
		EnableVolumeCapacityPriority:       utilFeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
		EnableCSIStorageCapacity:           utilFeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity),
	}

	// Initialize k8s scheduling plugins
	handle := k8s.NewFrameworkHandle(nodeMap, ssn.KubeClient(), ssn.InformerFactory())

	// 1. NodeResourcesLeastAllocated
	leastAllocatedArgs := &config.NodeResourcesFitArgs{
		ScoringStrategy: &config.ScoringStrategy{
			Type:      config.LeastAllocated,
			Resources: []config.ResourceSpec{{Name: "cpu", Weight: 50}, {Name: "memory", Weight: 50}},
		},
	}
	p, err := noderesources.NewFit(leastAllocatedArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to create NodeResourcesFit plugin with LeastAllocated strategy: %v", err)
		return
	}
	leastAllocated := p.(*noderesources.Fit)

	// 2. NodeResourcesMostAllocated
	mostAllocatedArgs := &config.NodeResourcesFitArgs{
		ScoringStrategy: &config.ScoringStrategy{
			Type:      config.MostAllocated,
			Resources: []config.ResourceSpec{{Name: "cpu", Weight: 1}, {Name: "memory", Weight: 1}},
		},
	}
	p, err = noderesources.NewFit(mostAllocatedArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to create NodeResourcesFit plugin with MostAllocated strategy: %v", err)
		return
	}
	mostAllocation := p.(*noderesources.Fit)

	// 3. NodeResourcesBalancedAllocation
	blArgs := &config.NodeResourcesBalancedAllocationArgs{
		Resources: []config.ResourceSpec{
			{Name: string(v1.ResourceCPU), Weight: 1},
			{Name: string(v1.ResourceMemory), Weight: 1},
			{Name: "nvidia.com/gpu", Weight: 1},
		},
	}
	p, err = noderesources.NewBalancedAllocation(blArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to create NodeResourcesBalancedAllocation plugin: %v", err)
		return
	}
	balancedAllocation := p.(*noderesources.BalancedAllocation)

	// 4. NodeAffinity
	naArgs := &config.NodeAffinityArgs{
		AddedAffinity: &v1.NodeAffinity{},
	}
	p, err = nodeaffinity.New(naArgs, handle)
	if err != nil {
		klog.Errorf("node order, failed to create NodeAffinity plugin: %v", err)
		return
	}
	nodeAffinity := p.(*nodeaffinity.NodeAffinity)

	// 5. ImageLocality
	p, err = imagelocality.New(nil, handle)
	if err != nil {
		klog.Errorf("node order, failed to create ImageLocality plugin: %v", err)
		return
	}
	imageLocality := p.(*imagelocality.ImageLocality)

	// nodeOrderFn scores a single node for a task as the weighted sum of the
	// enabled scorers. A scorer whose weight is zero is skipped entirely; the
	// first scorer that reports a non-success status aborts the computation
	// and its error is returned with a score of 0.
	nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
		var nodeScore = 0.0

		state := k8sframework.NewCycleState()
		// ImageLocality
		if weight.imageLocalityWeight != 0 {
			score, status := imageLocality.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Image Locality Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			// The plugin score is multiplied by imageLocalityWeight before being added to the total.
			nodeScore += float64(score) * float64(weight.imageLocalityWeight)
		}

		// NodeResourcesLeastAllocated
		if weight.leastReqWeight != 0 {
			score, status := leastAllocated.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Least Allocated Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			// The plugin score is multiplied by leastReqWeight before being added to the total.
			nodeScore += float64(score) * float64(weight.leastReqWeight)
		}

		// NodeResourcesMostAllocated
		if weight.mostReqWeight != 0 {
			score, status := mostAllocation.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Most Allocated Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			// The plugin score is multiplied by mostReqWeight before being added to the total.
			nodeScore += float64(score) * float64(weight.mostReqWeight)
		}

		// NodeResourcesBalancedAllocation
		if weight.balancedResourceWeight != 0 {
			score, status := balancedAllocation.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Balanced Resource Allocation Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			// The plugin score is multiplied by balancedResourceWeight before being added to the total.
			nodeScore += float64(score) * float64(weight.balancedResourceWeight)
		}

		// NodeAffinity
		if weight.nodeAffinityWeight != 0 {
			score, status := nodeAffinity.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Calculate Node Affinity Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			// TODO: should we normalize the score
			// The plugin score is multiplied by nodeAffinityWeight before being added to the total.
			nodeScore += float64(score) * float64(weight.nodeAffinityWeight)
		}

		klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore)
		return nodeScore, nil
	}
	ssn.AddNodeOrderFn(pp.Name(), nodeOrderFn)

	// Plugins used by the batch node order function.
	plArgs := &config.InterPodAffinityArgs{}
	p, err = interpodaffinity.New(plArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to create InterPodAffinity plugin: %v", err)
		return
	}
	interPodAffinity := p.(*interpodaffinity.InterPodAffinity)

	p, err = tainttoleration.New(nil, handle)
	if err != nil {
		klog.Errorf("node order, failed to create TaintToleration plugin: %v", err)
		return
	}
	taintToleration := p.(*tainttoleration.TaintToleration)

	// batchNodeOrderFn scores all candidate nodes for a task at once and
	// returns, per node name, the sum of the inter-pod-affinity score and the
	// taint-toleration score. An error from either scorer is returned as is.
	batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
		// InterPodAffinity
		state := k8sframework.NewCycleState()
		nodes := make([]*v1.Node, 0, len(nodeInfo))
		for _, node := range nodeInfo {
			nodes = append(nodes, node.Node)
		}
		nodeScores := make(map[string]float64, len(nodes))

		podAffinityScores, podErr := interPodAffinityScore(interPodAffinity, state, task.Pod, nodes, weight.podAffinityWeight)
		if podErr != nil {
			return nil, podErr
		}

		// TaintToleration
		nodeTolerationScores, tolErr := taintTolerationScore(taintToleration, state, task.Pod, nodes, weight.taintTolerationWeight)
		if tolErr != nil {
			return nil, tolErr
		}

		for _, node := range nodes {
			nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name]
		}

		klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores)
		return nodeScores, nil
	}
	ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn)
}