// Excerpt (function): Scheduler.schedule, from pkg/scheduler/scheduler.go, lines 110-209.


// schedule runs one scheduling cycle.
//
// It first blocks until the queues hand out their head workloads. It then
// takes a snapshot of the cache, computes the flavor/borrowing requirements of
// every head and orders the resulting entries. Next it walks the entries in
// that order, and for each one it either admits the workload, issues
// preemptions on its behalf, or skips it. Finally, every entry that did not end
// up assumed is requeued, and one admission-attempt metric is recorded for the
// whole cycle.
func (s *Scheduler) schedule(ctx context.Context) {
	logger := ctrl.LoggerFrom(ctx)

	// Step 1: fetch the queue heads together with their target ClusterQueue.
	// This blocks while the queues are empty. An empty result means the
	// program is finishing.
	heads := s.queues.Heads(ctx)
	if len(heads) == 0 {
		return
	}
	cycleStart := time.Now()

	// Step 2: work against a point-in-time view of the cache.
	snap := s.cache.Snapshot()

	// Step 3: work out resource flavors and borrowing for each head.
	candidates := s.nominate(ctx, heads, snap)

	// Step 4: order the candidates by borrowing and timestamps.
	sort.Sort(entryOrdering(candidates))

	// Step 5: try to admit candidates in order. When borrowing is involved,
	// at most one workload per cohort is admitted in a cycle. Deeper
	// workloads of a ClusterQueue whose head was just admitted may deserve
	// the cohort's capacity before the heads of the other ClusterQueues.
	cohortsSeen := sets.New[string]()
	for idx := range candidates {
		cand := &candidates[idx]
		if cand.assignment.RepresentativeMode() == flavorassigner.NoFit {
			continue
		}

		cq := snap.ClusterQueues[cand.ClusterQueue]
		if cohort := cq.Cohort; cohort != nil {
			// Admitting several workloads of one cohort in the same cycle can
			// over-admit in two cases:
			//   a) this workload borrows. Nomination did not account for the
			//      usage of the other workloads evaluated in this cycle.
			//   b) some ClusterQueue in the cohort is already borrowing. All
			//      workloads of this cycle then compete for the capacity that
			//      is not borrowed.
			if cohortsSeen.Has(cohort.Name) {
				if cand.assignment.Borrows() || cohort.HasBorrowingQueues() {
					cand.status = skipped
					cand.inadmissibleMsg = "other workloads in the cohort were prioritized"
					continue
				}
			}
			// Record the cohort before the attempt below. Even if that
			// attempt fails, later candidates of this cohort must still go
			// through the check above.
			cohortsSeen.Insert(cohort.Name)
		}

		wlLog := logger.WithValues("workload", klog.KObj(cand.Obj), "clusterQueue", klog.KRef("", cand.ClusterQueue))
		wlCtx := ctrl.LoggerInto(ctx, wlLog)

		if cand.assignment.RepresentativeMode() != flavorassigner.Fit {
			// Anything other than Fit at this point means the workload needs
			// preemption. Issue it when there are targets, but do not admit in
			// this cycle. The entry stays non-assumed and is requeued in step 6.
			if len(cand.preemptionTargets) == 0 {
				wlLog.V(2).Info("Workload requires preemption, but there are no candidate workloads allowed for preemption", "preemptionReclaimWithinCohort", cq.Preemption.ReclaimWithinCohort, "preemptionWithinClusterQueue", cq.Preemption.WithinClusterQueue)
				continue
			}
			preempted, err := s.preemptor.IssuePreemptions(wlCtx, cand.preemptionTargets, cq)
			if err != nil {
				wlLog.Error(err, "Failed to preempt workloads")
			}
			if preempted != 0 {
				cand.inadmissibleMsg += fmt.Sprintf(". Pending the preemption of %d workload(s)", preempted)
				cand.requeueReason = queue.RequeueReasonPendingPreemption
			}
			continue
		}

		if !s.cache.PodsReadyForAllAdmittedWorkloads(wlCtx) {
			wlLog.V(5).Info("Waiting for all admitted workloads to be in the PodsReady condition")
			// When WaitForPodsReady is enabled with BlockAdmission, admission is
			// held until every admitted workload is PodsReady. Record the
			// waiting condition on this workload's status, then wait on the
			// cache before continuing with the admission below.
			workload.UnsetAdmissionWithCondition(cand.Obj, "Waiting", "waiting for all admitted workloads to be in PodsReady condition")
			if err := workload.ApplyAdmissionStatus(wlCtx, s.client, cand.Obj, true); err != nil {
				wlLog.Error(err, "Could not update Workload status")
			}
			s.cache.WaitForPodsReady(wlCtx)
			wlLog.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
		}

		cand.status = nominated
		if err := s.admit(wlCtx, cand); err != nil {
			cand.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
		}
	}

	// Step 6: requeue every entry that was not assumed. For metrics, the
	// cycle counts as a success when at least one entry was assumed.
	anyAssumed := false
	for _, cand := range candidates {
		logger.V(3).Info("Workload evaluated for admission",
			"workload", klog.KObj(cand.Obj),
			"clusterQueue", klog.KRef("", cand.ClusterQueue),
			"status", cand.status,
			"reason", cand.inadmissibleMsg)
		if cand.status == assumed {
			anyAssumed = true
			continue
		}
		s.requeueAndUpdate(logger, ctx, cand)
	}

	result := metrics.AdmissionResultInadmissible
	if anyAssumed {
		result = metrics.AdmissionResultSuccess
	}
	metrics.AdmissionAttempt(result, time.Since(cycleStart))
}