in clusterloader2/pkg/measurement/common/loadbalancer_nodesync_latency.go [126:189]
// measureNodeSyncLatency triggers a load balancer node sync and waits until
// every tracked LoadBalancer service has reached phaseNodeSyncComplete.
//
// Steps:
//  1. List the services matched by s.selector and store those of type
//     LoadBalancer in s.lbSvcMap.
//  2. Start the event informer; it is stopped when this function returns.
//  3. Pick the first node accepted by isCandidateNode and call
//     labelNodeForLBs(true) to trigger the node sync. The label change is
//     reverted with labelNodeForLBs(false) on return (failure to revert is
//     only logged).
//  4. Record the time right after labeling as phaseNodeSyncStart for every
//     tracked service.
//  5. Poll every 5 seconds, for up to s.waitTimeout, until the tracker's
//     phaseNodeSyncComplete count equals the number of tracked services.
//
// It returns an error if listing services or nodes fails, if no candidate
// node exists, if labeling the node fails, or if the poll does not complete
// within s.waitTimeout.
func (s *LoadBalancerNodeSyncMeasurement) measureNodeSyncLatency() error {
	ctx := context.Background()
	options := metav1.ListOptions{}
	s.selector.ApplySelectors(&options)
	svcList, err := s.client.CoreV1().Services(s.selector.Namespace).List(ctx, options)
	if err != nil {
		return fmt.Errorf("listing services in namespace %q: %w", s.selector.Namespace, err)
	}

	// Only LoadBalancer-typed services take part in the measurement.
	s.lbSvcMap = map[string]v1.Service{}
	for _, svc := range svcList.Items {
		if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
			s.lbSvcMap[keyFunc(svc.Namespace, svc.Name)] = svc
		}
	}
	totalLbSvc := len(s.lbSvcMap)

	// Use event informer to keep track of nodeSync events.
	// NOTE(review): presumably the informer's handlers record
	// phaseNodeSyncComplete in svcNodeSyncLatencyTracker; confirm in
	// getEventInformer.
	stopCh := make(chan struct{})
	defer close(stopCh)
	eventInformer := s.getEventInformer()
	go eventInformer.Run(stopCh)

	// Trigger node sync by picking a node and adding the exclude LB label.
	nodeList, err := s.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("listing nodes: %w", err)
	}
	for _, node := range nodeList.Items {
		if isCandidateNode(node) {
			s.excludedNodeName = node.Name
			break
		}
	}
	if s.excludedNodeName == "" {
		// Report only the count: formatting every Node object would produce
		// an enormous error message in large clusters.
		return fmt.Errorf("failed to find a node candidate to trigger nodesync among %d listed nodes", len(nodeList.Items))
	}

	// Always revert the label on exit. A scoped err keeps the cleanup from
	// overwriting the enclosing function's err variable.
	defer func() {
		if err := s.labelNodeForLBs(false); err != nil {
			klog.Errorf("Failed to label node %v: %v", s.excludedNodeName, err)
		}
	}()
	if err := s.labelNodeForLBs(true); err != nil {
		return fmt.Errorf("labeling node %v: %w", s.excludedNodeName, err)
	}

	// All tracked services share the same start timestamp, taken right after
	// the label was applied.
	now := time.Now()
	for key := range s.lbSvcMap {
		s.svcNodeSyncLatencyTracker.Set(key, phaseNodeSyncStart, now)
	}

	return wait.Poll(5*time.Second, s.waitTimeout, func() (bool, error) {
		// Read the counter once so the logged value is the one compared.
		completed := s.svcNodeSyncLatencyTracker.Count(phaseNodeSyncComplete)
		if completed == totalLbSvc {
			return true, nil
		}
		klog.V(2).Infof("out of a total of %v LBs, %v LB type service has %q event", totalLbSvc, completed, nodeSyncEventReason)
		return false, nil
	})
}