in pkg/queue/manager.go [150:178]
func (m *Manager) AddLocalQueue(ctx context.Context, q *kueue.LocalQueue) error {
m.Lock()
defer m.Unlock()
key := Key(q)
if _, ok := m.localQueues[key]; ok {
return fmt.Errorf("queue %q already exists", q.Name)
}
qImpl := newLocalQueue(q)
m.localQueues[key] = qImpl
// Iterate through existing workloads, as workloads corresponding to this
// queue might have been added earlier.
var workloads kueue.WorkloadList
if err := m.client.List(ctx, &workloads, client.MatchingFields{utilindexer.WorkloadQueueKey: q.Name}, client.InNamespace(q.Namespace)); err != nil {
return fmt.Errorf("listing workloads that match the queue: %w", err)
}
for _, w := range workloads.Items {
w := w
if workload.IsAdmitted(&w) {
continue
}
qImpl.AddOrUpdate(workload.NewInfo(&w))
}
cq := m.clusterQueues[qImpl.ClusterQueue]
if cq != nil && cq.AddFromLocalQueue(qImpl) {
m.Broadcast()
}
return nil
}