func()

in pkg/queue/manager.go [67:107]


// AddClusterQueue starts tracking cq in the manager and feeds it from the
// LocalQueues that already point at it.
//
// It returns errClusterQueueAlreadyExists when a ClusterQueue with the same
// name is already tracked, the error from newClusterQueue when the internal
// queue cannot be built, or a wrapped error when listing LocalQueues fails.
// The ClusterQueue (and its cohort membership, if any) is recorded before the
// LocalQueues are listed, so it stays registered when that listing fails.
//
// The manager lock is held for the whole call. Broadcast is invoked only when
// AddFromLocalQueue reported an addition for at least one LocalQueue or when
// queueAllInadmissibleWorkloadsInCohort returned true.
func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error {
	m.Lock()
	defer m.Unlock()

	name := cq.Name
	if _, exists := m.clusterQueues[name]; exists {
		return errClusterQueueAlreadyExists
	}

	impl, err := newClusterQueue(cq)
	if err != nil {
		return err
	}
	m.clusterQueues[name] = impl

	if cohortName := cq.Spec.Cohort; cohortName != "" {
		m.addCohort(cohortName, name)
	}

	// LocalQueues referencing this ClusterQueue may have been registered
	// before the ClusterQueue itself, so look them up now.
	var lqList kueue.LocalQueueList
	byClusterQueue := client.MatchingFields{utilindexer.QueueClusterQueueKey: name}
	if err := m.client.List(ctx, &lqList, byClusterQueue); err != nil {
		return fmt.Errorf("listing queues pointing to the cluster queue: %w", err)
	}

	anyAdded := false
	for i := range lqList.Items {
		lq := m.localQueues[Key(&lqList.Items[i])]
		if lq == nil {
			// The manager does not track this LocalQueue; nothing to pull in.
			continue
		}
		if impl.AddFromLocalQueue(lq) {
			anyAdded = true
		}
	}

	requeued := m.queueAllInadmissibleWorkloadsInCohort(ctx, impl)
	m.reportPendingWorkloads(name, impl)
	if anyAdded || requeued {
		m.Broadcast()
	}
	return nil
}