func()

in pkg/scheduler/plugins/nodeorder/nodeorder.go [151:346]


// OnSessionOpen wires the node-order plugin into the scheduling session.
//
// It does three things:
//   - registers allocate/deallocate event handlers that keep the local pod
//     lister and node map in step with the session's allocation decisions;
//   - builds the Kubernetes scoring plugins (resource fit, balanced
//     allocation, node affinity, image locality, inter-pod affinity and
//     taint toleration) on top of a framework handle backed by that node map;
//   - registers a per-node order function and a batch node order function
//     that combine the plugin scores using the configured weights.
//
// If any scoring plugin cannot be constructed the error is logged and the
// method returns without registering either order function.
func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
	weight := calculateWeight(pp.pluginArguments)
	pl := util.NewPodListerFromNode(ssn)
	nodeMap := util.GenerateNodeMapAndSlice(ssn.Nodes)

	// Register event handlers to update task info in PodLister & nodeMap, so
	// that the scoring plugins below see the allocations made in this session.
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, event.Task.NodeName)

			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Warningf("node order, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}

			node.AddPod(pod)
			klog.V(4).Infof("node order, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
		},
		DeallocateFunc: func(event *framework.Event) {
			// The pod is recorded as unassigned in the lister; the node it is
			// removed from is still taken from the task itself.
			pod := pl.UpdateTask(event.Task, "")

			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Warningf("node order, update pod %s/%s deallocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}

			if err := node.RemovePod(pod); err != nil {
				klog.Errorf("Failed to update pod %s/%s and deallocate from node [%s]: %s", pod.Namespace, pod.Name, nodeName, err.Error())
				return
			}
			klog.V(4).Infof("node order, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
		},
	})

	// Snapshot of the feature gates handed to the k8s plugin constructors.
	fts := feature.Features{
		EnablePodAffinityNamespaceSelector: utilFeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
		EnablePodDisruptionBudget:          utilFeature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
		EnablePodOverhead:                  utilFeature.DefaultFeatureGate.Enabled(features.PodOverhead),
		EnableReadWriteOncePod:             utilFeature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
		EnableVolumeCapacityPriority:       utilFeature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
		EnableCSIStorageCapacity:           utilFeature.DefaultFeatureGate.Enabled(features.CSIStorageCapacity),
	}

	// Initialize k8s scheduling plugins. Each constructor returns a generic
	// plugin value, which is asserted to the concrete type that the
	// constructor is expected to build so its Score method can be called.
	handle := k8s.NewFrameworkHandle(nodeMap, ssn.KubeClient(), ssn.InformerFactory())

	// 1. NodeResourcesLeastAllocated
	leastAllocatedArgs := &config.NodeResourcesFitArgs{
		ScoringStrategy: &config.ScoringStrategy{
			Type:      config.LeastAllocated,
			Resources: []config.ResourceSpec{{Name: "cpu", Weight: 50}, {Name: "memory", Weight: 50}},
		},
	}
	p, err := noderesources.NewFit(leastAllocatedArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to initialize NodeResourcesLeastAllocated plugin: %v", err)
		return
	}
	leastAllocated := p.(*noderesources.Fit)

	// 2. NodeResourcesMostAllocated
	mostAllocatedArgs := &config.NodeResourcesFitArgs{
		ScoringStrategy: &config.ScoringStrategy{
			Type:      config.MostAllocated,
			Resources: []config.ResourceSpec{{Name: "cpu", Weight: 1}, {Name: "memory", Weight: 1}},
		},
	}
	p, err = noderesources.NewFit(mostAllocatedArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to initialize NodeResourcesMostAllocated plugin: %v", err)
		return
	}
	mostAllocation := p.(*noderesources.Fit)

	// 3. NodeResourcesBalancedAllocation
	blArgs := &config.NodeResourcesBalancedAllocationArgs{
		Resources: []config.ResourceSpec{
			{Name: string(v1.ResourceCPU), Weight: 1},
			{Name: string(v1.ResourceMemory), Weight: 1},
			{Name: "nvidia.com/gpu", Weight: 1},
		},
	}
	p, err = noderesources.NewBalancedAllocation(blArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to initialize NodeResourcesBalancedAllocation plugin: %v", err)
		return
	}
	balancedAllocation := p.(*noderesources.BalancedAllocation)

	// 4. NodeAffinity
	naArgs := &config.NodeAffinityArgs{
		AddedAffinity: &v1.NodeAffinity{},
	}
	p, err = nodeaffinity.New(naArgs, handle)
	if err != nil {
		klog.Errorf("node order, failed to initialize NodeAffinity plugin: %v", err)
		return
	}
	nodeAffinity := p.(*nodeaffinity.NodeAffinity)

	// 5. ImageLocality
	p, err = imagelocality.New(nil, handle)
	if err != nil {
		klog.Errorf("node order, failed to initialize ImageLocality plugin: %v", err)
		return
	}
	imageLocality := p.(*imagelocality.ImageLocality)

	// 6. InterPodAffinity (used by the batch node order function)
	plArgs := &config.InterPodAffinityArgs{}
	p, err = interpodaffinity.New(plArgs, handle, fts)
	if err != nil {
		klog.Errorf("node order, failed to initialize InterPodAffinity plugin: %v", err)
		return
	}
	interPodAffinity := p.(*interpodaffinity.InterPodAffinity)

	// 7. TaintToleration (used by the batch node order function)
	p, err = tainttoleration.New(nil, handle)
	if err != nil {
		klog.Errorf("node order, failed to initialize TaintToleration plugin: %v", err)
		return
	}
	taintToleration := p.(*tainttoleration.TaintToleration)

	// nodeOrderFn scores a single node for a task. Every plugin whose weight
	// is non-zero contributes score*weight to the total; a plugin with a zero
	// weight is skipped entirely. The first failing plugin aborts the scoring
	// and its error is returned.
	nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
		var nodeScore = 0.0

		state := k8sframework.NewCycleState()

		// ImageLocality
		if weight.imageLocalityWeight != 0 {
			score, status := imageLocality.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Image Locality Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			nodeScore += float64(score) * float64(weight.imageLocalityWeight)
		}

		// NodeResourcesLeastAllocated
		if weight.leastReqWeight != 0 {
			score, status := leastAllocated.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Least Allocated Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			nodeScore += float64(score) * float64(weight.leastReqWeight)
		}

		// NodeResourcesMostAllocated
		if weight.mostReqWeight != 0 {
			score, status := mostAllocation.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Most Allocated Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			nodeScore += float64(score) * float64(weight.mostReqWeight)
		}

		// NodeResourcesBalancedAllocation
		if weight.balancedResourceWeight != 0 {
			score, status := balancedAllocation.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Balanced Resource Allocation Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			nodeScore += float64(score) * float64(weight.balancedResourceWeight)
		}

		// NodeAffinity
		if weight.nodeAffinityWeight != 0 {
			score, status := nodeAffinity.Score(context.TODO(), state, task.Pod, node.Name)
			if !status.IsSuccess() {
				klog.Warningf("Calculate Node Affinity Priority Failed because of Error: %v", status.AsError())
				return 0, status.AsError()
			}

			// TODO: should we normalize the score
			nodeScore += float64(score) * float64(weight.nodeAffinityWeight)
		}

		klog.V(4).Infof("Total Score for task %s/%s on node %s is: %f", task.Namespace, task.Name, node.Name, nodeScore)
		return nodeScore, nil
	}

	// batchNodeOrderFn scores all candidate nodes for a task at once. The
	// result maps each node name to the sum of its inter-pod affinity score
	// and its taint toleration score; the weights are applied inside the
	// respective helper functions.
	batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
		state := k8sframework.NewCycleState()
		nodes := make([]*v1.Node, 0, len(nodeInfo))
		for _, node := range nodeInfo {
			nodes = append(nodes, node.Node)
		}
		nodeScores := make(map[string]float64, len(nodes))

		// InterPodAffinity
		podAffinityScores, podErr := interPodAffinityScore(interPodAffinity, state, task.Pod, nodes, weight.podAffinityWeight)
		if podErr != nil {
			return nil, podErr
		}

		// TaintToleration
		nodeTolerationScores, err := taintTolerationScore(taintToleration, state, task.Pod, nodes, weight.taintTolerationWeight)
		if err != nil {
			return nil, err
		}

		for _, node := range nodes {
			nodeScores[node.Name] = podAffinityScores[node.Name] + nodeTolerationScores[node.Name]
		}

		klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, nodeScores)
		return nodeScores, nil
	}

	// Register the order functions only once every plugin has been built, so
	// the session never ends up with a partially initialized scorer.
	ssn.AddNodeOrderFn(pp.Name(), nodeOrderFn)
	ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn)
}