clusterloader2/pkg/measurement/common/scheduler_latency.go (264 lines of code) (raw):

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
	"context"
	"fmt"
	"math"
	"strings"
	"time"

	"github.com/prometheus/common/model"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	schedulermetric "k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/perf-tests/clusterloader2/pkg/measurement"
	measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
	"k8s.io/perf-tests/clusterloader2/pkg/provider"
	"k8s.io/perf-tests/clusterloader2/pkg/util"
)

const (
	// schedulerLatencyMetricName is the name this measurement is registered
	// under; it is also the name of the summary it produces and its String().
	schedulerLatencyMetricName = "SchedulingMetrics"

	// Names of the scheduler histogram bucket series that are picked out of
	// the scheduler's metrics endpoint output. Each name is prefixed with the
	// scheduler metrics subsystem.
	e2eSchedulingDurationMetricName           = model.LabelValue(schedulermetric.SchedulerSubsystem + "_e2e_scheduling_duration_seconds_bucket")
	schedulingAlgorithmDurationMetricName     = model.LabelValue(schedulermetric.SchedulerSubsystem + "_scheduling_algorithm_duration_seconds_bucket")
	frameworkExtensionPointDurationMetricName = model.LabelValue(schedulermetric.SchedulerSubsystem + "_framework_extension_point_duration_seconds_bucket")
	preemptionEvaluationMetricName            = model.LabelValue(schedulermetric.SchedulerSubsystem + "_scheduling_algorithm_preemption_evaluation_seconds_bucket")

	// singleRestCallTimeout bounds one request to the scheduler that is sent
	// through the API server pod proxy.
	singleRestCallTimeout = 5 * time.Minute
	// kubeSchedulerPort is the default port for the scheduler status server.
	kubeSchedulerPort = 10259
)

var (
	// extentionsPoints lists the scheduler framework extension points whose
	// latency histograms are collected; samples labelled with any other
	// extension_point are ignored. The identifier's spelling is kept as-is
	// because the rest of the file refers to it by this name.
	extentionsPoints = []string{
		"PreFilter",
		"Filter",
		"PostFilter",
		"PreScore",
		"Score",
		"PreBind",
		"Bind",
		"PostBind",
		"Reserve",
		"Unreserve",
		"Permit",
	}
)

// init registers this measurement under schedulerLatencyMetricName.
// A registration failure is fatal.
func init() {
	if err := measurement.Register(schedulerLatencyMetricName, createSchedulerLatencyMeasurement); err != nil {
		klog.Fatalf("Cannot register %s: %v", schedulerLatencyMetricName, err)
	}
}

// createSchedulerLatencyMeasurement returns a new measurement with an empty
// (zero-value) baseline.
func createSchedulerLatencyMeasurement() measurement.Measurement {
	return &schedulerLatencyMeasurement{}
}

// schedulerLatencyMeasurement collects scheduler latency histograms and
// reports them relative to a previously recorded baseline.
type schedulerLatencyMeasurement struct {
	// initialLatency is the baseline recorded by the "reset"/"start" actions
	// and subtracted from the histograms scraped by "gather".
	initialLatency schedulerLatencyMetrics
}

// schedulerLatencyMetrics holds the scheduler latency histograms scraped at a
// single point in time.
type schedulerLatencyMetrics struct {
	e2eSchedulingDurationHist       *measurementutil.Histogram
	schedulingAlgorithmDurationHist *measurementutil.Histogram
	preemptionEvaluationHist        *measurementutil.Histogram
	// frameworkExtensionPointDurationHist is keyed by extension point name.
	frameworkExtensionPointDurationHist map[string]*measurementutil.Histogram
}

