in pkg/queue/manager.go [67:107]
// AddClusterQueue registers cq with the manager and attaches the contents of
// any already-known LocalQueues that point at it.
//
// The manager lock is held for the whole call. It returns
// errClusterQueueAlreadyExists when a ClusterQueue with the same name is
// already tracked, the error from newClusterQueue when the internal
// representation cannot be built, or a wrapped error when listing the
// LocalQueues fails.
//
// NOTE(review): the ClusterQueue (and its cohort membership) is recorded
// before the LocalQueues are listed, so when the List call fails the entry
// stays registered and a later call for the same name returns
// errClusterQueueAlreadyExists. Confirm callers are fine with that.
func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error {
	m.Lock()
	defer m.Unlock()

	if _, exists := m.clusterQueues[cq.Name]; exists {
		return errClusterQueueAlreadyExists
	}

	cqImpl, err := newClusterQueue(cq)
	if err != nil {
		return err
	}
	m.clusterQueues[cq.Name] = cqImpl
	if cohort := cq.Spec.Cohort; cohort != "" {
		m.addCohort(cohort, cq.Name)
	}

	// LocalQueues referencing this ClusterQueue may have been added before
	// the ClusterQueue itself, so look them up through the field index.
	var localQueues kueue.LocalQueueList
	selector := client.MatchingFields{utilindexer.QueueClusterQueueKey: cq.Name}
	if err := m.client.List(ctx, &localQueues, selector); err != nil {
		return fmt.Errorf("listing queues pointing to the cluster queue: %w", err)
	}

	addedWorkloads := false
	for i := range localQueues.Items {
		qImpl := m.localQueues[Key(&localQueues.Items[i])]
		if qImpl == nil {
			// The manager does not track this LocalQueue; nothing to attach.
			continue
		}
		if cqImpl.AddFromLocalQueue(qImpl) {
			addedWorkloads = true
		}
	}

	// Always requeue and report, regardless of whether the loop above
	// attached anything; only the broadcast is conditional.
	queued := m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl)
	m.reportPendingWorkloads(cq.Name, cqImpl)
	if queued || addedWorkloads {
		m.Broadcast()
	}
	return nil
}