clusterloader2/pkg/measurement/common/etcd_metrics.go (204 lines of code) (raw):
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"fmt"
"math"
"os"
"sync"
"time"
"github.com/prometheus/common/model"
"k8s.io/klog/v2"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/perf-tests/clusterloader2/pkg/provider"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)
const (
etcdMetricsMetricName = "EtcdMetrics"
)
func init() {
if err := measurement.Register(etcdMetricsMetricName, createEtcdMetricsMeasurement); err != nil {
klog.Fatalf("Cannot register %s: %v", etcdMetricsMetricName, err)
}
}
func createEtcdMetricsMeasurement() measurement.Measurement {
return &etcdMetricsMeasurement{
stopCh: make(chan struct{}),
wg: &sync.WaitGroup{},
metrics: newEtcdMetrics(),
}
}
type etcdMetricsMeasurement struct {
sync.Mutex
isRunning bool
stopCh chan struct{}
wg *sync.WaitGroup
metrics *etcdMetrics
}
// Execute supports two actions:
// - start - Starts collecting etcd metrics.
// - gather - Gathers and prints etcd metrics summary.
func (e *etcdMetricsMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
provider := config.ClusterFramework.GetClusterConfig().Provider
// Etcd is only exposed on localhost level. We are using ssh method
if !provider.Features().SupportSSHToMaster {
klog.Warningf("not grabbing etcd metrics through master SSH: unsupported for provider, %s", config.ClusterFramework.GetClusterConfig().Provider.Name())
return nil, nil
}
action, err := util.GetString(config.Params, "action")
if err != nil {
return nil, err
}
hosts := config.ClusterFramework.GetClusterConfig().MasterIPs
if len(hosts) < 1 {
klog.Warningf("ETCD measurements will be disabled due to no MasterIps: %v", hosts)
return nil, nil
}
etcdInsecurePort := config.ClusterFramework.GetClusterConfig().EtcdInsecurePort
switch action {
case "start":
klog.V(2).Infof("%s: starting etcd metrics collecting...", e)
waitTime, err := util.GetDurationOrDefault(config.Params, "waitTime", time.Minute)
if err != nil {
return nil, err
}
for _, h := range hosts {
e.startCollecting(h, provider, waitTime, etcdInsecurePort)
}
return nil, nil
case "gather":
for _, h := range hosts {
if err = e.stopAndSummarize(h, provider, etcdInsecurePort); err != nil {
return nil, err
}
}
content, err := util.PrettyPrintJSON(e.metrics)
if err != nil {
return nil, err
}
summary := measurement.CreateSummary(etcdMetricsMetricName, "json", content)
return []measurement.Summary{summary}, nil
default:
return nil, fmt.Errorf("unknown action %v", action)
}
}
// Dispose cleans up after the measurement.
func (e *etcdMetricsMeasurement) Dispose() {
if e.isRunning {
e.isRunning = false
close(e.stopCh)
e.wg.Wait()
}
}
func (e *etcdMetricsMeasurement) String() string {
return etcdMetricsMetricName
}
func (e *etcdMetricsMeasurement) startCollecting(host string, provider provider.Provider, interval time.Duration, port int) {
e.isRunning = true
e.wg.Add(1)
collectEtcdDatabaseSize := func() error {
dbSize, err := e.getEtcdDatabaseSize(host, provider, port)
if err != nil {
return err
}
e.Lock()
defer e.Unlock()
e.metrics.MaxDatabaseSize = math.Max(e.metrics.MaxDatabaseSize, dbSize)
return nil
}
go func() {
defer e.wg.Done()
for {
select {
case <-time.After(interval):
err := collectEtcdDatabaseSize()
if err != nil {
klog.Errorf("%s: failed to collect etcd database size", e)
continue
}
case <-e.stopCh:
return
}
}
}()
}
func (e *etcdMetricsMeasurement) stopAndSummarize(host string, provider provider.Provider, port int) error {
defer e.Dispose()
// Do some one-off collection of metrics.
samples, err := e.getEtcdMetrics(host, provider, port)
if err != nil {
return err
}
collectEtcdMetrics := func(sample *model.Sample) {
var hist *measurementutil.HistogramVec
switch sample.Metric[model.MetricNameLabel] {
case "etcd_disk_backend_commit_duration_seconds_bucket":
hist = &e.metrics.BackendCommitDuration
case "etcd_debugging_snap_save_total_duration_seconds_bucket":
hist = &e.metrics.SnapshotSaveTotalDuration
case "etcd_disk_wal_fsync_duration_seconds_bucket":
hist = &e.metrics.WalFsyncDuration
case "etcd_network_peer_round_trip_time_seconds_bucket":
hist = &e.metrics.PeerRoundTripTime
default:
return
}
e.Lock()
measurementutil.ConvertSampleToBucket(sample, hist)
e.Unlock()
}
for _, sample := range samples {
collectEtcdMetrics(sample)
}
return nil
}
func (e *etcdMetricsMeasurement) getEtcdMetrics(host string, provider provider.Provider, port int) ([]*model.Sample, error) {
// In https://github.com/kubernetes/kubernetes/pull/74690, mTLS is enabled for etcd server
// in order to bypass TLS credential requirement when checking etc /metrics and /health, you
// need to provide the insecure http port number to access etcd, http://localhost:2382 for
// example.
cmd := fmt.Sprintf("curl http://localhost:%d/metrics", port)
samples, err := e.sshEtcdMetrics(cmd, host, provider)
if err == nil {
return samples, nil
}
klog.Warningf("%s: call on %d port (%s) failed due to %v. Falling back to default 2379 port.", e, port, cmd, err)
// Use old endpoint if new one fails, "2379" is hard-coded here as well, it is kept as is since
// we don't want to bloat the cluster config only for a fall-back attempt.
etcdCert, etcdKey, etcdHost := os.Getenv("ETCD_CERTIFICATE"), os.Getenv("ETCD_KEY"), os.Getenv("ETCD_HOST")
if etcdHost == "" {
etcdHost = "localhost"
}
if etcdCert == "" || etcdKey == "" {
klog.Warning("empty etcd cert or key, using http")
cmd = fmt.Sprintf("curl http://%s:2379/metrics", etcdHost)
} else {
cmd = fmt.Sprintf("curl -k --cert %s --key %s https://%s:2379/metrics", etcdCert, etcdKey, etcdHost)
}
return e.sshEtcdMetrics(cmd, host, provider)
}
func (e *etcdMetricsMeasurement) sshEtcdMetrics(cmd, host string, provider provider.Provider) ([]*model.Sample, error) {
sshResult, err := measurementutil.SSH(cmd, host+":22", provider)
if err != nil {
return nil, fmt.Errorf("unexpected error (code: %d) in ssh connection to master: %#v", sshResult.Code, err)
} else if sshResult.Code != 0 {
return nil, fmt.Errorf("failed running command: %s on the host: %s, result: %+v", cmd, host, sshResult)
}
data := sshResult.Stdout
return measurementutil.ExtractMetricSamples(data)
}
func (e *etcdMetricsMeasurement) getEtcdDatabaseSize(host string, provider provider.Provider, port int) (float64, error) {
samples, err := e.getEtcdMetrics(host, provider, port)
if err != nil {
return 0, err
}
for _, sample := range samples {
if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" ||
sample.Metric[model.MetricNameLabel] == "etcd_mvcc_db_total_size_in_bytes" {
return float64(sample.Value), nil
}
}
return 0, fmt.Errorf("couldn't find etcd database size metric")
}
type etcdMetrics struct {
BackendCommitDuration measurementutil.HistogramVec `json:"backendCommitDuration"`
SnapshotSaveTotalDuration measurementutil.HistogramVec `json:"snapshotSaveTotalDuration"`
PeerRoundTripTime measurementutil.HistogramVec `json:"peerRoundTripTime"`
WalFsyncDuration measurementutil.HistogramVec `json:"walFsyncDuration"`
MaxDatabaseSize float64 `json:"maxDatabaseSize"`
}
func newEtcdMetrics() *etcdMetrics {
return &etcdMetrics{
BackendCommitDuration: make(measurementutil.HistogramVec, 0),
SnapshotSaveTotalDuration: make(measurementutil.HistogramVec, 0),
PeerRoundTripTime: make(measurementutil.HistogramVec, 0),
WalFsyncDuration: make(measurementutil.HistogramVec, 0),
}
}