clusterloader2/pkg/measurement/common/service_creation_latency.go (306 lines of code) (raw):
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"context"
"fmt"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/perf-tests/clusterloader2/pkg/execservice"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/checker"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/workerqueue"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)
// Measurement identity and handler concurrency.
const (
	serviceCreationLatencyName    = "ServiceCreationLatency"
	serviceCreationLatencyWorkers = 10
)

// Timing knobs: the default waitForReady budget, the readiness polling
// cadence, the pause after a failed probe and the number of consecutive
// successful probes needed to declare a service reachable.
const (
	defaultServiceCreationLatencyTimeout = 10 * time.Minute
	defaultCheckInterval                 = 10 * time.Second
	pingBackoff                          = 1 * time.Second
	pingChecks                           = 10
)

// Phase names recorded per service; phaseName suffixes them with the service type.
const (
	creatingPhase     = "creating"
	ipAssigningPhase  = "ipAssigning"
	reachabilityPhase = "reachability"
)
// init makes the measurement available under serviceCreationLatencyName;
// a registration failure aborts the process.
func init() {
	err := measurement.Register(serviceCreationLatencyName, createServiceCreationLatencyMeasurement)
	if err == nil {
		return
	}
	klog.Fatalf("cant register service %v", err)
}
// createServiceCreationLatencyMeasurement is the factory passed to
// measurement.Register; it wires up the selector, the worker queue, the
// per-service phase timestamps and the ping checker registry.
func createServiceCreationLatencyMeasurement() measurement.Measurement {
	m := &serviceCreationLatencyMeasurement{}
	m.selector = util.NewObjectSelector()
	m.queue = workerqueue.NewWorkerQueue(serviceCreationLatencyWorkers)
	m.creationTimes = measurementutil.NewObjectTransitionTimes(serviceCreationLatencyName)
	m.pingCheckers = checker.NewMap()
	return m
}
type serviceCreationLatencyMeasurement struct {
selector *util.ObjectSelector
waitTimeout time.Duration
stopCh chan struct{}
isRunning bool
queue workerqueue.Interface
client clientset.Interface
creationTimes *measurementutil.ObjectTransitionTimes
pingCheckers checker.Map
lock sync.Mutex
}
// Execute dispatches one ServiceCreationLatency action.
//
// Observed services are chosen with field and/or label selectors; when no
// namespace parameter is given, all namespaces are watched. Actions:
//   - "start": parse the selectors and "waitTimeout", then begin watching services.
//   - "waitForReady": block until every observed service is reachable.
//   - "gather": return the service creation latency summary.
//
// The exec service must be enabled because reachability is probed from
// exec-service pods. Only ClusterIP, NodePort and LoadBalancer services are measured.
func (s *serviceCreationLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
	s.client = config.ClusterFramework.GetClientSets().GetClient()
	action, err := util.GetString(config.Params, "action")
	if err != nil {
		return nil, err
	}
	if enabled := config.ClusterLoaderConfig.ExecServiceConfig.Enable; !enabled {
		return nil, fmt.Errorf("enable-exec-service flag not enabled")
	}

	switch action {
	case "start":
		if err = s.selector.Parse(config.Params); err != nil {
			return nil, err
		}
		if s.waitTimeout, err = util.GetDurationOrDefault(config.Params, "waitTimeout", defaultServiceCreationLatencyTimeout); err != nil {
			return nil, err
		}
		return nil, s.start()
	case "waitForReady":
		return nil, s.waitForReady()
	case "gather":
		return s.gather(config.Identifier)
	}
	return nil, fmt.Errorf("unknown action %v", action)
}
// Dispose tears the measurement down: it closes the informer stop channel if
// start ran, stops the worker queue and then stops every registered ping
// checker while holding the lock.
func (s *serviceCreationLatencyMeasurement) Dispose() {
	if running := s.isRunning; running {
		s.isRunning = false
		close(s.stopCh)
	}
	s.queue.Stop()

	func() {
		s.lock.Lock()
		defer s.lock.Unlock()
		s.pingCheckers.Dispose()
	}()
}
// String identifies the measurement and its selector in log and error messages.
func (s *serviceCreationLatencyMeasurement) String() string {
	return fmt.Sprintf("%s: %s", serviceCreationLatencyName, s.selector.String())
}
// start begins watching the services matched by s.selector. Every informer
// event is wrapped in a closure and pushed onto s.queue, whose workers call
// handleObject. Calling start while already running only logs and returns nil.
// The error, if any, comes from informer.StartAndSync (bounded by informerSyncTimeout).
func (s *serviceCreationLatencyMeasurement) start() error {
	if s.isRunning {
		klog.V(2).Infof("%s: service creation latency measurement already running", s)
		return nil
	}
	klog.V(2).Infof("%s: starting service creation latency measurement...", s)
	s.isRunning = true
	s.stopCh = make(chan struct{})

	listWatch := &cache.ListWatch{
		ListFunc: func(opts v1.ListOptions) (runtime.Object, error) {
			s.selector.ApplySelectors(&opts)
			return s.client.CoreV1().Services(s.selector.Namespace).List(context.TODO(), opts)
		},
		WatchFunc: func(opts v1.ListOptions) (watch.Interface, error) {
			s.selector.ApplySelectors(&opts)
			return s.client.CoreV1().Services(s.selector.Namespace).Watch(context.TODO(), opts)
		},
	}
	enqueue := func(oldObj, newObj interface{}) {
		task := func() {
			s.handleObject(oldObj, newObj)
		}
		s.queue.Add(&task)
	}
	svcInformer := informer.NewInformer(listWatch, enqueue)
	return informer.StartAndSync(svcInformer, s.stopCh, informerSyncTimeout)
}
// waitForReady polls every defaultCheckInterval, for at most s.waitTimeout,
// until for each of ClusterIP, NodePort and LoadBalancer the creating-phase
// count equals the reachability-phase count. Types are checked in that order
// and polling stops at the first mismatch.
func (s *serviceCreationLatencyMeasurement) waitForReady() error {
	serviceTypes := []corev1.ServiceType{
		corev1.ServiceTypeClusterIP,
		corev1.ServiceTypeNodePort,
		corev1.ServiceTypeLoadBalancer,
	}
	allReachable := func() (bool, error) {
		for _, svcType := range serviceTypes {
			reachable := s.creationTimes.Count(phaseName(reachabilityPhase, svcType))
			created := s.creationTimes.Count(phaseName(creatingPhase, svcType))
			klog.V(2).Infof("%s type %s: %d created, %d reachable", s, svcType, created, reachable)
			if reachable != created {
				return false, nil
			}
		}
		return true, nil
	}
	return wait.Poll(defaultCheckInterval, s.waitTimeout, allReachable)
}
// serviceCreationTransitions maps each reported metric to the pair of phases
// whose timestamps bound it. ClusterIP and NodePort services only go from
// creating to reachability. LoadBalancer services additionally split that span
// at the ipAssigning phase, which is recorded once an ingress is populated.
var serviceCreationTransitions = func() map[string]measurementutil.Transition {
	span := func(svcType corev1.ServiceType, from, to string) measurementutil.Transition {
		return measurementutil.Transition{
			From: phaseName(from, svcType),
			To:   phaseName(to, svcType),
		}
	}
	return map[string]measurementutil.Transition{
		"create_to_available_clusterip":      span(corev1.ServiceTypeClusterIP, creatingPhase, reachabilityPhase),
		"create_to_available_nodeport":       span(corev1.ServiceTypeNodePort, creatingPhase, reachabilityPhase),
		"create_to_assigned_loadbalancer":    span(corev1.ServiceTypeLoadBalancer, creatingPhase, ipAssigningPhase),
		"assigned_to_available_loadbalancer": span(corev1.ServiceTypeLoadBalancer, ipAssigningPhase, reachabilityPhase),
		"create_to_available_loadbalancer":   span(corev1.ServiceTypeLoadBalancer, creatingPhase, reachabilityPhase),
	}
}()
// gather computes the latency of every transition in serviceCreationTransitions
// and returns it as a single JSON summary named
// "ServiceCreationLatency_<identifier>". It fails if start was never called.
func (s *serviceCreationLatencyMeasurement) gather(identifier string) ([]measurement.Summary, error) {
	klog.V(2).Infof("%s: gathering service created latency measurement...", s)
	if !s.isRunning {
		return nil, fmt.Errorf("metric %s has not been started", s)
	}
	// ClusterIP and NodePort services receive their cluster IP / node port in the
	// creation API call itself, so only LoadBalancer transitions include ipAssigning.
	latency := s.creationTimes.CalculateTransitionsLatency(serviceCreationTransitions, measurementutil.MatchAll)
	perfData := measurementutil.LatencyMapToPerfData(latency)
	content, err := util.PrettyPrintJSON(perfData)
	if err != nil {
		return nil, err
	}
	name := fmt.Sprintf("%s_%s", serviceCreationLatencyName, identifier)
	return []measurement.Summary{measurement.CreateSummary(name, "json", content)}, nil
}
// handleObject processes one informer event for a service. Non-Service objects
// are logged and dropped. Updates that change neither Spec nor Status are
// ignored, and so is everything once the measurement is no longer running.
// A nil newObj means the service was deleted, so its ping checker is stopped.
// Otherwise the service's phases are updated.
func (s *serviceCreationLatencyMeasurement) handleObject(oldObj, newObj interface{}) {
	oldService, oldIsService := oldObj.(*corev1.Service)
	if oldObj != nil && !oldIsService {
		klog.Errorf("%s: uncastable old object: %v", s, oldObj)
		return
	}
	newService, newIsService := newObj.(*corev1.Service)
	if newObj != nil && !newIsService {
		klog.Errorf("%s: uncastable new object: %v", s, newObj)
		return
	}

	unchanged := oldService != nil && newService != nil &&
		equality.Semantic.DeepEqual(oldService.Spec, newService.Spec) &&
		equality.Semantic.DeepEqual(oldService.Status, newService.Status)
	if unchanged {
		return
	}
	// TODO(#680): Make it thread-safe.
	if !s.isRunning {
		return
	}

	if newObj != nil {
		if err := s.updateObject(newService); err != nil {
			klog.Errorf("%s: create checker error: %v", s, err)
		}
		return
	}
	if err := s.deleteObject(oldService); err != nil {
		klog.Errorf("%s: delete checker error: %v", s, err)
	}
}
// deleteObject stops and unregisters the ping checker kept for svc, if any.
// The only error returned is a failure to derive the service's cache key.
func (s *serviceCreationLatencyMeasurement) deleteObject(svc *corev1.Service) error {
	key, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(svc)
	if keyErr != nil {
		return fmt.Errorf("meta key created error: %v", keyErr)
	}
	func() {
		s.lock.Lock()
		defer s.lock.Unlock()
		s.pingCheckers.DeleteAndStop(key)
	}()
	return nil
}
// updateObject records phase timestamps for an added or updated service and,
// once the service has an address to probe, runs a pingChecker for it.
//
// Only ClusterIP, NodePort and LoadBalancer services are handled. Headless
// services (ClusterIP "None") are skipped: they have no virtual IP to curl, so
// they could never become reachable and would only stall waitForReady.
//
// The creating phase comes from CreationTimestamp the first time the service
// is seen. A LoadBalancer service is not probed until its ingress list is
// populated. The first such update records the ipAssigning phase at
// observation time, and later updates of that service return early.
//
// The checker is registered in s.pingCheckers BEFORE run is called. run blocks
// until the service is reachable or the checker is stopped, so registering it
// afterwards would leave deleteObject and Dispose unable to stop it.
func (s *serviceCreationLatencyMeasurement) updateObject(svc *corev1.Service) error {
	// This measurement only works for services with ClusterIP, NodePort and LoadBalancer type.
	switch svc.Spec.Type {
	case corev1.ServiceTypeClusterIP, corev1.ServiceTypeNodePort, corev1.ServiceTypeLoadBalancer:
	default:
		return nil
	}
	if svc.Spec.ClusterIP == corev1.ClusterIPNone {
		return nil
	}
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(svc)
	if err != nil {
		return fmt.Errorf("meta key created error: %v", err)
	}

	creating := phaseName(creatingPhase, svc.Spec.Type)
	if _, exists := s.creationTimes.Get(key, creating); !exists {
		s.creationTimes.Set(key, creating, svc.CreationTimestamp.Time)
	}

	// ClusterIP and NodePort services get their cluster IP / node port as part of the
	// creation API call, so only LoadBalancer services have an ipAssigning phase.
	if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
		if len(svc.Status.LoadBalancer.Ingress) == 0 {
			return nil
		}
		ipAssigning := phaseName(ipAssigningPhase, svc.Spec.Type)
		if _, exists := s.creationTimes.Get(key, ipAssigning); exists {
			return nil
		}
		s.creationTimes.Set(key, ipAssigning, time.Now())
	}

	pc := &pingChecker{
		callerName:    s.String(),
		svc:           svc,
		creationTimes: s.creationTimes,
		stopCh:        make(chan struct{}),
	}
	// Register first so the (blocking) run below can be stopped by deleteObject/Dispose.
	func() {
		s.lock.Lock()
		defer s.lock.Unlock()
		s.pingCheckers.Add(key, pc)
	}()
	pc.run()
	return nil
}
// phaseName joins a phase with a service type, e.g. "creating_ClusterIP", so
// that timestamps for different service types are tracked separately.
func phaseName(phase string, serviceType corev1.ServiceType) string {
	return phase + "_" + string(serviceType)
}
// pingChecker repeatedly curls one service from an exec-service pod and records
// the reachability phase once enough consecutive probes succeed.
type pingChecker struct {
	// svc is the service being probed.
	svc *corev1.Service
	// creationTimes receives the reachability timestamp.
	creationTimes *measurementutil.ObjectTransitionTimes
	// stopCh is closed by Stop to make run return.
	stopCh chan struct{}
	// callerName prefixes error log messages.
	callerName string
}
// run probes p.svc from an exec-service pod until pingChecks consecutive curl
// requests succeed, then records the reachability phase for the service.
//
// It blocks the calling goroutine and returns when any of these happens:
//   - the reachability phase is recorded, by this checker or any other for the same key;
//   - Stop closes p.stopCh, which is also honored while waiting out a backoff;
//   - the service cannot be probed: no cache key, no ports, or a LoadBalancer without ingress.
//
// A failed pod lookup or curl resets the consecutive-success counter and waits
// pingBackoff before retrying.
func (p *pingChecker) run() {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(p.svc)
	if err != nil {
		klog.Errorf("%s: meta key created error: %v", p.callerName, err)
		return
	}
	// Every probe targets Spec.Ports[0]; indexing an empty slice would panic the worker.
	if len(p.svc.Spec.Ports) == 0 {
		klog.Errorf("%s: service %s has no ports, cannot check reachability", p.callerName, key)
		return
	}

	// A LoadBalancer's probe address does not depend on the exec pod, so resolve it once.
	var lbHost string
	if p.svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
		if len(p.svc.Status.LoadBalancer.Ingress) == 0 {
			klog.Errorf("%s: load balancer service %s has no ingress, cannot check reachability", p.callerName, key)
			return
		}
		ingress := p.svc.Status.LoadBalancer.Ingress[0]
		lbHost = ingress.IP
		if lbHost == "" {
			// DNS-based load balancers publish Hostname instead of IP.
			lbHost = ingress.Hostname
		}
	}

	reachability := phaseName(reachabilityPhase, p.svc.Spec.Type)
	// backoff waits pingBackoff and reports false if the checker was stopped meanwhile.
	backoff := func() bool {
		select {
		case <-p.stopCh:
			return false
		case <-time.After(pingBackoff):
			return true
		}
	}

	success := 0
	for {
		select {
		case <-p.stopCh:
			return
		default:
		}
		if _, exists := p.creationTimes.Get(key, reachability); exists {
			return
		}
		// TODO(#685): Make ping checks less communication heavy.
		pod, podErr := execservice.GetPod()
		if podErr != nil {
			klog.Warningf("call to execservice.GetPod() ended with error: %v", podErr)
			success = 0
			if !backoff() {
				return
			}
			continue
		}

		var cmd string
		switch p.svc.Spec.Type {
		case corev1.ServiceTypeClusterIP:
			cmd = fmt.Sprintf("curl %s:%d", p.svc.Spec.ClusterIP, p.svc.Spec.Ports[0].Port)
		case corev1.ServiceTypeNodePort:
			cmd = fmt.Sprintf("curl %s:%d", pod.Status.HostIP, p.svc.Spec.Ports[0].NodePort)
		case corev1.ServiceTypeLoadBalancer:
			cmd = fmt.Sprintf("curl %s:%d", lbHost, p.svc.Spec.Ports[0].Port)
		}
		var runErr error
		if cmd != "" {
			_, runErr = execservice.RunCommand(pod, cmd)
		}
		if runErr != nil {
			success = 0
			if !backoff() {
				return
			}
			continue
		}

		success++
		if success == pingChecks {
			p.creationTimes.Set(key, reachability, time.Now())
			return
		}
	}
}
// Stop signals run to return by closing stopCh. Closing an already-closed
// channel panics, so Stop must be called at most once per checker.
func (p *pingChecker) Stop() {
	stop := p.stopCh
	close(stop)
}