func()

in pkg/scheduler/plugins/tdm/tdm.go [139:301]


// OnSessionOpen registers the tdm plugin callbacks on ssn: a predicate, a node
// order (scoring) function, a preemptable filter, a victim-task finder, a job
// order comparator, a job-pipelined check and a job-starving check. All of them
// are registered under tp.Name().
func (tp *tdmPlugin) OnSessionOpen(ssn *framework.Session) {
	klog.V(4).Infof("Enter tdm plugin ...")
	// The deferred call is already gated by the verbosity level, so no extra
	// `if klog.V(4)` guard is needed (and klog.V is not used as a boolean).
	defer klog.V(4).Infof("Leaving tdm plugin.")

	// predicateFn only restricts revocable nodes: a node without a revocable
	// zone always passes. For a revocable node the zone must be reported as
	// available by tp.availableRevocableZone and the task itself must carry a
	// revocable zone.
	predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
		if node.RevocableZone == "" {
			return nil
		}

		if err := tp.availableRevocableZone(node.RevocableZone); err != nil {
			return fmt.Errorf("plugin %s predicates %w", tp.Name(), err)
		}

		klog.V(4).Infof("TDM node %v revocable zone %v:%v is active", node.Name, node.RevocableZone, tp.revocableZone[node.RevocableZone])

		if len(task.RevocableZone) == 0 {
			return fmt.Errorf("plugin %s predicates task %s/%s is not allow to dispatch to revocable node %s",
				tp.Name(), task.Namespace, task.Name, node.Name)
		}

		klog.V(4).Infof("TDM filter for Task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name)
		return nil
	}

	// nodeOrderFn only scores revocable nodes: it returns
	// k8sFramework.MaxNodeScore when the node's zone is available and the task
	// carries a revocable zone, and 0 otherwise. An unavailable zone is
	// returned to the caller as an error.
	nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
		score := 0.0

		if node.RevocableZone == "" {
			return score, nil
		}

		if err := tp.availableRevocableZone(node.RevocableZone); err != nil {
			klog.V(4).Infof("TDM not available %s", err)
			return score, err
		}

		if len(task.RevocableZone) == 0 {
			klog.V(4).Infof("TDM task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name)
			return score, nil
		}

		score = float64(k8sFramework.MaxNodeScore)

		klog.V(4).Infof("TDM score for Task %s/%s on node %s is: %v", task.Namespace, task.Name, node.Name, score)
		return score, nil
	}

	// preemptableFn selects victims for preemptor among preemptees. It returns
	// (nil, tutil.Reject) when the preemptor is itself preemptable or carries a
	// revocable zone; otherwise it returns the chosen victims with tutil.Permit.
	preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) ([]*api.TaskInfo, int) {
		// Preemptable workloads, and workloads that may use a revocable zone,
		// are not allowed to preempt other tasks.
		if preemptor.Preemptable || len(preemptor.RevocableZone) > 0 {
			klog.V(4).Infof("TDM task %s/%s is preemptable, do nothing skip", preemptor.Namespace, preemptor.Name)
			return nil, tutil.Reject
		}

		var victims []*api.TaskInfo
		tasksMap := make(map[api.JobID][]*api.TaskInfo)

		// Group by job the running, preemptable tasks that sit on
		// non-revocable nodes known to the session.
		for _, task := range preemptees {
			if !task.Preemptable || task.Status != api.Running {
				continue
			}

			node, ok := ssn.Nodes[task.NodeName]
			if !ok {
				continue
			}

			if node.RevocableZone != "" {
				continue
			}

			tasksMap[task.Job] = append(tasksMap[task.Job], task)
		}

		// tp.maxVictims decides how many of each job's candidates are taken;
		// jobs missing from the session are skipped.
		for jobID, preemptableTasks := range tasksMap {
			if job, ok := ssn.Jobs[jobID]; ok {
				victims = append(victims, tp.maxVictims(job, preemptableTasks)...)
			}
		}

		klog.V(4).Infof("TDM victims are %+v", victims)

		return victims, tutil.Permit
	}

	// victimsFn returns the tasks to evict from revocable zones that are no
	// longer available. It returns nil while less than tp.evictPeriod has
	// elapsed since lastEvictAt.
	victimsFn := func() []*api.TaskInfo {
		// Log the time the next eviction becomes possible, not the time of the
		// previous one.
		nextEvictAt := lastEvictAt.Add(tp.evictPeriod)
		if nextEvictAt.After(time.Now()) {
			klog.V(4).Infof("TDM next evict time at %v", nextEvictAt)
			return nil
		}

		klog.V(4).Infof("TDM start to find victims")

		// Collect preemptable tasks on the nodes of every revocable zone that
		// tp.availableRevocableZone reports an error for.
		victims := make([]*api.TaskInfo, 0)
		for rz := range tp.revocableZone {
			err := tp.availableRevocableZone(rz)
			if err == nil {
				continue
			}

			klog.V(4).Infof("TDM revocable zone %v disactive, %v", rz, err)
			// The zone is inactive: evict preemptable tasks job by job from
			// its revocable nodes.
			for jobID, preemptableTasks := range tp.revocableNodePreemptableTask(rz, ssn) {
				if job, ok := ssn.Jobs[jobID]; ok {
					victims = append(victims, tp.maxVictims(job, preemptableTasks)...)
				}
			}
		}

		// NOTE(review): lastEvictAt is read and written here without any
		// synchronization visible in this function — confirm whether
		// concurrent sessions are possible.
		lastEvictAt = time.Now()

		klog.V(4).Infof("TDM got %v victims", len(victims))

		return victims
	}

	// jobOrderFn compares two *api.JobInfo values by preemptability: 0 when
	// both have the same Preemptable flag, -1 when only the left job is
	// non-preemptable, 1 when only the right job is non-preemptable.
	jobOrderFn := func(l, r interface{}) int {
		lv := l.(*api.JobInfo)
		rv := r.(*api.JobInfo)

		if lv.Preemptable == rv.Preemptable {
			return 0
		}

		if !lv.Preemptable {
			return -1
		}

		return 1
	}

	// jobPipelinedFn permits a job once its waiting plus ready tasks reach
	// MinAvailable, and rejects it otherwise.
	jobPipelinedFn := func(obj interface{}) int {
		jobInfo := obj.(*api.JobInfo)
		occupied := jobInfo.WaitingTaskNum() + jobInfo.ReadyTaskNum()
		if occupied >= jobInfo.MinAvailable {
			return tutil.Permit
		}
		return tutil.Reject
	}

	// jobStarvingFn reports whether a job is starving: never for a preemptable
	// job, and for a non-preemptable job (e.g. an elastic deployment) only
	// while it still has pending tasks.
	jobStarvingFn := func(obj interface{}) bool {
		jobInfo := obj.(*api.JobInfo)
		if jobInfo.Preemptable {
			return false
		}
		return len(jobInfo.TaskStatusIndex[api.Pending]) > 0
	}

	ssn.AddPredicateFn(tp.Name(), predicateFn)
	ssn.AddNodeOrderFn(tp.Name(), nodeOrderFn)
	ssn.AddPreemptableFn(tp.Name(), preemptableFn)
	ssn.AddVictimTasksFns(tp.Name(), victimsFn)
	ssn.AddJobOrderFn(tp.Name(), jobOrderFn)
	ssn.AddJobPipelinedFn(tp.Name(), jobPipelinedFn)
	ssn.AddJobStarvingFns(tp.Name(), jobStarvingFn)
}