clusterloader2/pkg/measurement/common/profile.go (221 lines of code) (raw):

/* Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package common import ( "context" "fmt" "sync" "time" goerrors "github.com/go-errors/errors" rbacv1 "k8s.io/api/rbac/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/perf-tests/clusterloader2/pkg/framework/client" "k8s.io/perf-tests/clusterloader2/pkg/measurement" measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util" "k8s.io/perf-tests/clusterloader2/pkg/provider" "k8s.io/perf-tests/clusterloader2/pkg/util" ) const ( cpuProfileName = "CPUProfile" memoryProfileName = "MemoryProfile" mutexProfileName = "MutexProfile" ) func init() { if err := measurement.Register(cpuProfileName, createProfileMeasurementFactory(cpuProfileName, "profile")); err != nil { klog.Fatalf("Cannot register %s: %v", cpuProfileName, err) } if err := measurement.Register(memoryProfileName, createProfileMeasurementFactory(memoryProfileName, "heap")); err != nil { klog.Fatalf("Cannot register %s: %v", memoryProfileName, err) } if err := measurement.Register(mutexProfileName, createProfileMeasurementFactory(mutexProfileName, "mutex")); err != nil { klog.Fatalf("Cannot register %s: %v", mutexProfileName, err) } } type profileConfig struct { componentName string provider provider.Provider hosts []string kind string } func (p *profileMeasurement) populateProfileConfig(config *measurement.Config) error { var err error if p.config.componentName, err = util.GetString(config.Params, "componentName"); err != nil { return err } p.config.provider = config.ClusterFramework.GetClusterConfig().Provider p.config.hosts = config.ClusterFramework.GetClusterConfig().MasterIPs return nil } type profileMeasurement struct { name string config *profileConfig summaries []measurement.Summary isRunning bool stopCh chan struct{} wg sync.WaitGroup } func createProfileMeasurementFactory(name, kind string) func() measurement.Measurement { return func() measurement.Measurement { return &profileMeasurement{ name: name, config: &profileConfig{kind: kind}, } } } func (p *profileMeasurement) start(config *measurement.Config, SSHToMasterSupported bool) error { if err := p.populateProfileConfig(config); err != nil { return err } if len(p.config.hosts) < 1 { klog.Warning("Profile measurements will be disabled due to no MasterIps") return nil } k8sClient := config.ClusterFramework.GetClientSets().GetClient() if p.shouldExposeAPIServerDebugEndpoint() { if err := exposeAPIServerDebugEndpoint(k8sClient); err != nil { klog.Warningf("error while exposing kube-apiserver /debug endpoint: %v", err) } } p.summaries = make([]measurement.Summary, 0) p.isRunning = true p.stopCh = make(chan struct{}) p.wg.Add(1) profileFrequency := 5 * time.Minute go func() { defer p.wg.Done() for { select { case <-p.stopCh: return case <-time.After(profileFrequency): profileSummaries, err := p.gatherProfile(k8sClient, SSHToMasterSupported, config) if err != nil { klog.Errorf("failed to gather profile for %#v: %v", *p.config, err) continue } if profileSummaries != nil { p.summaries = append(p.summaries, profileSummaries...) } } } }() return nil } func (p *profileMeasurement) stop() { if !p.isRunning { return } close(p.stopCh) p.wg.Wait() } // Execute gathers memory profile of a given component. func (p *profileMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) { provider := config.ClusterFramework.GetClusterConfig().Provider SSHToMasterSupported := provider.Features().SupportSSHToMaster APIServerPprofEnabled := config.ClusterFramework.GetClusterConfig().APIServerPprofByClientEnabled if !SSHToMasterSupported && APIServerPprofEnabled { klog.Warningf("fetching profile data from is not possible from provider: %s", provider.Name()) return nil, nil } action, err := util.GetString(config.Params, "action") if err != nil { return nil, err } switch action { case "start": if p.isRunning { klog.V(2).Infof("%s: measurement already running", p) return nil, nil } return nil, p.start(config, SSHToMasterSupported) case "gather": p.stop() return p.summaries, nil default: return nil, fmt.Errorf("unknown action %v", action) } } // Dispose cleans up after the measurement. func (*profileMeasurement) Dispose() {} // String returns string representation of this measurement. func (p *profileMeasurement) String() string { return p.name } func (p *profileMeasurement) gatherProfile(c clientset.Interface, SSHToMasterSupported bool, config *measurement.Config) ([]measurement.Summary, error) { getCommand, err := p.getProfileCommand(config) if err != nil { return nil, goerrors.Errorf("profile gathering failed during retrieving profile command: %v", err) } var summaries []measurement.Summary for _, host := range p.config.hosts { profilePrefix := fmt.Sprintf("%s_%s_%s", host, p.config.componentName, p.name) // Get the profile data over SSH. // Start by checking that the provider allows us to do so. if !SSHToMasterSupported { // SSH to master for this provider is not possible. // For kube-apiserver, we can still fetch the profile using a RESTClient and pprof. // TODO(#246): This will connect to a random master in HA (multi-master) clusters, fix it. if p.config.componentName == "kube-apiserver" { body, err := c.CoreV1().RESTClient().Get().AbsPath("/debug/pprof/" + p.config.kind).DoRaw(context.TODO()) if err != nil { return nil, err } summary := measurement.CreateSummary(profilePrefix, "pprof", string(body)) summaries = append(summaries, summary) break } // Only logging error for gke. SSHing to gke master is not supported. klog.Warningf("%s: failed to execute curl command on master through SSH", p.name) return nil, nil } sshResult, err := measurementutil.SSH(getCommand, host+":22", p.config.provider) if err != nil { return nil, fmt.Errorf("failed to execute curl command on master node %s through SSH: %v", host, err) } summaries = append(summaries, measurement.CreateSummary(profilePrefix, "pprof", sshResult.Stdout)) } return summaries, nil } func (p *profileMeasurement) shouldExposeAPIServerDebugEndpoint() bool { return p.config.componentName == "kube-apiserver" } func (p *profileMeasurement) getProfileCommand(config *measurement.Config) (string, error) { profileProtocol, profilePort, err := config.ClusterFramework.GetClusterConfig().Provider.GetComponentProtocolAndPort(p.config.componentName) if err != nil { return "", goerrors.Errorf("get profile command failed finding component protocol/port: %v", err) } var command string if p.config.componentName == "etcd" { etcdCert := config.ClusterFramework.GetClusterConfig().EtcdCertificatePath etcdKey := config.ClusterFramework.GetClusterConfig().EtcdKeyPath command = fmt.Sprintf("curl -s -k --cert %s --key %s %slocalhost:%v/debug/pprof/%s", etcdCert, etcdKey, profileProtocol, profilePort, p.config.kind) } else { command = fmt.Sprintf("curl -s -k %slocalhost:%v/debug/pprof/%s", profileProtocol, profilePort, p.config.kind) } return command, nil } func exposeAPIServerDebugEndpoint(c clientset.Interface) error { klog.V(2).Info("Exposing kube-apiserver debug endpoint for anonymous access") createClusterRole := func() error { _, err := c.RbacV1().ClusterRoles().Create(context.TODO(), &rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: "apiserver-debug-viewer"}, Rules: []rbacv1.PolicyRule{ {Verbs: []string{"get"}, NonResourceURLs: []string{"/debug/*"}}, }, }, metav1.CreateOptions{}) return err } createClusterRoleBinding := func() error { _, err := c.RbacV1().ClusterRoleBindings().Create(context.TODO(), &rbacv1.ClusterRoleBinding{ ObjectMeta: metav1.ObjectMeta{Name: "anonymous:apiserver-debug-viewer"}, RoleRef: rbacv1.RoleRef{Kind: "ClusterRole", Name: "apiserver-debug-viewer"}, Subjects: []rbacv1.Subject{ {Kind: "User", Name: "system:anonymous"}, }, }, metav1.CreateOptions{}) return err } if err := retryCreateFunction(createClusterRole); err != nil { return err } if err := retryCreateFunction(createClusterRoleBinding); err != nil { return err } return nil } func retryCreateFunction(f func() error) error { return client.RetryWithExponentialBackOff( client.RetryFunction(f, client.Allow(apierrs.IsAlreadyExists))) }