in pkg/scheduler/actions/preempt/preempt.go [193:272]
// preempt tries to make room for preemptor on one of the session's nodes by
// evicting tasks that are already placed there.
//
// Candidate nodes come from ssn.NodeList, are narrowed down by
// predicateHelper.PredicateNodes, scored by util.PrioritizeNodes and visited in
// the order produced by util.SortNodes. On each node, the tasks accepted by
// filter (every task when filter is nil) are cloned and handed to
// ssn.Preemptable; the returned victims are checked with util.ValidateVictims
// and then evicted through stmt one at a time until preemptor.InitResreq fits
// into node.FutureIdle(). As soon as a node can hold the preemptor, the
// preemptor is pipelined onto it via stmt and the search stops.
//
// The boolean result reports whether such a node was found. The error result
// is always nil; eviction and pipeline failures are only logged.
func preempt(
	ssn *framework.Session,
	stmt *framework.Statement,
	preemptor *api.TaskInfo,
	filter func(*api.TaskInfo) bool,
	predicateHelper util.PredicateHelper,
) (bool, error) {
	assigned := false

	allNodes := ssn.NodeList
	// The second result of PredicateNodes is intentionally discarded here.
	predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)
	nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

	selectedNodes := util.SortNodes(nodeScores)
	for _, node := range selectedNodes {
		klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
			preemptor.Namespace, preemptor.Name, node.Name)

		// Collect clones of the node's tasks that pass the optional filter;
		// a nil filter accepts every task.
		var preemptees []*api.TaskInfo
		for _, task := range node.Tasks {
			if filter == nil || filter(task) {
				preemptees = append(preemptees, task.Clone())
			}
		}

		victims := ssn.Preemptable(preemptor, preemptees)
		metrics.UpdatePreemptionVictimsCount(len(victims))

		if err := util.ValidateVictims(preemptor, node, victims); err != nil {
			klog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
			continue
		}

		// The queue comparator is the negation of ssn.TaskOrderFn, so victims
		// are popped in the reverse of the session's task order.
		victimsQueue := util.NewPriorityQueue(func(l, r interface{}) bool {
			return !ssn.TaskOrderFn(l, r)
		})
		for _, victim := range victims {
			victimsQueue.Push(victim)
		}

		// Preempt victims for tasks, pick lowest priority task first.
		preempted := api.EmptyResource()

		for !victimsQueue.Empty() {
			// If reclaimed enough resources, break loop to avoid Sub panic.
			if preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
				break
			}
			preemptee := victimsQueue.Pop().(*api.TaskInfo)
			klog.V(3).Infof("Try to preempt Task <%s/%s> for Task <%s/%s>",
				preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
			if err := stmt.Evict(preemptee, "preempt"); err != nil {
				klog.Errorf("Failed to preempt Task <%s/%s> for Task <%s/%s>: %v",
					preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name, err)
				// A failed eviction frees nothing; move on to the next victim.
				continue
			}
			preempted.Add(preemptee.Resreq)
		}

		metrics.RegisterPreemptionAttempts()
		klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.",
			preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)

		if preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
			if err := stmt.Pipeline(preemptor, node.Name); err != nil {
				// Include the cause so the ignored failure is still diagnosable.
				klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>: %v",
					preemptor.Namespace, preemptor.Name, node.Name, err)
			}

			// Ignore pipeline error, will be corrected in next scheduling loop.
			assigned = true
			break
		}
	}

	return assigned, nil
}