func()

in clusterloader2/pkg/measurement/common/loadbalancer_nodesync_latency.go [126:189]


// measureNodeSyncLatency runs one node-sync measurement round:
//
//  1. lists the services matched by s.selector and stores the ones of type
//     LoadBalancer in s.lbSvcMap;
//  2. starts the event informer returned by s.getEventInformer, which is
//     stopped when this function returns;
//  3. picks the first node accepted by isCandidateNode, remembers it in
//     s.excludedNodeName and calls labelNodeForLBs(true) to trigger a node
//     sync (labelNodeForLBs(false) is deferred as cleanup);
//  4. stamps phaseNodeSyncStart for every tracked service and polls every
//     5 seconds, for at most s.waitTimeout, until the tracker's count for
//     phaseNodeSyncComplete equals the number of tracked services.
//
// It returns the first error from listing services or nodes, from labeling
// the node, an error when no candidate node exists, or the result of the poll.
func (s *LoadBalancerNodeSyncMeasurement) measureNodeSyncLatency() error {
	ctx := context.Background()

	listOpts := metav1.ListOptions{}
	s.selector.ApplySelectors(&listOpts)
	services, err := s.client.CoreV1().Services(s.selector.Namespace).List(ctx, listOpts)
	if err != nil {
		return err
	}

	// Rebuild the map from scratch, keeping only LoadBalancer-type services.
	s.lbSvcMap = map[string]v1.Service{}
	for i := range services.Items {
		svc := &services.Items[i]
		if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
			continue
		}
		s.lbSvcMap[keyFunc(svc.Namespace, svc.Name)] = *svc
	}
	expected := len(s.lbSvcMap)

	// The event informer keeps track of nodeSync events while we wait below;
	// closing the channel on return stops it.
	informerStop := make(chan struct{})
	defer close(informerStop)

	informer := s.getEventInformer()
	go informer.Run(informerStop)

	// Trigger a node sync by picking one node and labeling it.
	nodes, err := s.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}

	for i := range nodes.Items {
		if !isCandidateNode(nodes.Items[i]) {
			continue
		}
		s.excludedNodeName = nodes.Items[i].Name
		break
	}

	if s.excludedNodeName == "" {
		return fmt.Errorf("failed to find a node candidate to trigger nodesync from node list: %v", nodes.Items)
	}

	// Registered before the labeling attempt so the cleanup call runs on
	// every return path from here on; its failure is only logged.
	defer func() {
		if cleanupErr := s.labelNodeForLBs(false); cleanupErr != nil {
			klog.Errorf("Failed to label node %v: %v", s.excludedNodeName, cleanupErr)
		}
	}()
	if err := s.labelNodeForLBs(true); err != nil {
		return err
	}

	// Every tracked service shares the same start timestamp, taken right
	// after the labeling call returned.
	startedAt := time.Now()
	for svcKey := range s.lbSvcMap {
		s.svcNodeSyncLatencyTracker.Set(svcKey, phaseNodeSyncStart, startedAt)
	}

	// allSynced reports whether every tracked service has reached
	// phaseNodeSyncComplete; it never returns an error itself.
	allSynced := func() (bool, error) {
		if s.svcNodeSyncLatencyTracker.Count(phaseNodeSyncComplete) != expected {
			klog.V(2).Infof("out of a total of %v LBs, %v LB type service has %q event", expected, s.svcNodeSyncLatencyTracker.Count(phaseNodeSyncComplete), nodeSyncEventReason)
			return false, nil
		}
		return true, nil
	}

	return wait.Poll(5*time.Second, s.waitTimeout, allSynced)
}