in pkg/scheduler/preemption/preemption.go [117:144]
func (p *Preemptor) IssuePreemptions(ctx context.Context, targets []*workload.Info, cq *cache.ClusterQueue) (int, error) {
log := ctrl.LoggerFrom(ctx)
errCh := routine.NewErrorChannel()
ctx, cancel := context.WithCancel(ctx)
var successfullyPreempted int64
defer cancel()
workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), func(i int) {
target := targets[i]
if !apimeta.IsStatusConditionTrue(target.Obj.Status.Conditions, kueue.WorkloadEvicted) {
err := p.applyPreemption(ctx, target.Obj)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}
origin := "ClusterQueue"
if cq.Name != target.ClusterQueue {
origin = "cohort"
}
log.V(3).Info("Preempted", "targetWorkload", klog.KObj(target.Obj))
p.recorder.Eventf(target.Obj, corev1.EventTypeNormal, "Preempted", "Preempted by another workload in the %s", origin)
} else {
log.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(target.Obj))
}
atomic.AddInt64(&successfullyPreempted, 1)
})
return int(successfullyPreempted), errCh.ReceiveError()
}