pkg/scheduler/plugins/tdm/tdm.go (261 lines of code) (raw):

/* Copyright 2021 The Volcano Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package tdm import ( "fmt" "strings" "time" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog" k8sFramework "k8s.io/kubernetes/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" tutil "volcano.sh/volcano/pkg/scheduler/plugins/util" "volcano.sh/volcano/pkg/scheduler/util" ) const ( // PluginName indicates name of volcano scheduler plugin. PluginName = "tdm" // revocableZoneLayout revocable zone layout revocableZoneLayout = "15:04" revocableZoneLabelPrefix = "tdm.revocable-zone." evictPeriodLabel = "tdm.evict.period" defaultPodEvictNum = 1 ) var lastEvictAt time.Time /* actions: "enqueue, reclaim, allocate, preempt" tiers: - plugins: - name: tdm arguments: tdm.revocable-zone.rz1: 10:00-21:00 tdm.revocable-zone.rz2: 12:00-14:00 tdm.evict.period: 1m */ type tdmPlugin struct { revocableZone map[string]string // evictPeriod // default 1m evictPeriod time.Duration } // New function returns prioritizePlugin object func New(args framework.Arguments) framework.Plugin { revocableZone := make(map[string]string) evictPeriod := time.Minute for k, v := range args { if strings.Contains(k, revocableZoneLabelPrefix) { revocableZone[strings.Replace(k, revocableZoneLabelPrefix, "", 1)] = v.(string) } } if period, ok := args[evictPeriodLabel]; ok { if d, err := time.ParseDuration(period.(string)); err == nil { evictPeriod = d } } return &tdmPlugin{revocableZone, evictPeriod} } func (tp *tdmPlugin) Name() string { return PluginName } func parseRevocableZone(rzRaw string) (start, end time.Time, err error) { rzValues := strings.Split(strings.TrimSpace(rzRaw), "-") if len(rzValues) != 2 { err = fmt.Errorf("revocable zone %v format error", rzRaw) return } t1, err := time.Parse(revocableZoneLayout, rzValues[0]) if err != nil { return } t2, err := time.Parse(revocableZoneLayout, rzValues[1]) if err != nil { return } now := time.Now() start = time.Date(now.Year(), now.Month(), now.Day(), t1.Hour(), t1.Minute(), 0, 0, now.Location()) if t1.After(t2) || t1.Equal(t2) { end = time.Date(now.Year(), now.Month(), now.Day()+1, t2.Hour(), t2.Minute(), 0, 0, now.Location()) } else { end = time.Date(now.Year(), now.Month(), now.Day(), t2.Hour(), t2.Minute(), 0, 0, now.Location()) } return } func (tp *tdmPlugin) availableRevocableZone(rz string) error { // rzRaw format 00:00-23:59 rzRaw, ok := tp.revocableZone[rz] if !ok { return fmt.Errorf("revocable zone %v not support", rz) } now := time.Now() start, end, err := parseRevocableZone(rzRaw) if err != nil { return err } if now.Unix() < start.Unix() || now.Unix() > end.Unix() { return fmt.Errorf("current time beyond revocable zone %v:%v", rz, rzRaw) } return nil } func (tp *tdmPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("Enter tdm plugin ...") if klog.V(4) { defer func() { klog.V(4).Infof("Leaving tdm plugin.") }() } // tdm plugin just handle revocable node predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { if node.RevocableZone == "" { return nil } if err := tp.availableRevocableZone(node.RevocableZone); err != nil { return fmt.Errorf("plugin %s predicates %w", tp.Name(), err) } klog.V(4).Infof("TDM node %v revocable zone %v:%v is active", node.Name, node.RevocableZone, tp.revocableZone[node.RevocableZone]) if len(task.RevocableZone) == 0 { msg := fmt.Sprintf("task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name) return fmt.Errorf("plugin %s predicates %s", tp.Name(), msg) } klog.V(4).Infof("TDM filter for Task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name) return nil } // tdm plugin just handle revocable node nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { score := 0.0 if node.RevocableZone == "" { return score, nil } if err := tp.availableRevocableZone(node.RevocableZone); err != nil { klog.V(4).Infof("TDM not available %s", err) return score, err } if len(task.RevocableZone) == 0 { klog.V(4).Infof("TDM task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name) return score, nil } score = float64(k8sFramework.MaxNodeScore) klog.V(4).Infof("TDM score for Task %s/%s on node %s is: %v", task.Namespace, task.Name, node.Name, score) return score, nil } preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) { // for the preemptable or can use revocablezone workload, they can not preempt other tasks. if preemptor.Preemptable || len(preemptor.RevocableZone) > 0 { klog.V(4).Infof("TDM task %s/%s is preemptable, do nothing skip", preemptor.Namespace, preemptor.Name) return nil, tutil.Reject } var victims []*api.TaskInfo tasksMap := make(map[api.JobID][]*api.TaskInfo) // find preemptable tasks which appear on none revocable node for _, task := range preemptees { if !task.Preemptable || task.Status != api.Running { continue } node, ok := ssn.Nodes[task.NodeName] if !ok { continue } if node.RevocableZone != "" { continue } tasksMap[task.Job] = append(tasksMap[task.Job], task) } for jobID, preemptableTasks := range tasksMap { if job, ok := ssn.Jobs[jobID]; ok { victims = append(victims, tp.maxVictims(job, preemptableTasks)...) } } klog.V(4).Infof("TDM victims are %+v", victims) return victims, tutil.Permit } victimsFn := func() []*api.TaskInfo { if lastEvictAt.Add(tp.evictPeriod).After(time.Now()) { klog.V(4).Infof("TDM next evict time at %v", lastEvictAt) return nil } klog.V(4).Infof("TDM start to find victims") // find preemptable task on timeout revocable zone node victims := make([]*api.TaskInfo, 0) for rz := range tp.revocableZone { if err := tp.availableRevocableZone(rz); err != nil { klog.V(4).Infof("TDM revocable zone %v disactive, %v", rz, err) // rz disactive, then evict preemptable tasks by job from the revocable node for jobID, preemtableTasks := range tp.revocableNodePreemptableTask(rz, ssn) { if job, ok := ssn.Jobs[jobID]; ok { victims = append(victims, tp.maxVictims(job, preemtableTasks)...) } } } } // need to consider concurrency? lastEvictAt = time.Now() klog.V(4).Infof("TDM got %v victims", len(victims)) return victims } jobOrderFn := func(l, r interface{}) int { lv := l.(*api.JobInfo) rv := r.(*api.JobInfo) if lv.Preemptable == rv.Preemptable { return 0 } if !lv.Preemptable { return -1 } return 1 } jobPipelinedFn := func(obj interface{}) int { jobInfo := obj.(*api.JobInfo) occupied := jobInfo.WaitingTaskNum() + jobInfo.ReadyTaskNum() if occupied >= jobInfo.MinAvailable { return tutil.Permit } return tutil.Reject } jobStarvingFn := func(obj interface{}) bool { jobInfo := obj.(*api.JobInfo) // allow none preemptable elastic job (deployment) preempt task if jobInfo.Preemptable { return false } return len(jobInfo.TaskStatusIndex[api.Pending]) > 0 } ssn.AddPredicateFn(tp.Name(), predicateFn) ssn.AddNodeOrderFn(tp.Name(), nodeOrderFn) ssn.AddPreemptableFn(tp.Name(), preemptableFn) ssn.AddVictimTasksFns(tp.Name(), victimsFn) ssn.AddJobOrderFn(tp.Name(), jobOrderFn) ssn.AddJobPipelinedFn(tp.Name(), jobPipelinedFn) ssn.AddJobStarvingFns(tp.Name(), jobStarvingFn) } func (tp *tdmPlugin) maxVictims(job *api.JobInfo, victims []*api.TaskInfo) []*api.TaskInfo { maxPodEvictNum := tp.getMaxPodEvictNum(job) targetNum := util.GetMinInt(maxPodEvictNum, len(victims)) klog.V(3).Infof("Job <%s/%s> max evict:%v, potential victims number:%v, max victims number:%v", job.Namespace, job.Name, maxPodEvictNum, len(victims), targetNum) return victims[:targetNum] } // get max pod evict number from job budget configure func (tp *tdmPlugin) getMaxPodEvictNum(job *api.JobInfo) int { jobRunningTaskNum := len(job.TaskStatusIndex[api.Running]) if job.Budget.MaxUnavilable != "" { maxUnavilable := tp.parseIntStr(job.Budget.MaxUnavilable, len(job.Tasks)) finalTaskNum := len(job.TaskStatusIndex[api.Succeeded]) + len(job.TaskStatusIndex[api.Failed]) realUnavilable := len(job.Tasks) - finalTaskNum - jobRunningTaskNum if realUnavilable >= maxUnavilable { return 0 } return maxUnavilable - realUnavilable } if job.Budget.MinAvailable != "" { minAvailable := tp.parseIntStr(job.Budget.MinAvailable, len(job.Tasks)) if jobRunningTaskNum >= minAvailable { return jobRunningTaskNum - minAvailable } } return defaultPodEvictNum } func (tp *tdmPlugin) parseIntStr(input string, taskNum int) int { resultValue := 0 tmp := intstr.Parse(input) switch tmp.Type { case intstr.Int: resultValue = tmp.IntValue() case intstr.String: if v, err := intstr.GetValueFromIntOrPercent(&tmp, taskNum, true); err == nil { resultValue = v } else { klog.Warningf("TDM get percent value err: %v", err) } } return resultValue } func (tp *tdmPlugin) revocableNodePreemptableTask(rz string, ssn *framework.Session) map[api.JobID][]*api.TaskInfo { tasksMap := make(map[api.JobID][]*api.TaskInfo) for _, node := range ssn.RevocableNodes { if node.RevocableZone != rz { continue } for _, task := range node.Tasks { if task.Preemptable { if task.Status == api.Running { tasksMap[task.Job] = append(tasksMap[task.Job], task) } } } } return tasksMap } func (tp *tdmPlugin) OnSessionClose(ssn *framework.Session) {}