pkg/scheduler/api/job_info.go (537 lines of code) (raw):

/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package api import ( "encoding/json" "errors" "fmt" "sort" "strconv" "strings" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog" volumescheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" batch "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) // DisruptionBudget define job min pod available and max pod unvailable value type DisruptionBudget struct { MinAvailable string MaxUnavilable string } // NewDisruptionBudget create disruption budget for job func NewDisruptionBudget(minAvailable, maxUnavilable string) *DisruptionBudget { disruptionBudget := &DisruptionBudget{ MinAvailable: minAvailable, MaxUnavilable: maxUnavilable, } return disruptionBudget } // Clone return a clone of DisruptionBudget func (db *DisruptionBudget) Clone() *DisruptionBudget { return &DisruptionBudget{ MinAvailable: db.MinAvailable, MaxUnavilable: db.MaxUnavilable, } } // JobWaitingTime is maximum waiting time that a job could stay Pending in service level agreement // when job waits longer than waiting time, it should be inqueue at once, and cluster should reserve resources for it const JobWaitingTime = "sla-waiting-time" // TaskID is UID type for Task type TaskID types.UID // TransactionContext holds all the fields that needed by scheduling transaction type TransactionContext struct { NodeName string Status TaskStatus } // Clone return a clone of TransactionContext func (ctx *TransactionContext) Clone() *TransactionContext { if ctx == nil { return nil } clone := *ctx return &clone } type TopologyInfo struct { Policy string ResMap map[int]v1.ResourceList // key: numa ID } func (info *TopologyInfo) Clone() *TopologyInfo { copyInfo := &TopologyInfo{ Policy: info.Policy, ResMap: make(map[int]v1.ResourceList), } for numaID, resList := range info.ResMap { copyInfo.ResMap[numaID] = resList.DeepCopy() } return copyInfo } // TaskInfo will have all infos about the task type TaskInfo struct { UID TaskID Job JobID Name string Namespace string // Resreq is the resource that used when task running. Resreq *Resource // InitResreq is the resource that used to launch a task. InitResreq *Resource TransactionContext // LastTransaction holds the context of last scheduling transaction LastTransaction *TransactionContext Priority int32 VolumeReady bool Preemptable bool BestEffort bool // RevocableZone support set volcano.sh/revocable-zone annotaion or label for pod/podgroup // we only support empty value or * value for this version and we will support specify revocable zone name for futrue release // empty value means workload can not use revocable node // * value means workload can use all the revocable node for during node active revocable time. RevocableZone string NumaInfo *TopologyInfo PodVolumes *volumescheduling.PodVolumes Pod *v1.Pod } func getJobID(pod *v1.Pod) JobID { if gn, found := pod.Annotations[v1beta1.KubeGroupNameAnnotationKey]; found && len(gn) != 0 { // Make sure Pod and PodGroup belong to the same namespace. jobID := fmt.Sprintf("%s/%s", pod.Namespace, gn) return JobID(jobID) } return "" } func getTaskID(pod *v1.Pod) TaskID { if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 { return TaskID(ts) } return "" } const TaskPriorityAnnotation = "volcano.sh/task-priority" // NewTaskInfo creates new taskInfo object for a Pod func NewTaskInfo(pod *v1.Pod) *TaskInfo { initResReq := GetPodResourceRequest(pod) resReq := initResReq bestEffort := initResReq.IsEmpty() preemptable := GetPodPreemptable(pod) revocableZone := GetPodRevocableZone(pod) topologyInfo := GetPodTopologyInfo(pod) jobID := getJobID(pod) ti := &TaskInfo{ UID: TaskID(pod.UID), Job: jobID, Name: pod.Name, Namespace: pod.Namespace, Priority: 1, Pod: pod, Resreq: resReq, InitResreq: initResReq, Preemptable: preemptable, BestEffort: bestEffort, RevocableZone: revocableZone, NumaInfo: topologyInfo, TransactionContext: TransactionContext{ NodeName: pod.Spec.NodeName, Status: getTaskStatus(pod), }, } if pod.Spec.Priority != nil { ti.Priority = *pod.Spec.Priority } if taskPriority, ok := pod.Annotations[TaskPriorityAnnotation]; ok { if priority, err := strconv.ParseInt(taskPriority, 10, 32); err == nil { ti.Priority = int32(priority) } } return ti } // GetTransactionContext get transaction context of a task func (ti *TaskInfo) GetTransactionContext() TransactionContext { return ti.TransactionContext } // GenerateLastTxContext generate and set context of last transaction for a task func (ti *TaskInfo) GenerateLastTxContext() { ctx := ti.GetTransactionContext() ti.LastTransaction = &ctx } // ClearLastTxContext clear context of last transaction for a task func (ti *TaskInfo) ClearLastTxContext() { ti.LastTransaction = nil } func (ti *TaskInfo) SetPodResourceDecision() error { if ti.NumaInfo == nil || len(ti.NumaInfo.ResMap) == 0 { return nil } klog.V(4).Infof("%v/%v resource decision: %v", ti.Namespace, ti.Name, ti.NumaInfo.ResMap) decision := PodResourceDecision{ NUMAResources: ti.NumaInfo.ResMap, } layout, err := json.Marshal(&decision) if err != nil { return err } metav1.SetMetaDataAnnotation(&ti.Pod.ObjectMeta, topologyDecisionAnnotation, string(layout[:])) return nil } func (ti *TaskInfo) UnsetPodResourceDecision() { delete(ti.Pod.Annotations, topologyDecisionAnnotation) } // Clone is used for cloning a task func (ti *TaskInfo) Clone() *TaskInfo { return &TaskInfo{ UID: ti.UID, Job: ti.Job, Name: ti.Name, Namespace: ti.Namespace, Priority: ti.Priority, PodVolumes: ti.PodVolumes, Pod: ti.Pod, Resreq: ti.Resreq.Clone(), InitResreq: ti.InitResreq.Clone(), VolumeReady: ti.VolumeReady, Preemptable: ti.Preemptable, BestEffort: ti.BestEffort, RevocableZone: ti.RevocableZone, NumaInfo: ti.NumaInfo.Clone(), TransactionContext: TransactionContext{ NodeName: ti.NodeName, Status: ti.Status, }, LastTransaction: ti.LastTransaction.Clone(), } } func (ti *TaskInfo) GetTaskSpecKey() TaskID { if ti.Pod == nil { return "" } return getTaskID(ti.Pod) } // String returns the taskInfo details in a string func (ti TaskInfo) String() string { if ti.NumaInfo == nil { return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v"+ "resreq %v, preemptable %v, revocableZone %v", ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority, ti.Resreq, ti.Preemptable, ti.RevocableZone) } return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v"+ "resreq %v, preemptable %v, revocableZone %v, numaInfo %v", ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority, ti.Resreq, ti.Preemptable, ti.RevocableZone, *ti.NumaInfo) } // JobID is the type of JobInfo's ID. type JobID types.UID type tasksMap map[TaskID]*TaskInfo // NodeResourceMap stores resource in a node type NodeResourceMap map[string]*Resource // JobInfo will have all info of a Job type JobInfo struct { UID JobID Name string Namespace string Queue QueueID Priority int32 MinAvailable int32 WaitingTime *time.Duration JobFitErrors string NodesFitErrors map[TaskID]*FitErrors // All tasks of the Job. TaskStatusIndex map[TaskStatus]tasksMap Tasks tasksMap TaskMinAvailable map[TaskID]int32 TaskMinAvailableTotal int32 Allocated *Resource TotalRequest *Resource CreationTimestamp metav1.Time PodGroup *PodGroup ScheduleStartTimestamp metav1.Time Preemptable bool // RevocableZone support set volcano.sh/revocable-zone annotaion or label for pod/podgroup // we only support empty value or * value for this version and we will support specify revocable zone name for futrue release // empty value means workload can not use revocable node // * value means workload can use all the revocable node for during node active revocable time. RevocableZone string Budget *DisruptionBudget } // NewJobInfo creates a new jobInfo for set of tasks func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo { job := &JobInfo{ UID: uid, MinAvailable: 0, NodesFitErrors: make(map[TaskID]*FitErrors), Allocated: EmptyResource(), TotalRequest: EmptyResource(), TaskStatusIndex: map[TaskStatus]tasksMap{}, Tasks: tasksMap{}, TaskMinAvailable: map[TaskID]int32{}, } for _, task := range tasks { job.AddTaskInfo(task) } return job } // UnsetPodGroup removes podGroup details from a job func (ji *JobInfo) UnsetPodGroup() { ji.PodGroup = nil } // SetPodGroup sets podGroup details to a job func (ji *JobInfo) SetPodGroup(pg *PodGroup) { ji.Name = pg.Name ji.Namespace = pg.Namespace ji.MinAvailable = pg.Spec.MinMember ji.Queue = QueueID(pg.Spec.Queue) ji.CreationTimestamp = pg.GetCreationTimestamp() var err error ji.WaitingTime, err = ji.extractWaitingTime(pg) if err != nil { klog.Warningf("Error occurs in parsing waiting time for job <%s/%s>, err: %s.", pg.Namespace, pg.Name, err.Error()) ji.WaitingTime = nil } ji.Preemptable = ji.extractPreemptable(pg) ji.RevocableZone = ji.extractRevocableZone(pg) ji.Budget = ji.extractBudget(pg) taskMinAvailableTotal := int32(0) for task, member := range pg.Spec.MinTaskMember { ji.TaskMinAvailable[TaskID(task)] = member taskMinAvailableTotal += member } ji.TaskMinAvailableTotal = taskMinAvailableTotal ji.PodGroup = pg } // extractWaitingTime reads sla waiting time for job from podgroup annotations // TODO: should also read from given field in volcano job spec func (ji *JobInfo) extractWaitingTime(pg *PodGroup) (*time.Duration, error) { if _, exist := pg.Annotations[JobWaitingTime]; !exist { return nil, nil } jobWaitingTime, err := time.ParseDuration(pg.Annotations[JobWaitingTime]) if err != nil { return nil, err } if jobWaitingTime <= 0 { return nil, errors.New("invalid sla waiting time") } return &jobWaitingTime, nil } // extractPreemptable return volcano.sh/preemptable value for job func (ji *JobInfo) extractPreemptable(pg *PodGroup) bool { // check annotaion first if len(pg.Annotations) > 0 { if value, found := pg.Annotations[v1beta1.PodPreemptable]; found { b, err := strconv.ParseBool(value) if err != nil { klog.Warningf("invalid %s=%s", v1beta1.PodPreemptable, value) return false } return b } } // it annotation does not exit, check label if len(pg.Labels) > 0 { if value, found := pg.Labels[v1beta1.PodPreemptable]; found { b, err := strconv.ParseBool(value) if err != nil { klog.Warningf("invalid %s=%s", v1beta1.PodPreemptable, value) return false } return b } } return false } // extractRevocableZone return volcano.sh/revocable-zone value for pod/podgroup func (ji *JobInfo) extractRevocableZone(pg *PodGroup) string { // check annotation first if len(pg.Annotations) > 0 { if value, found := pg.Annotations[v1beta1.RevocableZone]; found { if value != "*" { return "" } return value } if value, found := pg.Annotations[v1beta1.PodPreemptable]; found { if b, err := strconv.ParseBool(value); err == nil && b { return "*" } } } return "" } // extractBudget return budget value for job func (ji *JobInfo) extractBudget(pg *PodGroup) *DisruptionBudget { if len(pg.Annotations) > 0 { if value, found := pg.Annotations[v1beta1.JDBMinAvailable]; found { return NewDisruptionBudget(value, "") } else if value, found := pg.Annotations[v1beta1.JDBMaxUnavailable]; found { return NewDisruptionBudget("", value) } } return NewDisruptionBudget("", "") } // GetMinResources return the min resources of podgroup. func (ji *JobInfo) GetMinResources() *Resource { if ji.PodGroup.Spec.MinResources == nil { return EmptyResource() } return NewResource(*ji.PodGroup.Spec.MinResources) } func (ji *JobInfo) addTaskIndex(ti *TaskInfo) { if _, found := ji.TaskStatusIndex[ti.Status]; !found { ji.TaskStatusIndex[ti.Status] = tasksMap{} } ji.TaskStatusIndex[ti.Status][ti.UID] = ti } // AddTaskInfo is used to add a task to a job func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) { ji.Tasks[ti.UID] = ti ji.addTaskIndex(ti) ji.TotalRequest.Add(ti.Resreq) if AllocatedStatus(ti.Status) { ji.Allocated.Add(ti.Resreq) } } // UpdateTaskStatus is used to update task's status in a job. // If error occurs both task and job are guaranteed to be in the original state. func (ji *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error { if err := validateStatusUpdate(task.Status, status); err != nil { return err } // First remove the task (if exist) from the task list. if _, found := ji.Tasks[task.UID]; found { if err := ji.DeleteTaskInfo(task); err != nil { return err } } // Update task's status to the target status once task addition is guaranteed to succeed. task.Status = status ji.AddTaskInfo(task) return nil } func (ji *JobInfo) deleteTaskIndex(ti *TaskInfo) { if tasks, found := ji.TaskStatusIndex[ti.Status]; found { delete(tasks, ti.UID) if len(tasks) == 0 { delete(ji.TaskStatusIndex, ti.Status) } } } // DeleteTaskInfo is used to delete a task from a job func (ji *JobInfo) DeleteTaskInfo(ti *TaskInfo) error { if task, found := ji.Tasks[ti.UID]; found { ji.TotalRequest.Sub(task.Resreq) if AllocatedStatus(task.Status) { ji.Allocated.Sub(task.Resreq) } delete(ji.Tasks, task.UID) ji.deleteTaskIndex(task) return nil } return fmt.Errorf("failed to find task <%v/%v> in job <%v/%v>", ti.Namespace, ti.Name, ji.Namespace, ji.Name) } // Clone is used to clone a jobInfo object func (ji *JobInfo) Clone() *JobInfo { info := &JobInfo{ UID: ji.UID, Name: ji.Name, Namespace: ji.Namespace, Queue: ji.Queue, Priority: ji.Priority, MinAvailable: ji.MinAvailable, WaitingTime: ji.WaitingTime, JobFitErrors: ji.JobFitErrors, NodesFitErrors: make(map[TaskID]*FitErrors), Allocated: EmptyResource(), TotalRequest: EmptyResource(), PodGroup: ji.PodGroup.Clone(), TaskStatusIndex: map[TaskStatus]tasksMap{}, TaskMinAvailable: ji.TaskMinAvailable, TaskMinAvailableTotal: ji.TaskMinAvailableTotal, Tasks: tasksMap{}, Preemptable: ji.Preemptable, RevocableZone: ji.RevocableZone, Budget: ji.Budget.Clone(), } ji.CreationTimestamp.DeepCopyInto(&info.CreationTimestamp) for _, task := range ji.Tasks { info.AddTaskInfo(task.Clone()) } return info } // String returns a jobInfo object in string format func (ji JobInfo) String() string { res := "" i := 0 for _, task := range ji.Tasks { res += fmt.Sprintf("\n\t %d: %v", i, task) i++ } return fmt.Sprintf("Job (%v): namespace %v (%v), name %v, minAvailable %d, podGroup %+v, preemptable %+v, revocableZone %+v, minAvailable %+v, maxAvailable %+v", ji.UID, ji.Namespace, ji.Queue, ji.Name, ji.MinAvailable, ji.PodGroup, ji.Preemptable, ji.RevocableZone, ji.Budget.MinAvailable, ji.Budget.MaxUnavilable) + res } // FitError returns detailed information on why a job's task failed to fit on // each available node func (ji *JobInfo) FitError() string { sortReasonsHistogram := func(reasons map[string]int) []string { reasonStrings := []string{} for k, v := range reasons { reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k)) } sort.Strings(reasonStrings) return reasonStrings } // Stat histogram for all tasks of the job reasons := make(map[string]int) for status, taskMap := range ji.TaskStatusIndex { reasons[status.String()] += len(taskMap) } reasons["minAvailable"] = int(ji.MinAvailable) reasonMsg := fmt.Sprintf("%v, %v", scheduling.PodGroupNotReady, strings.Join(sortReasonsHistogram(reasons), ", ")) // Stat histogram for pending tasks only reasons = make(map[string]int) for uid := range ji.TaskStatusIndex[Pending] { reason, _ := ji.TaskSchedulingReason(uid) reasons[reason]++ } if len(reasons) > 0 { reasonMsg += "; " + fmt.Sprintf("%s: %s", Pending.String(), strings.Join(sortReasonsHistogram(reasons), ", ")) } return reasonMsg } // TaskSchedulingReason get detailed reason and message of the given task // It returns detailed reason and message for tasks based on last scheduling transaction. func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason string, msg string) { taskInfo, exists := ji.Tasks[tid] if !exists { return "", "" } // Get detailed scheduling reason based on LastTransaction ctx := taskInfo.GetTransactionContext() if taskInfo.LastTransaction != nil { ctx = *taskInfo.LastTransaction } msg = ji.JobFitErrors switch status := ctx.Status; status { case Allocated, Pipelined: // Pod is schedulable msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s", taskInfo.Namespace, taskInfo.Name, ctx.NodeName) if status == Pipelined { msg += " once resource is released" } return PodReasonSchedulable, msg case Pending: if fe := ji.NodesFitErrors[tid]; fe != nil { // Pod is not schedulable return PodReasonUnschedulable, fe.Error() } // Pod is not scheduled yet return PodReasonUnschedulable, msg default: return status.String(), msg } } // ReadyTaskNum returns the number of tasks that are ready or that is best-effort. func (ji *JobInfo) ReadyTaskNum() int32 { occupied := 0 occupied += len(ji.TaskStatusIndex[Bound]) occupied += len(ji.TaskStatusIndex[Binding]) occupied += len(ji.TaskStatusIndex[Running]) occupied += len(ji.TaskStatusIndex[Allocated]) occupied += len(ji.TaskStatusIndex[Succeeded]) if tasks, found := ji.TaskStatusIndex[Pending]; found { for _, task := range tasks { if task.BestEffort { occupied++ } } } return int32(occupied) } // WaitingTaskNum returns the number of tasks that are pipelined. func (ji *JobInfo) WaitingTaskNum() int32 { return int32(len(ji.TaskStatusIndex[Pipelined])) } // CheckTaskMinAvailable returns whether each task of job is valid. func (ji *JobInfo) CheckTaskMinAvailable() bool { // if job minAvailable is less than sumof(task minAvailable), skip this check if ji.MinAvailable < ji.TaskMinAvailableTotal { return true } actual := map[TaskID]int32{} for status, tasks := range ji.TaskStatusIndex { if AllocatedStatus(status) || status == Succeeded || status == Pipelined || status == Pending { for _, task := range tasks { actual[getTaskID(task.Pod)]++ } } } klog.V(4).Infof("job %s/%s actual: %+v, ji.TaskMinAvailable: %+v", ji.Name, ji.Namespace, actual, ji.TaskMinAvailable) for task, minAvailable := range ji.TaskMinAvailable { if act, ok := actual[task]; !ok || act < minAvailable { return false } } return true } // ValidTaskNum returns the number of tasks that are valid. func (ji *JobInfo) ValidTaskNum() int32 { occupied := 0 for status, tasks := range ji.TaskStatusIndex { if AllocatedStatus(status) || status == Succeeded || status == Pipelined || status == Pending { occupied += len(tasks) } } return int32(occupied) } // Ready returns whether job is ready for run func (ji *JobInfo) Ready() bool { occupied := ji.ReadyTaskNum() return occupied >= ji.MinAvailable } // IsPending returns whether job is in pending status func (ji *JobInfo) IsPending() bool { if ji.PodGroup == nil || ji.PodGroup.Status.Phase == scheduling.PodGroupPending || ji.PodGroup.Status.Phase == "" { return true } return false }