pkg/scheduler/api/node_info.go

/*
Copyright 2021 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
	"fmt"
	"strconv"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog"

	"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

// AllocateFailError is the error returned when a task cannot be allocated on a node.
type AllocateFailError struct {
	Reason string
}

func (o *AllocateFailError) Error() string {
	return o.Reason
}

// NodeInfo is node level aggregated information.
type NodeInfo struct {
	Name string
	Node *v1.Node

	// The state of node
	State NodeState

	// The releasing resource on that node
	Releasing *Resource
	// The pipelined resource on that node
	Pipelined *Resource
	// The idle resource on that node
	Idle *Resource
	// The used resource on that node, including running and terminating
	// pods
	Used *Resource

	Allocatable *Resource
	Capability  *Resource

	Tasks map[TaskID]*TaskInfo

	NumaInfo          *NumatopoInfo
	NumaChgFlag       NumaChgFlag
	NumaSchedulerInfo *NumatopoInfo
	RevocableZone     string

	// Used to store custom information
	Others     map[string]interface{}
	GPUDevices map[int]*GPUDevice

	// enable node resource oversubscription
	OversubscriptionNode bool
	// OfflineJobEvicting true means the node resource usage is too high,
	// so dispatched pods can not use oversubscription resources
	OfflineJobEvicting bool

	// Resource Oversubscription feature: the Oversubscription Resource reported in annotation
	OversubscriptionResource *Resource
}

// FutureIdle returns resources that will be idle in the future:
//
// That is current idle resources plus released resources minus pipelined resources.
func (ni *NodeInfo) FutureIdle() *Resource {
	return ni.Idle.Clone().Add(ni.Releasing).Sub(ni.Pipelined)
}

// GetNodeAllocatable returns the node Allocatable without the OversubscriptionResource.
func (ni *NodeInfo) GetNodeAllocatable() *Resource {
	return NewResource(ni.Node.Status.Allocatable)
}
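// Illustrative sketch (not in the original source): with Idle = 4 CPU,
// Releasing = 2 CPU and Pipelined = 1 CPU, FutureIdle reports 4 + 2 - 1 = 5 CPU.
// A hedged usage sketch, assuming a *v1.Node named k8sNode is already at hand:
//
//	ni := NewNodeInfo(k8sNode)
//	futureIdle := ni.FutureIdle()
//	klog.V(4).Infof("node %s future idle: %s", ni.Name, futureIdle.String())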
// NodeState defines the current state of node.
type NodeState struct {
	Phase  NodePhase
	Reason string
}

// NewNodeInfo is used to create new nodeInfo object
func NewNodeInfo(node *v1.Node) *NodeInfo {
	nodeInfo := &NodeInfo{
		Releasing: EmptyResource(),
		Pipelined: EmptyResource(),
		Idle:      EmptyResource(),
		Used:      EmptyResource(),

		Allocatable: EmptyResource(),
		Capability:  EmptyResource(),

		OversubscriptionResource: EmptyResource(),

		Tasks:      make(map[TaskID]*TaskInfo),
		GPUDevices: make(map[int]*GPUDevice),
	}

	nodeInfo.setOversubscription(node)

	if node != nil {
		nodeInfo.Name = node.Name
		nodeInfo.Node = node
		nodeInfo.Idle = NewResource(node.Status.Allocatable).Add(nodeInfo.OversubscriptionResource)
		nodeInfo.Allocatable = NewResource(node.Status.Allocatable).Add(nodeInfo.OversubscriptionResource)
		nodeInfo.Capability = NewResource(node.Status.Capacity).Add(nodeInfo.OversubscriptionResource)
	}

	nodeInfo.setNodeGPUInfo(node)
	nodeInfo.setNodeState(node)
	nodeInfo.setRevocableZone(node)

	return nodeInfo
}

// RefreshNumaSchedulerInfoByCrd is used to update the scheduler NUMA information based on the numatopo CRD.
func (ni *NodeInfo) RefreshNumaSchedulerInfoByCrd() {
	if ni.NumaInfo == nil {
		ni.NumaSchedulerInfo = nil
		return
	}

	tmp := ni.NumaInfo.DeepCopy()
	if ni.NumaChgFlag == NumaInfoMoreFlag {
		ni.NumaSchedulerInfo = tmp
	} else if ni.NumaChgFlag == NumaInfoLessFlag {
		numaResMap := ni.NumaSchedulerInfo.NumaResMap
		for resName, resInfo := range tmp.NumaResMap {
			klog.V(5).Infof("resource %s Allocatable : current %v new %v on node %s",
				resName, numaResMap[resName], resInfo, ni.Name)
			if numaResMap[resName].Allocatable.Size() >= resInfo.Allocatable.Size() {
				numaResMap[resName].Allocatable = resInfo.Allocatable.Clone()
				numaResMap[resName].Capacity = resInfo.Capacity
			}
		}
	}

	ni.NumaChgFlag = NumaInfoResetFlag
}

// Clone is used to clone the nodeInfo object.
func (ni *NodeInfo) Clone() *NodeInfo {
	res := NewNodeInfo(ni.Node)

	for _, p := range ni.Tasks {
		res.AddTask(p)
	}

	if ni.NumaInfo != nil {
		res.NumaInfo = ni.NumaInfo.DeepCopy()
	}

	if ni.NumaSchedulerInfo != nil {
		res.NumaSchedulerInfo = ni.NumaSchedulerInfo.DeepCopy()
		klog.V(5).Infof("node[%s]", ni.Name)
		for resName, resInfo := range res.NumaSchedulerInfo.NumaResMap {
			klog.V(5).Infof("current resource %s : %v", resName, resInfo)
		}

		klog.V(5).Infof("current Policies : %v", res.NumaSchedulerInfo.Policies)
	}

	res.Others = ni.Others
	return res
}

// Ready returns whether node is ready for scheduling
func (ni *NodeInfo) Ready() bool {
	return ni.State.Phase == Ready
}

func (ni *NodeInfo) setRevocableZone(node *v1.Node) {
	if node == nil {
		klog.Warningf("the argument node is null.")
		return
	}

	revocableZone := ""
	if len(node.Labels) > 0 {
		if value, found := node.Labels[v1beta1.RevocableZone]; found {
			revocableZone = value
		}
	}

	ni.RevocableZone = revocableZone
}
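// Illustrative sketch (not in the original source), with a hypothetical label value:
//
//	node.Labels[v1beta1.RevocableZone] = "zone-a" // hypothetical value
//	ni.setRevocableZone(node)                     // ni.RevocableZone == "zone-a"
//
// Nodes that do not carry the v1beta1.RevocableZone label end up with an empty
// RevocableZone string.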
// setOversubscription checks whether the node enables oversubscription and sets the oversubscription resources.
// Only cpu and memory oversubscription resources are supported in this version.
func (ni *NodeInfo) setOversubscription(node *v1.Node) {
	if node == nil {
		return
	}

	ni.OversubscriptionNode = false
	ni.OfflineJobEvicting = false
	if len(node.Labels) > 0 {
		if value, found := node.Labels[OversubscriptionNode]; found {
			b, err := strconv.ParseBool(value)
			if err == nil {
				ni.OversubscriptionNode = b
			} else {
				ni.OversubscriptionNode = false
			}
			klog.V(5).Infof("Set node %s Oversubscription to %v", node.Name, ni.OversubscriptionNode)
		}
	}

	if len(node.Annotations) > 0 {
		if value, found := node.Annotations[OfflineJobEvicting]; found {
			b, err := strconv.ParseBool(value)
			if err == nil {
				ni.OfflineJobEvicting = b
			} else {
				ni.OfflineJobEvicting = false
			}
			klog.V(5).Infof("Set node %s OfflineJobEvicting to %v", node.Name, ni.OfflineJobEvicting)
		}
		if value, found := node.Annotations[OversubscriptionCPU]; found {
			ni.OversubscriptionResource.MilliCPU, _ = strconv.ParseFloat(value, 64)
			klog.V(5).Infof("Set node %s Oversubscription CPU to %v", node.Name, ni.OversubscriptionResource.MilliCPU)
		}
		if value, found := node.Annotations[OversubscriptionMemory]; found {
			ni.OversubscriptionResource.Memory, _ = strconv.ParseFloat(value, 64)
			klog.V(5).Infof("Set node %s Oversubscription Memory to %v", node.Name, ni.OversubscriptionResource.Memory)
		}
	}
}

func (ni *NodeInfo) setNodeState(node *v1.Node) {
	// If node is nil, the node is un-initialized in cache
	if node == nil {
		ni.State = NodeState{
			Phase:  NotReady,
			Reason: "UnInitialized",
		}
		return
	}

	// set NodeState according to resources
	if !ni.Used.LessEqual(ni.Allocatable, Zero) {
		ni.State = NodeState{
			Phase:  NotReady,
			Reason: "OutOfSync",
		}
		return
	}

	// If node not ready, e.g. power off
	for _, cond := range node.Status.Conditions {
		if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
			ni.State = NodeState{
				Phase:  NotReady,
				Reason: "NotReady",
			}
			klog.Warningf("set the node %s status to %s.", node.Name, NotReady.String())
			return
		}
	}

	// Node is ready (ignore node conditions because of taint/toleration)
	ni.State = NodeState{
		Phase:  Ready,
		Reason: "",
	}

	klog.V(4).Infof("set the node %s status to %s.", node.Name, Ready.String())
}

func (ni *NodeInfo) setNodeGPUInfo(node *v1.Node) {
	if node == nil {
		return
	}
	memory, ok := node.Status.Capacity[VolcanoGPUResource]
	if !ok {
		return
	}
	totalMemory := memory.Value()

	res, ok := node.Status.Capacity[VolcanoGPUNumber]
	if !ok {
		return
	}
	gpuNumber := res.Value()
	if gpuNumber == 0 {
		klog.Warningf("invalid %s=%s", VolcanoGPUNumber, res.String())
		return
	}

	memoryPerCard := uint(totalMemory / gpuNumber)
	for i := 0; i < int(gpuNumber); i++ {
		ni.GPUDevices[i] = NewGPUDevice(i, memoryPerCard)
	}
}
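// Worked example for setNodeGPUInfo (illustrative, not in the original source):
// if node.Status.Capacity reports VolcanoGPUResource = 32768 and VolcanoGPUNumber = 4,
// then memoryPerCard = 32768 / 4 = 8192 and four GPUDevice entries (IDs 0..3) are
// created, each with 8192 memory units.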
// SetNode sets kubernetes node object to nodeInfo object
func (ni *NodeInfo) SetNode(node *v1.Node) {
	ni.setNodeState(node)
	if !ni.Ready() {
		klog.Warningf("Failed to set node info for %s, phase: %s, reason: %s",
			ni.Name, ni.State.Phase, ni.State.Reason)
		return
	}

	// Dry run, make sure all fields other than `State` are in the original state.
	copy := ni.Clone()
	copy.setNode(node)
	copy.setNodeState(node)
	if !copy.Ready() {
		klog.Warningf("SetNode makes node %s not ready, phase: %s, reason: %s",
			copy.Name, copy.State.Phase, copy.State.Reason)
		// Set state of node to !Ready, leave other fields untouched
		ni.State = copy.State
		return
	}

	ni.setNode(node)
}

// setNode sets kubernetes node object to nodeInfo object without assertion
func (ni *NodeInfo) setNode(node *v1.Node) {
	ni.setOversubscription(node)
	ni.setNodeGPUInfo(node)
	ni.setRevocableZone(node)

	ni.Name = node.Name
	ni.Node = node

	ni.Allocatable = NewResource(node.Status.Allocatable).Add(ni.OversubscriptionResource)
	ni.Capability = NewResource(node.Status.Capacity).Add(ni.OversubscriptionResource)
	ni.Releasing = EmptyResource()
	ni.Pipelined = EmptyResource()
	ni.Idle = NewResource(node.Status.Allocatable).Add(ni.OversubscriptionResource)
	ni.Used = EmptyResource()

	for _, ti := range ni.Tasks {
		switch ti.Status {
		case Releasing:
			ni.Idle.sub(ti.Resreq) // sub without assertion
			ni.Releasing.Add(ti.Resreq)
			ni.Used.Add(ti.Resreq)
			ni.AddGPUResource(ti.Pod)
		case Pipelined:
			ni.Pipelined.Add(ti.Resreq)
		default:
			ni.Idle.sub(ti.Resreq) // sub without assertion
			ni.Used.Add(ti.Resreq)
			ni.AddGPUResource(ti.Pod)
		}
	}
}

func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) error {
	if ti.Resreq.LessEqual(ni.Idle, Zero) {
		ni.Idle.Sub(ti.Resreq)
		return nil
	}

	return &AllocateFailError{Reason: fmt.Sprintf(
		"cannot allocate resource, <%s> idle: %s <%s/%s> req: %s",
		ni.Name, ni.Idle.String(), ti.Namespace, ti.Name, ti.Resreq.String(),
	)}
}

// AddTask is used to add a task to the nodeInfo object.
//
// If error occurs both task and node are guaranteed to be in the original state.
func (ni *NodeInfo) AddTask(task *TaskInfo) error {
	if len(task.NodeName) > 0 && len(ni.Name) > 0 && task.NodeName != ni.Name {
		return fmt.Errorf("task <%v/%v> already on different node <%v>",
			task.Namespace, task.Name, task.NodeName)
	}

	key := PodKey(task.Pod)
	if _, found := ni.Tasks[key]; found {
		return fmt.Errorf("task <%v/%v> already on node <%v>",
			task.Namespace, task.Name, ni.Name)
	}

	// Node will hold a copy of task to make sure the status
	// change will not impact resource in node.
	ti := task.Clone()

	if ni.Node != nil {
		switch ti.Status {
		case Releasing:
			if err := ni.allocateIdleResource(ti); err != nil {
				return err
			}
			ni.Releasing.Add(ti.Resreq)
			ni.Used.Add(ti.Resreq)
			ni.AddGPUResource(ti.Pod)
		case Pipelined:
			ni.Pipelined.Add(ti.Resreq)
		default:
			if err := ni.allocateIdleResource(ti); err != nil {
				return err
			}
			ni.Used.Add(ti.Resreq)
			ni.AddGPUResource(ti.Pod)
		}
	}

	if ni.NumaInfo != nil {
		ni.NumaInfo.AddTask(ti)
	}

	// Update task node name upon successful task addition.
	task.NodeName = ni.Name
	ti.NodeName = ni.Name
	ni.Tasks[key] = ti

	return nil
}
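// Illustrative usage sketch (not in the original source). It assumes a pod already
// assigned to this node and NewTaskInfo, the task constructor defined elsewhere in
// this package:
//
//	task := NewTaskInfo(pod)
//	if err := ni.AddTask(task); err != nil {
//		klog.Errorf("failed to account task <%s/%s> on node %s: %v",
//			task.Namespace, task.Name, ni.Name, err)
//	}
//
// For a Running task the accounting moves Resreq from Idle to Used; a Pipelined
// task only accumulates into Pipelined and does not touch Idle.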
// RemoveTask is used to remove a task from the nodeInfo object.
//
// If error occurs both task and node are guaranteed to be in the original state.
func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
	key := PodKey(ti.Pod)

	task, found := ni.Tasks[key]
	if !found {
		klog.Warningf("failed to find task <%v/%v> on host <%v>",
			ti.Namespace, ti.Name, ni.Name)
		return nil
	}

	if ni.Node != nil {
		switch task.Status {
		case Releasing:
			ni.Releasing.Sub(task.Resreq)
			ni.Idle.Add(task.Resreq)
			ni.Used.Sub(task.Resreq)
			ni.SubGPUResource(ti.Pod)
		case Pipelined:
			ni.Pipelined.Sub(task.Resreq)
		default:
			ni.Idle.Add(task.Resreq)
			ni.Used.Sub(task.Resreq)
			ni.SubGPUResource(ti.Pod)
		}
	}

	if ni.NumaInfo != nil {
		ni.NumaInfo.RemoveTask(ti)
	}

	delete(ni.Tasks, key)

	return nil
}

// UpdateTask is used to update a task in nodeInfo object.
//
// If error occurs both task and node are guaranteed to be in the original state.
func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {
	if err := ni.RemoveTask(ti); err != nil {
		return err
	}

	if err := ni.AddTask(ti); err != nil {
		// This should never happen if task removal was successful,
		// because the only possible error during task addition is when the task is still on a node.
		klog.Fatalf("Failed to add Task <%s,%s> to Node <%s> during task update",
			ti.Namespace, ti.Name, ni.Name)
	}
	return nil
}

// String returns nodeInfo details in string format
func (ni NodeInfo) String() string {
	tasks := ""

	i := 0
	for _, task := range ni.Tasks {
		tasks += fmt.Sprintf("\n\t %d: %v", i, task)
		i++
	}

	return fmt.Sprintf("Node (%s): allocatable<%v> idle <%v>, used <%v>, releasing <%v>, oversubscription <%v>, "+
		"state <phase %s, reason %s>, oversubscriptionNode <%v>, offlineJobEvicting <%v>, taints <%v>%s",
		ni.Name, ni.Allocatable, ni.Idle, ni.Used, ni.Releasing, ni.OversubscriptionResource,
		ni.State.Phase, ni.State.Reason, ni.OversubscriptionNode, ni.OfflineJobEvicting, ni.Node.Spec.Taints, tasks)
}

// Pods returns all pods running in that node
func (ni *NodeInfo) Pods() (pods []*v1.Pod) {
	for _, t := range ni.Tasks {
		pods = append(pods, t.Pod)
	}

	return
}

// GetDevicesIdleGPUMemory returns all the idle GPU memory by gpu card.
func (ni *NodeInfo) GetDevicesIdleGPUMemory() map[int]uint {
	devicesAllGPUMemory := ni.getDevicesAllGPUMemory()
	devicesUsedGPUMemory := ni.getDevicesUsedGPUMemory()
	res := map[int]uint{}
	for id, allMemory := range devicesAllGPUMemory {
		if usedMemory, found := devicesUsedGPUMemory[id]; found {
			res[id] = allMemory - usedMemory
		} else {
			res[id] = allMemory
		}
	}
	return res
}

func (ni *NodeInfo) getDevicesUsedGPUMemory() map[int]uint {
	res := map[int]uint{}
	for _, device := range ni.GPUDevices {
		res[device.ID] = device.getUsedGPUMemory()
	}
	return res
}

func (ni *NodeInfo) getDevicesAllGPUMemory() map[int]uint {
	res := map[int]uint{}
	for _, device := range ni.GPUDevices {
		res[device.ID] = device.Memory
	}
	return res
}

// AddGPUResource adds the pod to GPU pool if it is assigned
func (ni *NodeInfo) AddGPUResource(pod *v1.Pod) {
	gpuRes := GetGPUResourceOfPod(pod)
	if gpuRes > 0 {
		id := GetGPUIndex(pod)
		if dev := ni.GPUDevices[id]; dev != nil {
			dev.PodMap[string(pod.UID)] = pod
		}
	}
}

// SubGPUResource frees the gpu held by the pod
func (ni *NodeInfo) SubGPUResource(pod *v1.Pod) {
	gpuRes := GetGPUResourceOfPod(pod)
	if gpuRes > 0 {
		id := GetGPUIndex(pod)
		if dev := ni.GPUDevices[id]; dev != nil {
			delete(dev.PodMap, string(pod.UID))
		}
	}
}
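// Illustrative sketch (not in the original source): picking the GPU card with the
// most idle memory on this node, using only the helpers defined above.
//
//	bestID, bestIdle := -1, uint(0)
//	for id, idle := range ni.GetDevicesIdleGPUMemory() {
//		if idle > bestIdle {
//			bestID, bestIdle = id, idle
//		}
//	}
//	// bestID stays -1 when no device reports idle memory.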