clusterloader2/pkg/measurement/common/api_availability_measurement.go (239 lines of code) (raw):

/* Copyright 2020 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package common import ( "context" "fmt" "net/http" "strconv" "sync" "time" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/perf-tests/clusterloader2/pkg/errors" "k8s.io/perf-tests/clusterloader2/pkg/execservice" "k8s.io/perf-tests/clusterloader2/pkg/measurement" "k8s.io/perf-tests/clusterloader2/pkg/provider" "k8s.io/perf-tests/clusterloader2/pkg/util" ) const ( apiAvailabilityMeasurementName = "APIAvailability" ) func init() { if err := measurement.Register(apiAvailabilityMeasurementName, createAPIAvailabilityMeasurement); err != nil { klog.Fatalf("Cannot register %s: %v", apiAvailabilityMeasurementName, err) } } func createAPIAvailabilityMeasurement() measurement.Measurement { return &apiAvailabilityMeasurement{} } type apiAvailabilityMeasurement struct { isRunning bool isPaused bool pauseCh chan struct{} unpauseCh chan struct{} stopCh chan struct{} pollFrequency time.Duration hostIPs []string summaries []measurement.Summary clusterLevelMetrics *apiAvailabilityMetrics threshold float64 // Metrics per host internal IP. hostLevelMetrics map[string]*apiAvailabilityMetrics hostPollTimeoutSeconds int wg sync.WaitGroup lock sync.Mutex } func (a *apiAvailabilityMeasurement) updateHostAvailabilityMetrics(c clientset.Interface, provider provider.Provider) { wg := sync.WaitGroup{} wg.Add(len(a.hostIPs)) mu := sync.Mutex{} for _, ip := range a.hostIPs { ip := ip go func() { defer wg.Done() statusCode, err := a.pollHost(ip) availability := statusCode == strconv.Itoa(http.StatusOK) if err != nil { klog.Warningf("execservice issue: %s", err.Error()) } if !availability { klog.Warningf("host %s not available; HTTP status code: %s", ip, statusCode) } mu.Lock() defer mu.Unlock() a.hostLevelMetrics[ip].update(availability) }() } wg.Wait() } func (a *apiAvailabilityMeasurement) pollHost(hostIP string) (string, error) { pod, err := execservice.GetPod() if err != nil { return "", fmt.Errorf("problem with GetPod(): %w", err) } cmd := fmt.Sprintf("curl --connect-timeout %d -s -k -w \"%%{http_code}\" -o /dev/null https://%s:443/readyz", a.hostPollTimeoutSeconds, hostIP) output, err := execservice.RunCommand(pod, cmd) if err != nil { return "", fmt.Errorf("problem with RunCommand(): %w", err) } return output, nil } func (a *apiAvailabilityMeasurement) updateClusterAvailabilityMetrics(c clientset.Interface) { result := c.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.Background()) status := 0 result.StatusCode(&status) availability := status == http.StatusOK if !availability { klog.Warningf("cluster not available; HTTP status code: %d", status) } a.clusterLevelMetrics.update(availability) } func (a *apiAvailabilityMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) { action, err := util.GetString(config.Params, "action") if err != nil { return nil, err } a.lock.Lock() defer a.lock.Unlock() switch action { case "start": return nil, a.start(config) case "pause": a.pause() return nil, nil case "unpause": a.unpause() return nil, nil case "gather": return a.gather() default: return nil, fmt.Errorf("unknown action %v", action) } } func (a *apiAvailabilityMeasurement) start(config *measurement.Config) error { if a.isRunning { klog.V(2).Infof("%s: measurement already running", a) return nil } if err := a.initFields(config); err != nil { return err } k8sClient := config.ClusterFramework.GetClientSets().GetClient() provider := config.ClusterFramework.GetClusterConfig().Provider a.wg.Add(1) go func() { defer a.wg.Done() for { if a.isPaused { select { case <-a.unpauseCh: a.isPaused = false case <-a.stopCh: return } } select { case <-a.pauseCh: a.isPaused = true case <-time.After(a.pollFrequency): a.updateClusterAvailabilityMetrics(k8sClient) if a.hostLevelAvailabilityEnabled() { a.updateHostAvailabilityMetrics(k8sClient, provider) } case <-a.stopCh: return } } }() return nil } func (a *apiAvailabilityMeasurement) initFields(config *measurement.Config) error { a.isRunning = true a.stopCh = make(chan struct{}) a.pauseCh = make(chan struct{}) a.unpauseCh = make(chan struct{}) frequency, err := util.GetDuration(config.Params, "pollFrequency") if err != nil { return err } a.pollFrequency = frequency threshold, err := util.GetFloat64OrDefault(config.Params, "threshold", 0.0) if err != nil { return err } a.threshold = threshold a.clusterLevelMetrics = &apiAvailabilityMetrics{} if config.ClusterLoaderConfig.ExecServiceConfig.Enable { a.hostIPs = config.ClusterFramework.GetClusterConfig().MasterInternalIPs if len(a.hostIPs) == 0 { klog.V(2).Infof("%s: host internal IP(s) are not provided, therefore only cluster-level availability will be measured", a) return nil } a.hostLevelMetrics = map[string]*apiAvailabilityMetrics{} for _, ip := range a.hostIPs { a.hostLevelMetrics[ip] = &apiAvailabilityMetrics{} } hostPollTimeoutSeconds, err := util.GetIntOrDefault(config.Params, "hostPollTimeoutSeconds", 5) if err != nil { return err } a.hostPollTimeoutSeconds = hostPollTimeoutSeconds } else { klog.V(2).Infof("%s: exec service is not enabled, therefore only cluster-level availability will be measured", a) } return nil } func (a *apiAvailabilityMeasurement) hostLevelAvailabilityEnabled() bool { return len(a.hostLevelMetrics) > 0 } func (a *apiAvailabilityMeasurement) pause() { if !a.isRunning { klog.V(2).Infof("%s: measurement is not running", a) return } if a.isPaused { klog.Warningf("%s: measurement already paused", a) return } a.pauseCh <- struct{}{} klog.V(2).Infof("%s: pausing the measurement (stopping checking the availability)", a) } func (a *apiAvailabilityMeasurement) unpause() { if !a.isRunning { klog.V(2).Infof("%s: measurement is not running", a) return } if !a.isPaused { klog.Warningf("%s: measurement already unpaused", a) return } a.unpauseCh <- struct{}{} klog.V(2).Infof("%s: unpausing the measurement", a) } func (a *apiAvailabilityMeasurement) gather() ([]measurement.Summary, error) { if !a.isRunning { return nil, nil } close(a.stopCh) a.wg.Wait() a.isRunning = false klog.V(2).Infof("%s: gathering summaries", apiAvailabilityMeasurementName) output := apiAvailabilityOutput{ ClusterSummary: createClusterSummary(a.clusterLevelMetrics, a.pollFrequency), } if a.hostLevelAvailabilityEnabled() { output.HostSummaries = createHostSummary(a.hostLevelMetrics, a.hostIPs, a.pollFrequency) } content, err := util.PrettyPrintJSON(output) if err != nil { return nil, err } summary := measurement.CreateSummary(apiAvailabilityMeasurementName, "json", content) a.summaries = append(a.summaries, summary) if sli := output.ClusterSummary.AvailabilityPercentage; sli < a.threshold { err = errors.NewMetricViolationError("API availability", fmt.Sprintf("SLO not fulfilled (expected >= %.2f, got: %.2f)", a.threshold, sli)) } return a.summaries, err } func (a *apiAvailabilityMeasurement) Dispose() {} func (a *apiAvailabilityMeasurement) String() string { return apiAvailabilityMeasurementName }