in pkg/scheduler/plugins/predicates/predicates.go [158:369]
// OnSessionOpen wires the predicates plugin into a scheduling session.
//
// It registers Allocate/Deallocate event handlers that keep the plugin's pod
// lister and its Kubernetes node view (nodeMap) in step with in-session
// placement decisions. When GPU sharing is enabled and the pod requests GPU
// resources, those handlers also JSON-patch the GPU index onto (or off) the
// pod through the API server and maintain the per-device PodMap on the
// session's NodeInfo.
//
// It then builds the upstream Kubernetes filter plugins it reuses
// (NodeUnschedulable, NodeAffinity, NodePorts, TaintToleration,
// InterPodAffinity) and registers a predicate function that rejects a node
// as soon as one check fails, in this order: per-node max task count; the
// NodeUnschedulable/NodeAffinity/TaintToleration filters (cached by node name
// and pod when the predicate cache is enabled); NodePorts; InterPodAffinity;
// and, when enabled, the GPU-sharing and proportional-resource checks.
func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
	pl := util.NewPodListerFromNode(ssn)
	nodeMap := util.GenerateNodeMapAndSlice(ssn.Nodes)
	pCache := predicateCacheNew()
	predicate := enablePredicate(pp.pluginArguments)
	kubeClient := ssn.KubeClient()

	// Register event handlers to update task info in PodLister & nodeMap.
	ssn.AddEventHandler(&framework.EventHandler{
		AllocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, event.Task.NodeName)
			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Errorf("predicates, update pod %s/%s allocate to NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}
			// NOTE(review): every early return in this branch skips
			// node.AddPod below even though pl.UpdateTask has already run;
			// confirm that partial update is intended.
			if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 {
				nodeInfo, ok := ssn.Nodes[nodeName]
				if !ok {
					klog.Errorf("Failed to get node %s info from cache", nodeName)
					return
				}
				id := predicateGPU(pod, nodeInfo)
				if id < 0 {
					klog.Errorf("The node %s can't place the pod %s in ns %s", nodeName, pod.Name, pod.Namespace)
					return
				}
				dev, ok := nodeInfo.GPUDevices[id]
				if !ok {
					klog.Errorf("Failed to get GPU %d from node %s", id, nodeName)
					return
				}
				patch := api.AddGPUIndexPatch(id)
				// Bind the API server's copy to its own name: `pod, err :=`
				// here would shadow the lister's pod, and on failure the
				// error log would read fields of the failed Patch result.
				patched, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{})
				if err != nil {
					klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err)
					return
				}
				dev.PodMap[string(patched.UID)] = patched
				klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s allocate to node [%s]", patched.Namespace, patched.Name, nodeName)
			}
			node.AddPod(pod)
			klog.V(4).Infof("predicates, update pod %s/%s allocate to node [%s]", pod.Namespace, pod.Name, nodeName)
		},
		DeallocateFunc: func(event *framework.Event) {
			pod := pl.UpdateTask(event.Task, "")
			nodeName := event.Task.NodeName
			node, found := nodeMap[nodeName]
			if !found {
				klog.Errorf("predicates, update pod %s/%s deallocate from NOT EXIST node [%s]", pod.Namespace, pod.Name, nodeName)
				return
			}
			// NOTE(review): as in AllocateFunc, the early returns in this
			// branch skip node.RemovePod after pl.UpdateTask has run.
			if predicate.gpuSharingEnable && api.GetGPUResourceOfPod(pod) > 0 {
				// Release the pod's GPU index: strip it via the API server,
				// then drop the pod from that device's PodMap.
				id := api.GetGPUIndex(pod)
				patch := api.RemoveGPUIndexPatch()
				_, err := kubeClient.CoreV1().Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{})
				if err != nil {
					klog.Errorf("Patch pod %s failed with patch %s: %v", pod.Name, patch, err)
					return
				}
				nodeInfo, ok := ssn.Nodes[nodeName]
				if !ok {
					klog.Errorf("Failed to get node %s info from cache", nodeName)
					return
				}
				if dev, ok := nodeInfo.GPUDevices[id]; ok {
					delete(dev.PodMap, string(pod.UID))
				}
				klog.V(4).Infof("predicates with gpu sharing, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
			}
			if err := node.RemovePod(pod); err != nil {
				klog.Errorf("predicates, remove pod %s/%s from node [%s] error: %v", pod.Namespace, pod.Name, nodeName, err)
				return
			}
			klog.V(4).Infof("predicates, update pod %s/%s deallocate from node [%s]", pod.Namespace, pod.Name, nodeName)
		},
	})

	// Initialize k8s plugins.
	// TODO: Add more predicates, k8s.io/kubernetes/pkg/scheduler/framework/plugins/legacy_registry.go
	//
	// NOTE(review): constructor errors are discarded; the unchecked type
	// assertions below panic if a constructor ever returns a nil plugin.
	handle := k8s.NewFrameworkHandle(nodeMap, ssn.KubeClient(), ssn.InformerFactory())
	// 1. NodeUnschedulable
	plugin, _ := nodeunschedulable.New(nil, handle)
	nodeUnscheduleFilter := plugin.(*nodeunschedulable.NodeUnschedulable)
	// 2. NodeAffinity
	nodeAffinityArgs := config.NodeAffinityArgs{
		AddedAffinity: &v1.NodeAffinity{},
	}
	plugin, _ = nodeaffinity.New(&nodeAffinityArgs, handle)
	nodeAffinityFilter := plugin.(*nodeaffinity.NodeAffinity)
	// 3. NodePorts
	plugin, _ = nodeports.New(nil, handle)
	nodePortFilter := plugin.(*nodeports.NodePorts)
	// 4. TaintToleration
	plugin, _ = tainttoleration.New(nil, handle)
	tolerationFilter := plugin.(*tainttoleration.TaintToleration)
	// 5. InterPodAffinity
	plArgs := &config.InterPodAffinityArgs{}
	features := feature.Features{}
	plugin, _ = interpodaffinity.New(plArgs, handle, features)
	podAffinityFilter := plugin.(*interpodaffinity.InterPodAffinity)

	ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
		nodeInfo, found := nodeMap[node.Name]
		if !found {
			return fmt.Errorf("failed to predicates, node info for %s not found", node.Name)
		}

		if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods) {
			klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed",
				task.Namespace, task.Name, node.Name)
			return api.NewFitError(task, node, api.NodePodNumberExceeded)
		}

		// A fresh CycleState per call; each PreFilter below runs on it
		// before the matching Filter.
		state := k8sframework.NewCycleState()

		// predicateByStablefilter runs the filters whose verdict is stored
		// in the predicate cache when caching is enabled.
		predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) (bool, error) {
			// CheckNodeUnschedulable
			status := nodeUnscheduleFilter.Filter(context.TODO(), state, pod, nodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", nodeunschedulable.Name, status.Message())
			}
			// Check NodeAffinity
			status = nodeAffinityFilter.Filter(context.TODO(), state, pod, nodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message())
			}
			// PodToleratesNodeTaints: TaintToleration
			status = tolerationFilter.Filter(context.TODO(), state, pod, nodeInfo)
			if !status.IsSuccess() {
				return false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message())
			}
			return true, nil
		}

		// Check PredicateWithCache
		{
			var (
				fit bool
				err error
			)
			if predicate.cacheEnable {
				fit, err = pCache.PredicateWithCache(node.Name, task.Pod)
				if err != nil {
					// No cached verdict: evaluate and remember the result.
					fit, err = predicateByStablefilter(task.Pod, nodeInfo)
					pCache.UpdateCache(node.Name, task.Pod, fit)
				} else if !fit {
					err = fmt.Errorf("plugin equivalence cache predicates failed")
				}
			} else {
				fit, err = predicateByStablefilter(task.Pod, nodeInfo)
			}
			if !fit {
				return err
			}
		}

		// Check NodePorts
		status := nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s pre-predicates failed %s", nodeports.Name, status.Message())
		}
		status = nodePortFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message())
		}

		// InterPodAffinity Predicate
		status = podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
		}
		status = podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
		if !status.IsSuccess() {
			return fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message())
		}

		if predicate.gpuSharingEnable {
			// CheckGPUSharingPredicate
			fit, err := checkNodeGPUSharingPredicate(task.Pod, node)
			if err != nil {
				return err
			}
			klog.V(4).Infof("checkNodeGPUSharingPredicate predicates Task <%s/%s> on Node <%s>: fit %v",
				task.Namespace, task.Name, node.Name, fit)
		}

		if predicate.proportionalEnable {
			// Check ProportionalPredicate
			fit, err := checkNodeResourceIsProportional(task, node, predicate.proportional)
			if err != nil {
				return err
			}
			klog.V(4).Infof("checkNodeResourceIsProportional predicates Task <%s/%s> on Node <%s>: fit %v",
				task.Namespace, task.Name, node.Name, fit)
		}
		return nil
	})
}