in pkg/scheduler/actions/enqueue/enqueue.go [43:103]
// Execute runs one pass of the enqueue action over the session.
//
// It first gathers every queue referenced by a job in ssn.Jobs (ordered by
// ssn.QueueOrderFn) and, per queue, the jobs for which IsPending reports true
// (ordered by ssn.JobOrderFn). It then repeatedly pops a queue, pops one
// pending job from it, and admits that job when its PodGroup declares no
// MinResources or ssn.JobEnqueueable accepts it. Admitting means calling
// ssn.JobEnqueued and setting the PodGroup phase to PodGroupInqueue. A job
// that is not admitted is dropped from the pending set for this pass. A queue
// keeps being revisited until it has no pending jobs left.
func (enqueue *Action) Execute(ssn *framework.Session) {
	klog.V(3).Infof("Enter Enqueue ...")
	defer klog.V(3).Infof("Leaving Enqueue ...")

	queues := util.NewPriorityQueue(ssn.QueueOrderFn)
	queueMap := map[api.QueueID]*api.QueueInfo{}
	jobsMap := map[api.QueueID]*util.PriorityQueue{}

	// Phase 1: index the queues in use and bucket pending jobs by queue.
	for _, job := range ssn.Jobs {
		// Stamp the first time the job is seen by this action.
		if job.ScheduleStartTimestamp.IsZero() {
			ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{Time: time.Now()}
		}

		owner, known := ssn.Queues[job.Queue]
		if !known {
			klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", job.Queue, job.Namespace, job.Name)
			continue
		}

		// Register each queue once, no matter how many jobs point at it.
		if _, seen := queueMap[owner.UID]; !seen {
			klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>", owner.Name, job.Namespace, job.Name)
			queueMap[owner.UID] = owner
			queues.Push(owner)
		}

		if !job.IsPending() {
			continue
		}

		// Lazily create the per-queue job bucket on first pending job.
		bucket, ok := jobsMap[job.Queue]
		if !ok {
			bucket = util.NewPriorityQueue(ssn.JobOrderFn)
			jobsMap[job.Queue] = bucket
		}
		klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
		bucket.Push(job)
	}

	klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))

	// Phase 2: take one job per queue visit until every queue is drained.
	for !queues.Empty() {
		current := queues.Pop().(*api.QueueInfo)

		// A queue with nothing pending is not pushed back, which is what
		// eventually empties the outer priority queue.
		bucket, ok := jobsMap[current.UID]
		if !ok || bucket.Empty() {
			continue
		}

		candidate := bucket.Pop().(*api.JobInfo)

		// Jobs without MinResources are admitted without consulting
		// JobEnqueueable; the short-circuit order matters.
		admit := candidate.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(candidate)
		if admit {
			ssn.JobEnqueued(candidate)
			candidate.PodGroup.Status.Phase = scheduling.PodGroupInqueue
			ssn.Jobs[candidate.UID] = candidate
		}

		// Put the queue back so its remaining pending jobs get a turn.
		queues.Push(current)
	}
}