in pkg/scheduler/actions/preempt/preempt.go [40:189]
func (pmpt *Action) Execute(ssn *framework.Session) {
klog.V(3).Infof("Enter Preempt ...")
defer klog.V(3).Infof("Leaving Preempt ...")
preemptorsMap := map[api.QueueID]*util.PriorityQueue{}
preemptorTasks := map[api.JobID]*util.PriorityQueue{}
var underRequest []*api.JobInfo
queues := map[api.QueueID]*api.QueueInfo{}
for _, job := range ssn.Jobs {
if job.IsPending() {
continue
}
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip preemption, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}
if queue, found := ssn.Queues[job.Queue]; !found {
continue
} else if _, existed := queues[queue.UID]; !existed {
klog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
queue.Name, job.Namespace, job.Name)
queues[queue.UID] = queue
}
// check job if starting for more resources.
if ssn.JobStarving(job) {
if _, found := preemptorsMap[job.Queue]; !found {
preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
preemptorsMap[job.Queue].Push(job)
underRequest = append(underRequest, job)
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
preemptorTasks[job.UID].Push(task)
}
}
}
ph := util.NewPredicateHelper()
// Preemption between Jobs within Queue.
for _, queue := range queues {
for {
preemptors := preemptorsMap[queue.UID]
// If no preemptors, no preemption.
if preemptors == nil || preemptors.Empty() {
klog.V(4).Infof("No preemptors in Queue <%s>, break.", queue.Name)
break
}
preemptorJob := preemptors.Pop().(*api.JobInfo)
stmt := framework.NewStatement(ssn)
assigned := false
for {
// If job is not request more resource, then stop preempting.
if !ssn.JobStarving(preemptorJob) {
break
}
// If not preemptor tasks, next job.
if preemptorTasks[preemptorJob.UID].Empty() {
klog.V(3).Infof("No preemptor task in job <%s/%s>.",
preemptorJob.Namespace, preemptorJob.Name)
break
}
preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
if preempted, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if task.Status != api.Running {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
return false
}
job, found := ssn.Jobs[task.Job]
if !found {
return false
}
// Preempt other jobs within queue
return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job
}, ph); preempted {
assigned = true
}
}
// Commit changes only if job is pipelined, otherwise try next job.
if ssn.JobPipelined(preemptorJob) {
stmt.Commit()
} else {
stmt.Discard()
continue
}
if assigned {
preemptors.Push(preemptorJob)
}
}
// Preemption between Task within Job.
for _, job := range underRequest {
// Fix: preemptor numbers lose when in same job
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
preemptorTasks[job.UID].Push(task)
}
for {
if _, found := preemptorTasks[job.UID]; !found {
break
}
if preemptorTasks[job.UID].Empty() {
break
}
preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)
stmt := framework.NewStatement(ssn)
assigned, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if task.Status != api.Running {
return false
}
// Ignore task with empty resource request.
if task.Resreq.IsEmpty() {
return false
}
// Preempt tasks within job.
return preemptor.Job == task.Job
}, ph)
stmt.Commit()
// If no preemption, next job.
if !assigned {
break
}
}
}
}
// call victimTasksFn to evict tasks
victimTasks(ssn)
}