in pkg/scheduler/scheduler.go [110:209]
// schedule runs a single scheduling cycle.
//
// It first blocks in s.queues.Heads until head workloads are available. An
// empty result is taken to mean the program is finishing, and the cycle
// returns without doing anything else. Otherwise it:
//
//  1. takes a cache snapshot and nominates the heads against it;
//  2. sorts the nominated entries with entryOrdering;
//  3. walks the entries in that order, skipping NoFit entries, limiting
//     admissions per cohort (see below), issuing preemptions for entries
//     that do not fully fit, and calling s.admit for entries that fit;
//  4. requeues (via s.requeueAndUpdate) every entry whose final status is
//     not assumed, and records an admission-attempt metric:
//     AdmissionResultSuccess if at least one entry is assumed,
//     AdmissionResultInadmissible otherwise.
//
// Per-cohort rule: once an entry of a cohort has been processed in this
// cycle, a later entry of the same cohort is skipped if it borrows or if
// the cohort already has borrowing queues. Non-borrowing entries in a cohort
// without borrowing queues are not skipped.
//
// Before admitting a fitting entry, the cycle may block in
// s.cache.WaitForPodsReady when not all admitted workloads are PodsReady.
func (s *Scheduler) schedule(ctx context.Context) {
	log := ctrl.LoggerFrom(ctx)
	// 1. Get the heads from the queues, including their desired clusterQueue.
	// This operation blocks while the queues are empty.
	headWorkloads := s.queues.Heads(ctx)
	// No elements means the program is finishing.
	if len(headWorkloads) == 0 {
		return
	}
	// The duration reported by the metric at the end is measured from here,
	// so time spent blocked in Heads is not included.
	startTime := time.Now()
	// 2. Take a snapshot of the cache.
	// The snapshot is taken once per cycle. The cohort/borrowing checks in
	// step 5 read from it, not from a fresh snapshot.
	snapshot := s.cache.Snapshot()
	// 3. Calculate requirements (resource flavors, borrowing) for admitting workloads.
	entries := s.nominate(ctx, headWorkloads, snapshot)
	// 4. Sort entries based on borrowing and timestamps.
	sort.Sort(entryOrdering(entries))
	// 5. Admit entries, ensuring that no more than one workload gets
	// admitted by a cohort (if borrowing).
	// This is because there can be other workloads deeper in a clusterQueue whose
	// head got admitted that should be scheduled in the cohort before the heads
	// of other clusterQueues.
	// usedCohorts holds the names of cohorts that already had an entry
	// processed in this cycle, whether or not that entry was admitted.
	usedCohorts := sets.New[string]()
	for i := range entries {
		// Use a pointer so that status, message and requeue-reason updates
		// made here are visible to the requeue loop in step 6.
		e := &entries[i]
		// Entries that do not fit at all are neither admitted nor used to
		// trigger preemption here; they are handled by the requeue step below.
		if e.assignment.RepresentativeMode() == flavorassigner.NoFit {
			continue
		}
		// NOTE(review): there is no presence check on this lookup. If
		// e.ClusterQueue were missing from the snapshot, cq would presumably be
		// nil and the dereference below would panic. Presumably nominate only
		// returns entries for ClusterQueues in the snapshot — confirm.
		cq := snapshot.ClusterQueues[e.ClusterQueue]
		if cq.Cohort != nil {
			// Having more than one workloads from the same cohort admitted in the same scheduling cycle can lead
			// to over admission if:
			// 1. One of the workloads is borrowing, since during the nomination the usage of the other workloads
			// evaluated in the same cycle is not taken into account.
			// 2. An already admitted workload from a different cluster queue is borrowing, since all workloads
			// evaluated in the current cycle will compete for the resources that are not borrowed.
			if usedCohorts.Has(cq.Cohort.Name) && (e.assignment.Borrows() || cq.Cohort.HasBorrowingQueues()) {
				e.status = skipped
				e.inadmissibleMsg = "other workloads in the cohort were prioritized"
				continue
			}
			// Even if there was a failure, we shouldn't admit other workloads to this
			// cohort. The cohort is marked before the Fit check below, so an
			// entry that only issues preemptions still claims its cohort for
			// this cycle.
			usedCohorts.Insert(cq.Cohort.Name)
		}
		// Per-entry logger and context, shadowing the outer ones for the rest of
		// this iteration, so everything below logs with workload/clusterQueue keys.
		log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
		ctx := ctrl.LoggerInto(ctx, log)
		// Any mode other than Fit (NoFit was filtered out above) is not
		// admitted in this cycle. At most, preemptions are issued for the
		// targets computed during nomination, and the entry is left for the
		// requeue step.
		if e.assignment.RepresentativeMode() != flavorassigner.Fit {
			if len(e.preemptionTargets) != 0 {
				preempted, err := s.preemptor.IssuePreemptions(ctx, e.preemptionTargets, cq)
				if err != nil {
					// Only logged; the preempted count is still honored below.
					log.Error(err, "Failed to preempt workloads")
				}
				if preempted != 0 {
					// Append to the nomination message and tag the requeue
					// reason. Presumably requeueAndUpdate (not visible here)
					// uses requeueReason.
					e.inadmissibleMsg += fmt.Sprintf(". Pending the preemption of %d workload(s)", preempted)
					e.requeueReason = queue.RequeueReasonPendingPreemption
				}
			} else {
				log.V(2).Info("Workload requires preemption, but there are no candidate workloads allowed for preemption", "preemptionReclaimWithinCohort", cq.Preemption.ReclaimWithinCohort, "preemptionWithinClusterQueue", cq.Preemption.WithinClusterQueue)
			}
			continue
		}
		// This check runs for every fitting entry, immediately before it is
		// admitted.
		if !s.cache.PodsReadyForAllAdmittedWorkloads(ctx) {
			log.V(5).Info("Waiting for all admitted workloads to be in the PodsReady condition")
			// When WaitForPodsReady is enabled with BlockAdmission, admission is
			// blocked until all currently admitted workloads are in the
			// PodsReady condition. Presumably PodsReadyForAllAdmittedWorkloads
			// reports true when that feature is off — confirm.
			// Before waiting, record a "Waiting" condition on this workload and
			// persist it. A failure to persist is only logged.
			workload.UnsetAdmissionWithCondition(e.Obj, "Waiting", "waiting for all admitted workloads to be in PodsReady condition")
			if err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true); err != nil {
				log.Error(err, "Could not update Workload status")
			}
			// Presumably blocks until the condition above holds (or ctx is
			// done). The remaining entries of this cycle wait with it.
			s.cache.WaitForPodsReady(ctx)
			log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
		}
		// Mark the entry nominated before admitting it. Presumably a successful
		// admit moves it to assumed (not visible here), which is what step 6
		// checks. On failure, only the message is overwritten.
		e.status = nominated
		if err := s.admit(ctx, e); err != nil {
			e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
		}
	}
	// 6. Requeue the heads that were not scheduled.
	// log and ctx here are the outer (cycle-level) ones; the per-entry
	// shadows above are scoped to the first loop.
	result := metrics.AdmissionResultInadmissible
	for _, e := range entries {
		log.V(3).Info("Workload evaluated for admission",
			"workload", klog.KObj(e.Obj),
			"clusterQueue", klog.KRef("", e.ClusterQueue),
			"status", e.status,
			"reason", e.inadmissibleMsg)
		if e.status != assumed {
			s.requeueAndUpdate(log, ctx, e)
		} else {
			// At least one workload was assumed this cycle.
			result = metrics.AdmissionResultSuccess
		}
	}
	metrics.AdmissionAttempt(result, time.Since(startTime))
}