// Execute supports three actions:
//   - reset - Records the current scheduler latency histograms as the baseline
//     that a later "gather" is computed against.
//   - start - Same as reset.
//   - gather - Gathers current scheduler latency data, subtracts the baseline
//     and returns the resulting percentiles as a JSON summary.
//
// If scheduler metrics cannot be fetched for the current provider, a warning is
// logged and no summary is returned.
func (s *schedulerLatencyMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) { provider := config.ClusterFramework.GetClusterConfig().Provider SSHToMasterSupported := provider.Features().SupportSSHToMaster c := config.ClusterFramework.GetClientSets().GetClient() nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err } var masterRegistered = false for _, node := range nodes.Items { if util.LegacyIsMasterNode(&node) { masterRegistered = true } } if provider.Features().SchedulerInsecurePortDisabled || (!SSHToMasterSupported && !masterRegistered) { klog.Warningf("unable to fetch scheduler metrics for provider: %s", provider.Name()) return nil, nil } action, err := util.GetString(config.Params, "action") if err != nil { return nil, err } masterIP, err := util.GetStringOrDefault(config.Params, "masterIP", config.ClusterFramework.GetClusterConfig().GetMasterIP()) if err != nil { return nil, err } masterName, err := util.GetStringOrDefault(config.Params, "masterName", config.ClusterFramework.GetClusterConfig().MasterName) if err != nil { return nil, err } switch action { case "reset": klog.V(2).Infof("%s: start collecting latency initial metrics in scheduler...", s) return nil, s.getSchedulingInitialLatency(config.ClusterFramework.GetClientSets().GetClient(), masterIP, provider, masterName, masterRegistered) case "start": klog.V(2).Infof("%s: start collecting latency metrics in scheduler...", s) return nil, s.getSchedulingInitialLatency(config.ClusterFramework.GetClientSets().GetClient(), masterIP, provider, masterName, masterRegistered) case "gather": klog.V(2).Infof("%s: gathering latency metrics in scheduler...", s) return s.getSchedulingLatency(config.ClusterFramework.GetClientSets().GetClient(), masterIP, provider, masterName, masterRegistered) default: return nil, fmt.Errorf("unknown action %v", action) } } // Dispose cleans up after the measurement. 
func (*schedulerLatencyMeasurement) Dispose() {} // String returns string representation of this measurement. func (*schedulerLatencyMeasurement) String() string { return schedulerLatencyMetricName } // HistogramSub is a helper function to substract two histograms func HistogramSub(finalHist, initialHist *measurementutil.Histogram) *measurementutil.Histogram { for k := range finalHist.Buckets { finalHist.Buckets[k] = finalHist.Buckets[k] - initialHist.Buckets[k] } return finalHist } func (m *schedulerLatencyMetrics) substract(sub schedulerLatencyMetrics) { if sub.preemptionEvaluationHist != nil { m.preemptionEvaluationHist = HistogramSub(m.preemptionEvaluationHist, sub.preemptionEvaluationHist) } if sub.schedulingAlgorithmDurationHist != nil { m.schedulingAlgorithmDurationHist = HistogramSub(m.schedulingAlgorithmDurationHist, sub.schedulingAlgorithmDurationHist) } if sub.e2eSchedulingDurationHist != nil { m.e2eSchedulingDurationHist = HistogramSub(m.e2eSchedulingDurationHist, sub.e2eSchedulingDurationHist) } for _, ep := range extentionsPoints { if sub.frameworkExtensionPointDurationHist[ep] != nil { m.frameworkExtensionPointDurationHist[ep] = HistogramSub(m.frameworkExtensionPointDurationHist[ep], sub.frameworkExtensionPointDurationHist[ep]) } } } func (s *schedulerLatencyMeasurement) setQuantiles(metrics schedulerLatencyMetrics) (schedulingMetrics, error) { result := schedulingMetrics{ FrameworkExtensionPointDuration: make(map[string]*measurementutil.LatencyMetric), } for _, ePoint := range extentionsPoints { result.FrameworkExtensionPointDuration[ePoint] = &measurementutil.LatencyMetric{} } if err := SetQuantileFromHistogram(&result.E2eSchedulingLatency, metrics.e2eSchedulingDurationHist); err != nil { return result, err } if err := SetQuantileFromHistogram(&result.SchedulingLatency, metrics.schedulingAlgorithmDurationHist); err != nil { return result, err } for _, ePoint := range extentionsPoints { if err := 
SetQuantileFromHistogram(result.FrameworkExtensionPointDuration[ePoint], metrics.frameworkExtensionPointDurationHist[ePoint]); err != nil { return result, err } } if err := SetQuantileFromHistogram(&result.PreemptionEvaluationLatency, metrics.preemptionEvaluationHist); err != nil { return result, err } return result, nil } // getSchedulingLatency retrieves scheduler latency metrics. func (s *schedulerLatencyMeasurement) getSchedulingLatency(c clientset.Interface, host string, provider provider.Provider, masterName string, masterRegistered bool) ([]measurement.Summary, error) { schedulerMetrics, err := s.getSchedulingMetrics(c, host, provider, masterName, masterRegistered) if err != nil { return nil, err } schedulerMetrics.substract(s.initialLatency) result, err := s.setQuantiles(schedulerMetrics) if err != nil { return nil, err } content, err := util.PrettyPrintJSON(result) if err != nil { return nil, err } summary := measurement.CreateSummary(schedulerLatencyMetricName, "json", content) return []measurement.Summary{summary}, nil } // getSchedulingInitialLatency retrieves initial values of scheduler latency metrics func (s *schedulerLatencyMeasurement) getSchedulingInitialLatency(c clientset.Interface, host string, provider provider.Provider, masterName string, masterRegistered bool) error { var err error s.initialLatency, err = s.getSchedulingMetrics(c, host, provider, masterName, masterRegistered) if err != nil { return err } return nil } // getSchedulingMetrics gets scheduler latency metrics func (s *schedulerLatencyMeasurement) getSchedulingMetrics(c clientset.Interface, host string, provider provider.Provider, masterName string, masterRegistered bool) (schedulerLatencyMetrics, error) { e2eSchedulingDurationHist := measurementutil.NewHistogram(nil) schedulingAlgorithmDurationHist := measurementutil.NewHistogram(nil) preemptionEvaluationHist := measurementutil.NewHistogram(nil) frameworkExtensionPointDurationHist := make(map[string]*measurementutil.Histogram) 
latencyMetrics := schedulerLatencyMetrics{ e2eSchedulingDurationHist, schedulingAlgorithmDurationHist, preemptionEvaluationHist, frameworkExtensionPointDurationHist} for _, ePoint := range extentionsPoints { frameworkExtensionPointDurationHist[ePoint] = measurementutil.NewHistogram(nil) } data, err := s.sendRequestToScheduler(c, "GET", host, provider, masterName, masterRegistered) if err != nil { return latencyMetrics, err } samples, err := measurementutil.ExtractMetricSamples(data) if err != nil { return latencyMetrics, err } for _, sample := range samples { switch sample.Metric[model.MetricNameLabel] { case e2eSchedulingDurationMetricName: measurementutil.ConvertSampleToHistogram(sample, e2eSchedulingDurationHist) case schedulingAlgorithmDurationMetricName: measurementutil.ConvertSampleToHistogram(sample, schedulingAlgorithmDurationHist) case frameworkExtensionPointDurationMetricName: ePoint := string(sample.Metric["extension_point"]) if _, exists := frameworkExtensionPointDurationHist[ePoint]; exists { measurementutil.ConvertSampleToHistogram(sample, frameworkExtensionPointDurationHist[ePoint]) } case preemptionEvaluationMetricName: measurementutil.ConvertSampleToHistogram(sample, preemptionEvaluationHist) } } return latencyMetrics, nil } // SetQuantileFromHistogram sets quantile of LatencyMetric from Histogram func SetQuantileFromHistogram(metric *measurementutil.LatencyMetric, hist *measurementutil.Histogram) error { quantiles := []float64{0.5, 0.9, 0.99} for _, quantile := range quantiles { histQuantile, err := hist.Quantile(quantile) if err != nil { return err } // NaN is returned only when there are less than two buckets. // In which case all quantiles are NaN and all latency metrics are untouched. 
if !math.IsNaN(histQuantile) { metric.SetQuantile(quantile, time.Duration(int64(histQuantile*float64(time.Second)))) } } return nil } // sendRequestToScheduler sends request to kube scheduler metrics func (s *schedulerLatencyMeasurement) sendRequestToScheduler(c clientset.Interface, op, host string, provider provider.Provider, masterName string, masterRegistered bool) (string, error) { opUpper := strings.ToUpper(op) if opUpper != "GET" && opUpper != "DELETE" { return "", fmt.Errorf("unknown REST request") } var responseText string if masterRegistered { ctx, cancel := context.WithTimeout(context.Background(), singleRestCallTimeout) defer cancel() body, err := c.CoreV1().RESTClient().Verb(opUpper). Namespace(metav1.NamespaceSystem). Resource("pods"). Name(fmt.Sprintf("https:kube-scheduler-%v:%v", masterName, kubeSchedulerPort)). SubResource("proxy"). Suffix("metrics"). Do(ctx).Raw() if err != nil { klog.Errorf("Send request to scheduler failed with err: %v", err) return "", err } responseText = string(body) } else { cmd := "curl -X " + opUpper + " -k https://localhost:10259/metrics" sshResult, err := measurementutil.SSH(cmd, host+":22", provider) if err != nil || sshResult.Code != 0 { return "", fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err) } responseText = sshResult.Stdout } return responseText, nil } type schedulingMetrics struct { FrameworkExtensionPointDuration map[string]*measurementutil.LatencyMetric `json:"frameworkExtensionPointDuration"` PreemptionEvaluationLatency measurementutil.LatencyMetric `json:"preemptionEvaluationLatency"` E2eSchedulingLatency measurementutil.LatencyMetric `json:"e2eSchedulingLatency"` // To track scheduling latency without binding, this allows to easier present the ceiling of the scheduler throughput. SchedulingLatency measurementutil.LatencyMetric `json:"schedulingLatency"` }