func NewControlledPodsIndexer()

in clusterloader2/pkg/measurement/util/controlled_pods_indexer.go [75:142]


// NewControlledPodsIndexer builds a ControlledPodsIndexer on top of the given
// pod and ReplicaSet informers.
//
// It registers the controllerUIDIndex index on the pod informer and attaches
// event handlers to both informers. ReplicaSets are mirrored into a dedicated
// indexer instead of being read from rsInformer, because removal of a deleted
// ReplicaSet is postponed until all of its pods are removed.
//
// An error is returned only when the index cannot be registered on the pod
// informer; in that case no handlers have been attached.
func NewControlledPodsIndexer(podsInformer coreinformers.PodInformer, rsInformer appsinformers.ReplicaSetInformer) (*ControlledPodsIndexer, error) {
	podInf := podsInformer.Informer()
	if err := podInf.AddIndexers(cache.Indexers{controllerUIDIndex: controllerUIDIndexFunc}); err != nil {
		return nil, fmt.Errorf("failed to register indexer: %w", err)
	}

	// Dedicated ReplicaSet storage: entries must outlive the informer's own
	// copy, since deletion is deferred until every controlled pod is gone.
	rsStore := cache.NewIndexer(deletionHandlingUIDKeyFunc, cache.Indexers{controllerUIDIndex: controllerUIDIndexFunc})
	rsInf := rsInformer.Informer()

	cpi := &ControlledPodsIndexer{
		podsIndexer:       podInf.GetIndexer(),
		podsSynced:        podInf.HasSynced,
		rsIndexer:         rsStore,
		rsSynced:          rsInf.HasSynced,
		rsPendingDeletion: make(map[types.UID]bool),
	}

	// A pod whose controller changed may have been the last pod of its
	// previous owner, so give the previous owner a chance to be cleaned up.
	onPodUpdate := func(oldObj, newObj interface{}) {
		prevOwner, _ := getControllerInfo(oldObj)
		if currOwner, _ := getControllerInfo(newObj); prevOwner == currOwner {
			return
		}

		cpi.lock.Lock()
		defer cpi.lock.Unlock()
		if err := cpi.clearRSDataIfPossibleLocked(prevOwner); err != nil {
			klog.Errorf("error while deleting %v: %v", prevOwner, err)
		}
	}

	// A removed pod may have been the last pod of its owner.
	onPodDelete := func(obj interface{}) {
		owner, _ := getControllerInfo(obj)

		cpi.lock.Lock()
		defer cpi.lock.Unlock()
		if err := cpi.clearRSDataIfPossibleLocked(owner); err != nil {
			klog.Errorf("error while deleting %v: %v", owner, err)
		}
	}

	// ReplicaSet additions and updates are mirrored straight into rsStore.
	onRSAdd := func(obj interface{}) {
		if err := rsStore.Add(obj); err != nil {
			klog.Errorf("error while adding %v: %v", obj, err)
		}
	}
	onRSUpdate := func(_, newObj interface{}) {
		if err := rsStore.Update(newObj); err != nil {
			klog.Errorf("error while updating %v: %v", newObj, err)
		}
	}

	// A deleted ReplicaSet is first marked as pending deletion; the actual
	// cleanup decision is left to clearRSDataIfPossibleLocked.
	onRSDelete := func(obj interface{}) {
		uid := getObjUID(obj)

		cpi.lock.Lock()
		defer cpi.lock.Unlock()
		cpi.rsPendingDeletion[uid] = true
		if err := cpi.clearRSDataIfPossibleLocked(uid); err != nil {
			klog.Errorf("error while deleting %v: %v", uid, err)
		}
	}

	podInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: onPodUpdate,
		DeleteFunc: onPodDelete,
	})
	rsInf.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onRSAdd,
		UpdateFunc: onRSUpdate,
		DeleteFunc: onRSDelete,
	})

	return cpi, nil
}