in pkg/controllers/queue/queue_controller.go [97:163]
func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error {
c.vcClient = opt.VolcanoClient
c.kubeClient = opt.KubeClient
factory := informerfactory.NewSharedInformerFactory(c.vcClient, 0)
queueInformer := factory.Scheduling().V1beta1().Queues()
pgInformer := factory.Scheduling().V1beta1().PodGroups()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: c.kubeClient.CoreV1().Events("")})
c.queueInformer = queueInformer
c.pgInformer = pgInformer
c.queueLister = queueInformer.Lister()
c.queueSynced = queueInformer.Informer().HasSynced
c.pgLister = pgInformer.Lister()
c.pgSynced = pgInformer.Informer().HasSynced
c.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
c.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
c.podGroups = make(map[string]map[string]struct{})
c.recorder = eventBroadcaster.NewRecorder(versionedscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})
c.maxRequeueNum = opt.MaxRequeueNum
if c.maxRequeueNum < 0 {
c.maxRequeueNum = -1
}
queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addQueue,
UpdateFunc: c.updateQueue,
DeleteFunc: c.deleteQueue,
})
pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodGroup,
UpdateFunc: c.updatePodGroup,
DeleteFunc: c.deletePodGroup,
})
c.cmdInformer = informerfactory.NewSharedInformerFactory(c.vcClient, 0).Bus().V1alpha1().Commands()
c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *busv1alpha1.Command:
return IsQueueReference(v.TargetObject)
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addCommand,
},
})
c.cmdLister = c.cmdInformer.Lister()
c.cmdSynced = c.cmdInformer.Informer().HasSynced
queuestate.SyncQueue = c.syncQueue
queuestate.OpenQueue = c.openQueue
queuestate.CloseQueue = c.closeQueue
c.syncHandler = c.handleQueue
c.syncCommandHandler = c.handleCommand
c.enqueueQueue = c.enqueue
return nil
}