in pkg/scheduler/plugins/tdm/tdm.go [139:301]
// OnSessionOpen registers the tdm plugin's callbacks with ssn:
//   - predicate / node order: only nodes with a non-empty RevocableZone are
//     filtered. Such a node is accepted (and scored MaxNodeScore) only when its
//     revocable zone is currently available and the task declares a revocable
//     zone. Non-revocable nodes pass with a score of 0.
//   - preemptable: a preemptor that is itself preemptable, or that may use a
//     revocable zone, is rejected. Otherwise victims are chosen, per job via
//     maxVictims, among running preemptable tasks on non-revocable nodes.
//   - victim tasks: at most once per tp.evictPeriod, picks preemptable tasks
//     (per job via maxVictims) from revocable nodes whose zone is no longer
//     available.
//   - job order: non-preemptable jobs sort before preemptable ones.
//   - job pipelined: permitted once waiting + ready tasks reach MinAvailable.
//   - job starving: only non-preemptable jobs with pending tasks are starving.
func (tp *tdmPlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(4).Infof("Enter tdm plugin ...")
	if klog.V(4) {
		defer func() {
			klog.V(4).Infof("Leaving tdm plugin.")
		}()
	}

	// predicateFn only filters revocable nodes; non-revocable nodes always pass.
	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
		if node.RevocableZone == "" {
			return nil
		}

		if err := tp.availableRevocableZone(node.RevocableZone); err != nil {
			return fmt.Errorf("plugin %s predicates %w", tp.Name(), err)
		}

		klog.V(4).Infof("TDM node %v revocable zone %v:%v is active", node.Name, node.RevocableZone, tp.revocableZone[node.RevocableZone])

		// Only tasks that declare a revocable zone may be placed on a revocable node.
		if len(task.RevocableZone) == 0 {
			msg := fmt.Sprintf("task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name)
			return fmt.Errorf("plugin %s predicates %s", tp.Name(), msg)
		}

		klog.V(4).Infof("TDM filter for Task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name)
		return nil
	}

	// nodeOrderFn only scores revocable nodes. It gives MaxNodeScore to an
	// available revocable node when the task declares a revocable zone, and 0
	// in every other non-error case.
	nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
		score := 0.0

		if node.RevocableZone == "" {
			return score, nil
		}

		if err := tp.availableRevocableZone(node.RevocableZone); err != nil {
			klog.V(4).Infof("TDM not available %s", err)
			return score, err
		}

		if len(task.RevocableZone) == 0 {
			klog.V(4).Infof("TDM task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name)
			return score, nil
		}

		score = float64(k8sFramework.MaxNodeScore)
		klog.V(4).Infof("TDM score for Task %s/%s on node %s is: %v", task.Namespace, task.Name, node.Name, score)
		return score, nil
	}

	preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		// Workloads that are preemptable, or that may use a revocable zone,
		// are not allowed to preempt other tasks.
		if preemptor.Preemptable || len(preemptor.RevocableZone) > 0 {
			klog.V(4).Infof("TDM task %s/%s is preemptable or may use revocable zone, do nothing skip", preemptor.Namespace, preemptor.Name)
			return nil, tutil.Reject
		}

		var victims []*api.TaskInfo
		tasksMap := make(map[api.JobID][]*api.TaskInfo)

		// Collect running preemptable tasks that sit on non-revocable nodes,
		// grouped by job so maxVictims can bound the victims per job.
		for _, task := range preemptees {
			if !task.Preemptable || task.Status != api.Running {
				continue
			}

			node, ok := ssn.Nodes[task.NodeName]
			if !ok {
				continue
			}

			if node.RevocableZone != "" {
				continue
			}

			tasksMap[task.Job] = append(tasksMap[task.Job], task)
		}

		for jobID, preemptableTasks := range tasksMap {
			if job, ok := ssn.Jobs[jobID]; ok {
				victims = append(victims, tp.maxVictims(job, preemptableTasks)...)
			}
		}

		klog.V(4).Infof("TDM victims are %+v", victims)

		return victims, tutil.Permit
	}

	victimsFn := func() []*api.TaskInfo {
		// Eviction is throttled to at most once per tp.evictPeriod.
		// NOTE(review): lastEvictAt is declared outside this view (presumably
		// package-level); it is read and written here without synchronization.
		nextEvictAt := lastEvictAt.Add(tp.evictPeriod)
		if nextEvictAt.After(time.Now()) {
			klog.V(4).Infof("TDM next evict time at %v", nextEvictAt)
			return nil
		}

		klog.V(4).Infof("TDM start to find victims")

		// Find preemptable tasks on nodes whose revocable zone is no longer available.
		victims := make([]*api.TaskInfo, 0)
		for rz := range tp.revocableZone {
			if err := tp.availableRevocableZone(rz); err != nil {
				klog.V(4).Infof("TDM revocable zone %v inactive, %v", rz, err)
				// The zone is inactive, so evict preemptable tasks, per job, from its revocable nodes.
				for jobID, preemptableTasks := range tp.revocableNodePreemptableTask(rz, ssn) {
					if job, ok := ssn.Jobs[jobID]; ok {
						victims = append(victims, tp.maxVictims(job, preemptableTasks)...)
					}
				}
			}
		}

		lastEvictAt = time.Now()

		klog.V(4).Infof("TDM got %v victims", len(victims))

		return victims
	}

	// jobOrderFn orders non-preemptable jobs before preemptable ones.
	jobOrderFn := func(l, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)

		if lv.Preemptable == rv.Preemptable {
			return 0
		}

		if !lv.Preemptable {
			return -1
		}

		return 1
	}

	// jobPipelinedFn permits a job once its waiting and ready tasks cover MinAvailable.
	jobPipelinedFn := func(obj interface{}) int {
		jobInfo := obj.(*api.JobInfo)
		occupied := jobInfo.WaitingTaskNum() + jobInfo.ReadyTaskNum()
		if occupied >= jobInfo.MinAvailable {
			return tutil.Permit
		}
		return tutil.Reject
	}

	jobStarvingFn := func(obj interface{}) bool {
		jobInfo := obj.(*api.JobInfo)
		// Only non-preemptable (e.g. elastic/deployment) jobs may be starving and thus preempt tasks.
		if jobInfo.Preemptable {
			return false
		}

		return len(jobInfo.TaskStatusIndex[api.Pending]) > 0
	}

	ssn.AddPredicateFn(tp.Name(), predicateFn)
	ssn.AddNodeOrderFn(tp.Name(), nodeOrderFn)
	ssn.AddPreemptableFn(tp.Name(), preemptableFn)
	ssn.AddVictimTasksFns(tp.Name(), victimsFn)
	ssn.AddJobOrderFn(tp.Name(), jobOrderFn)
	ssn.AddJobPipelinedFn(tp.Name(), jobPipelinedFn)
	ssn.AddJobStarvingFns(tp.Name(), jobStarvingFn)
}