func()

in pkg/scheduler/plugins/numaaware/numaaware.go [79:187]


func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) {
	weight := calculateWeight(pp.pluginArguments)
	numaNodes := api.GenerateNumaNodes(ssn.Nodes)
	pp.nodeResSets = api.GenerateNodeResNumaSets(ssn.Nodes)

	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			node := pp.nodeResSets[event.Task.NodeName]
			if _, ok := pp.assignRes[event.Task.UID]; !ok {
				return
			}

			resNumaSets, ok := pp.assignRes[event.Task.UID][event.Task.NodeName]
			if !ok {
				return
			}

			node.Allocate(resNumaSets)
			pp.taskBindNodeMap[event.Task.UID] = event.Task.NodeName
		},
		DeallocateFunc: func(event *framework.Event) {
			node := pp.nodeResSets[event.Task.NodeName]
			if _, ok := pp.assignRes[event.Task.UID]; !ok {
				return
			}

			resNumaSets, ok := pp.assignRes[event.Task.UID][event.Task.NodeName]
			if !ok {
				return
			}

			delete(pp.taskBindNodeMap, event.Task.UID)
			node.Release(resNumaSets)
		},
	})

	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
		if v1qos.GetPodQOS(task.Pod) != v1.PodQOSGuaranteed {
			klog.V(3).Infof("task %s isn't Guaranteed pod", task.Name)
			return nil
		}

		if fit, err := filterNodeByPolicy(task, node, pp.nodeResSets); !fit {
			return err
		}

		resNumaSets := pp.nodeResSets[node.Name].Clone()

		taskPolicy := policy.GetPolicy(node, numaNodes[node.Name])
		allResAssignMap := make(map[string]cpuset.CPUSet)
		for _, container := range task.Pod.Spec.Containers {
			providersHints := policy.AccumulateProvidersHints(&container, node.NumaSchedulerInfo, resNumaSets, pp.hintProviders)
			hit, admit := taskPolicy.Predicate(providersHints)
			if !admit {
				return fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s",
					pp.Name(), task.Name, container.Name, node.Name)
			}

			klog.V(4).Infof("[numaaware] hits for task %s container '%v': %v on node %s, besthit: %v",
				task.Name, container.Name, providersHints, node.Name, hit)
			resAssignMap := policy.Allocate(&container, &hit, node.NumaSchedulerInfo, resNumaSets, pp.hintProviders)
			for resName, assign := range resAssignMap {
				allResAssignMap[resName] = allResAssignMap[resName].Union(assign)
				resNumaSets[resName] = resNumaSets[resName].Difference(assign)
			}
		}

		pp.Lock()
		defer pp.Unlock()
		if _, ok := pp.assignRes[task.UID]; !ok {
			pp.assignRes[task.UID] = make(map[string]api.ResNumaSets)
		}

		pp.assignRes[task.UID][node.Name] = allResAssignMap

		klog.V(4).Infof(" task %s's on node<%s> resAssignMap: %v",
			task.Name, node.Name, pp.assignRes[task.UID][node.Name])

		return nil
	}

	ssn.AddPredicateFn(pp.Name(), predicateFn)

	batchNodeOrderFn := func(task *api.TaskInfo, nodeInfo []*api.NodeInfo) (map[string]float64, error) {
		nodeScores := make(map[string]float64, len(nodeInfo))
		if task.NumaInfo == nil || task.NumaInfo.Policy == "" || task.NumaInfo.Policy == "none" {
			return nodeScores, nil
		}

		if _, found := pp.assignRes[task.UID]; !found {
			return nodeScores, nil
		}

		scoreList := getNodeNumaNumForTask(nodeInfo, pp.assignRes[task.UID])
		util.NormalizeScore(api.DefaultMaxNodeScore, true, scoreList)

		for idx, scoreNode := range scoreList {
			scoreNode.Score *= int64(weight)
			nodeName := nodeInfo[idx].Name
			nodeScores[nodeName] = float64(scoreNode.Score)
		}

		klog.V(4).Infof("numa-aware plugin Score for task %s/%s is: %v",
			task.Namespace, task.Name, nodeScores)
		return nodeScores, nil
	}

	ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn)
}