func()

in pkg/scheduler/preemption/preemption.go [117:144]


// IssuePreemptions processes every workload in targets through
// workqueue.ParallelizeUntil, using parallelPreemptions as the worker count.
//
// A target whose WorkloadEvicted condition is already true is only logged as
// "Preemption ongoing". Any other target is passed to p.applyPreemption and,
// when that succeeds, a "Preempted" event is recorded on it. The event text
// names "ClusterQueue" when the target's ClusterQueue equals cq.Name and
// "cohort" otherwise.
//
// The returned int counts the targets that were either already evicted or
// preempted without error; targets whose applyPreemption call failed are not
// counted. The returned error is whatever the error channel yields from
// ReceiveError.
func (p *Preemptor) IssuePreemptions(ctx context.Context, targets []*workload.Info, cq *cache.ClusterQueue) (int, error) {
	logger := ctrl.LoggerFrom(ctx)
	errs := routine.NewErrorChannel()

	ctx, stop := context.WithCancel(ctx)
	defer stop()

	// Updated from concurrent workers, hence the atomic increments below.
	var handled int64

	preemptOne := func(idx int) {
		wl := targets[idx]

		// Already evicted: nothing to apply, but it still counts as handled.
		if apimeta.IsStatusConditionTrue(wl.Obj.Status.Conditions, kueue.WorkloadEvicted) {
			logger.V(3).Info("Preemption ongoing", "targetWorkload", klog.KObj(wl.Obj))
			atomic.AddInt64(&handled, 1)
			return
		}

		if err := p.applyPreemption(ctx, wl.Obj); err != nil {
			// Hand over the error together with the cancel func of the shared
			// context; this target is deliberately left out of the count.
			errs.SendErrorWithCancel(err, stop)
			return
		}

		origin := "cohort"
		if cq.Name == wl.ClusterQueue {
			origin = "ClusterQueue"
		}
		logger.V(3).Info("Preempted", "targetWorkload", klog.KObj(wl.Obj))
		p.recorder.Eventf(wl.Obj, corev1.EventTypeNormal, "Preempted", "Preempted by another workload in the %s", origin)
		atomic.AddInt64(&handled, 1)
	}

	workqueue.ParallelizeUntil(ctx, parallelPreemptions, len(targets), preemptOne)

	// ParallelizeUntil has returned, so the counter is read without an atomic load,
	// as in the original code.
	return int(handled), errs.ReceiveError()
}