func()

in pkg/scheduler/plugins/predicates/predicates.go [158:369]


// OnSessionOpen wires the predicates plugin into the scheduling session ssn.
//
// It performs three steps:
//  1. Registers allocate/deallocate event handlers that keep the pod lister
//     and the per-node entries of nodeMap in step with the session. When GPU
//     sharing is enabled and the pod requests GPU resource, the handlers also
//     send the GPU-index patch to the API server and update the PodMap of the
//     selected GPU device.
//  2. Instantiates the kube-scheduler filter plugins (NodeUnschedulable,
//     NodeAffinity, NodePorts, TaintToleration, InterPodAffinity).
//  3. Registers, under pp.Name(), a predicate function that runs those
//     filters (optionally through the predicate cache) followed by the
//     optional GPU-sharing and proportional-resource checks.
func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
	pl := util.NewPodListerFromNode(ssn)
	nodeMap := util.GenerateNodeMapAndSlice(ssn.Nodes)

	pCache := predicateCacheNew()
	predicate := enablePredicate(pp.pluginArguments)

	kubeClient := ssn.KubeClient()
	// Register event handlers to update task info in PodLister & nodeMap
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, event.Task.NodeName)

			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Errorf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}

			if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 {
				nodeInfo, ok := ssn.Nodes[nodeName]
				if !ok {
					klog.Errorf("Failed to get node %s info from cache", nodeName)
					return
				}

				// A negative id means no GPU on this node was selected for the pod.
				id := predicateGPU(pod, nodeInfo)
				if id < 0 {
					klog.Errorf("The node %s can't place the pod %s in ns %s", nodeName, pod.Name, pod.Namespace)
					return
				}
				dev, ok := nodeInfo.GPUDevices[id]
				if !ok {
					klog.Errorf("Failed to get GPU %d from node %s", id, nodeName)
					return
				}
				patch := api.AddGPUIndexPatch(id)
				// Keep the Patch result in its own variable: shadowing pod here
				// would make the error log below read the name from the call's
				// result, which carries no useful data when the call fails.
				patchedPod, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{})
				if err != nil {
					klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err)
					return
				}
				dev.PodMap[string(patchedPod.UID)] = patchedPod
				klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s allocate to node [%s]", patchedPod.Namespace, patchedPod.Name, nodeName)
			}

			node.AddPod(pod)
			klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
		},
		DeallocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, "")
			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Errorf("predicates, update pod %s/%s deallocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}

			if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 {
				// deallocate pod gpu id
				// Read the index before sending the removal patch.
				id := api.GetGPUIndex(pod)
				patch := api.RemoveGPUIndexPatch()
				_, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{})
				if err != nil {
					klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err)
					return
				}

				nodeInfo, ok := ssn.Nodes[nodeName]
				if !ok {
					klog.Errorf("Failed to get node %s info from cache", nodeName)
					return
				}
				if dev, ok := nodeInfo.GPUDevices[id]; ok {
					delete(dev.PodMap, string(pod.UID))
				}

				klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
			}

			if err := node.RemovePod(pod); err != nil {
				klog.Errorf("predicates, remove pod %s/%s from node [%s] error: %v", pod.Namespace, pod.Name, nodeName, err)
				return
			}
			klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
		},
	})

	// Initialize k8s plugins
	// TODO: Add more predicates, k8s.io/kubernetes/pkg/scheduler/framework/plugins/legacy_registry.go
	// NOTE(review): constructor errors are discarded below; if a constructor
	// returned a nil plugin, the unchecked type assertion that follows it
	// would panic. Confirm these constructors cannot fail with these args.
	handle := k8s.NewFrameworkHandle(nodeMap, ssn.KubeClient(), ssn.InformerFactory())
	// 1. NodeUnschedulable
	plugin, _ := nodeunschedulable.New(nil, handle)
	nodeUnscheduleFilter := plugin.(*nodeunschedulable.NodeUnschedulable)
	// 2. NodeAffinity
	nodeAffinityArgs := config.NodeAffinityArgs{
		AddedAffinity: &v1.NodeAffinity{},
	}
	plugin, _ = nodeaffinity.New(&nodeAffinityArgs, handle)
	nodeAffinityFilter := plugin.(*nodeaffinity.NodeAffinity)
	// 3. NodePorts
	plugin, _ = nodeports.New(nil, handle)
	nodePortFilter := plugin.(*nodeports.NodePorts)
	// 4. TaintToleration
	plugin, _ = tainttoleration.New(nil, handle)
	tolerationFilter := plugin.(*tainttoleration.TaintToleration)
	// 5. InterPodAffinity
	plArgs := &config.InterPodAffinityArgs{}
	features := feature.Features{}
	plugin, _ = interpodaffinity.New(plArgs, handle, features)
	podAffinityFilter := plugin.(*interpodaffinity.InterPodAffinity)

	ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
		nodeInfo, found := nodeMap[node.Name]
		if !found {
			return fmt.Errorf("failed to predicates, node info for %s not found", node.Name)
		}

		// Reject the node once the pods already tracked on it reach MaxTaskNum.
		if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods) {
			klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed",
				task.Namespace, task.Name, node.Name)
			return api.NewFitError(task, node, api.NodePodNumberExceeded)
		}

		// state is shared by every filter call made for this task/node pair.
		state := k8sframework.NewCycleState()
		// predicateByStablefilter runs the three filters whose combined result
		// is stored in the predicate cache: NodeUnschedulable, NodeAffinity and
		// TaintToleration. It returns false plus an error naming the first
		// plugin that rejected the pod.
		predicateByStablefilter := func(pod *v1.Pod, k8sNodeInfo *k8sframework.NodeInfo) (bool, error) {
			// CheckNodeUnschedulable
			status := nodeUnscheduleFilter.Filter(context.TODO(), state, pod, k8sNodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", nodeunschedulable.Name, status.Message())
			}

			// Check NodeAffinity
			status = nodeAffinityFilter.Filter(context.TODO(), state, pod, k8sNodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message())
			}

			// PodToleratesNodeTaints: TaintToleration
			status = tolerationFilter.Filter(context.TODO(), state, pod, k8sNodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message())
			}

			return true, nil
		}

		// Check PredicateWithCache
		{
			var err error
			var fit bool
			if predicate.cacheEnable {
				fit, err = pCache.PredicateWithCache(node.Name, task.Pod)
				if err != nil {
					// The cache returned an error (presumably no entry for this
					// pod/node): evaluate the filters and store the outcome.
					fit, err = predicateByStablefilter(task.Pod, nodeInfo)
					pCache.UpdateCache(node.Name, task.Pod, fit)
				} else if !fit {
					err = fmt.Errorf("plugin equivalence cache predicates failed")
				}
			} else {
				fit, err = predicateByStablefilter(task.Pod, nodeInfo)
			}

			if !fit {
				return err
			}
		}

		// Check NodePorts
		status := nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s pre-predicates failed %s", nodeports.Name, status.Message())
		}

		status = nodePortFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message())
		}

		// InterPodAffinity Predicate
		status = podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
		}

		status = podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message())
		}

		if predicate.gpuSharingEnable {
			// CheckGPUSharingPredicate
			fit, err := checkNodeGPUSharingPredicate(task.Pod, node)
			if err != nil {
				return err
			}

			klog.V(4).Infof("checkNodeGPUSharingPredicate predicates Task <%s/%s> on Node <%s>: fit %v",
				task.Namespace, task.Name, node.Name, fit)
		}
		if predicate.proportionalEnable {
			// Check ProportionalPredicate
			fit, err := checkNodeResourceIsProportional(task, node, predicate.proportional)
			if err != nil {
				return err
			}
			klog.V(4).Infof("checkNodeResourceIsProportional predicates Task <%s/%s> on Node <%s>: fit %v",
				task.Namespace, task.Name, node.Name, fit)
		}
		return nil
	})
}