in pkg/scheduler/preemption/preemption.go [226:275]
// findCandidates collects the workloads that may be preempted to make room
// for wl in cq, restricted to workloads for which workloadUsesResources
// reports true against resPerFlv.
//
// Candidates come from two sources, each gated by the preemption policy of cq:
//
//   - Workloads in cq itself, unless WithinClusterQueue is Never. Workloads
//     with a higher priority than wl are never candidates. Workloads with the
//     same priority are candidates only under the LowerOrNewerEqualPriority
//     policy, and only when wl's queue-order timestamp is before theirs.
//   - Workloads in the other ClusterQueues of the cohort, unless cq has no
//     cohort or ReclaimWithinCohort is Never. Only ClusterQueues for which
//     cqIsBorrowing reports true are considered. Unless the policy is Any,
//     only workloads with a strictly lower priority than wl are candidates.
//
// The returned slice is nil when there are no candidates. The cohort's member
// set is only read, never modified.
func findCandidates(wl *kueue.Workload, cq *cache.ClusterQueue, resPerFlv resourcesPerFlavor) []*workload.Info {
	var candidates []*workload.Info
	wlPriority := priority.Priority(wl)

	if cq.Preemption.WithinClusterQueue != kueue.PreemptionPolicyNever {
		considerSamePrio := cq.Preemption.WithinClusterQueue == kueue.PreemptionPolicyLowerOrNewerEqualPriority
		preemptorTS := workload.GetQueueOrderTimestamp(wl)

		for _, candidateWl := range cq.Workloads {
			candidatePriority := priority.Priority(candidateWl.Obj)
			if candidatePriority > wlPriority {
				continue
			}
			// Equal priority: only preempt when the policy allows it and the
			// preemptor is ordered before the candidate in the queue.
			if candidatePriority == wlPriority && !(considerSamePrio && preemptorTS.Before(workload.GetQueueOrderTimestamp(candidateWl.Obj))) {
				continue
			}
			if !workloadUsesResources(candidateWl, resPerFlv) {
				continue
			}
			candidates = append(candidates, candidateWl)
		}
	}

	if cq.Cohort != nil && cq.Preemption.ReclaimWithinCohort != kueue.PreemptionPolicyNever {
		// The policy does not change while iterating, so decide once.
		onlyLowerPrio := cq.Preemption.ReclaimWithinCohort != kueue.PreemptionPolicyAny

		for cohortCQ := range cq.Cohort.Members {
			// Skip the preemptor's own ClusterQueue by comparison instead of
			// deleting it from Members: the set is shared with the cohort, so
			// deleting would permanently drop cq from its cohort.
			if cohortCQ == cq {
				continue
			}
			if !cqIsBorrowing(cohortCQ, resPerFlv) {
				// Can't reclaim quota from ClusterQueues that are not borrowing.
				continue
			}
			for _, candidateWl := range cohortCQ.Workloads {
				if onlyLowerPrio && priority.Priority(candidateWl.Obj) >= wlPriority {
					continue
				}
				if !workloadUsesResources(candidateWl, resPerFlv) {
					continue
				}
				candidates = append(candidates, candidateWl)
			}
		}
	}

	return candidates
}