clusterloader2/pkg/measurement/common/ooms_tracker.go
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common
import (
"context"
"fmt"
"regexp"
"strings"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/pager"
"k8s.io/klog/v2"
"k8s.io/perf-tests/clusterloader2/pkg/measurement"
"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer"
"k8s.io/perf-tests/clusterloader2/pkg/util"
)
const (
clusterOOMsTrackerEnabledParamName = "clusterOOMsTrackerEnabled"
clusterOOMsTrackerName = "ClusterOOMsTracker"
clusterOOMsIgnoredProcessesParamName = "clusterOOMsIgnoredProcesses"
informerTimeout = time.Minute
oomEventReason = "OOMKilling"
initialListPageSize = 10000
)
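// oomEventMsgRegex matches the kernel OOM-killer message carried by OOMKilling
// events, capturing the killed process's PID, name, and total-vm size.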
var (
oomEventMsgRegex = regexp.MustCompile(`Killed process (\d+) \((.+)\) total-vm:(\d+kB), anon-rss:\d+kB, file-rss:\d+kB.*`)
)
func init() {
if err := measurement.Register(clusterOOMsTrackerName, createClusterOOMsTrackerMeasurement); err != nil {
klog.Fatalf("Cannot register %s: %v", clusterOOMsTrackerName, err)
}
}
func createClusterOOMsTrackerMeasurement() measurement.Measurement {
return &clusterOOMsTrackerMeasurement{}
}
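// clusterOOMsTrackerMeasurement records OOM kills reported in the cluster as
// OOMKilling events (emitted by node-problem-detector).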
type clusterOOMsTrackerMeasurement struct {
selector *util.ObjectSelector
msgRegex *regexp.Regexp
isRunning bool
startTime time.Time
stopCh chan struct{}
lock sync.Mutex
processIgnored map[string]bool
resourceVersionRecorded map[string]bool
ooms []oomEvent
}
// TODO: Reevaluate if we can add new fields here when node-problem-detector
// starts using new events.
type oomEvent struct {
Node string `json:"node"`
Process string `json:"process"`
ProcessMemory string `json:"memory"`
ProcessID string `json:"pid"`
Time time.Time `json:"time"`
}
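// Execute dispatches on the "action" param: "start" begins tracking OOMs in the
// cluster and "gather" stops the tracking and returns a summary of the recorded
// OOMs. The whole measurement is a no-op unless the clusterOOMsTrackerEnabled
// param is set to true.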
func (m *clusterOOMsTrackerMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
clusterOOMsTrackerEnabled, err := util.GetBoolOrDefault(config.Params, clusterOOMsTrackerEnabledParamName, false)
if err != nil {
return nil, fmt.Errorf("problem with getting %s param: %w", clusterOOMsTrackerEnabledParamName, err)
}
if !clusterOOMsTrackerEnabled {
klog.V(1).Info("skipping tracking of OOMs in the cluster")
return nil, nil
}
action, err := util.GetString(config.Params, "action")
if err != nil {
return nil, fmt.Errorf("problem with getting %s param: %w", "action", err)
}
switch action {
case "start":
if err = m.start(config); err != nil {
return nil, fmt.Errorf("starting cluster OOMs measurement problem: %w", err)
}
return nil, nil
case "gather":
m.lock.Lock()
defer m.lock.Unlock()
return m.gather()
default:
return nil, fmt.Errorf("unknown action %v", action)
}
}
func (m *clusterOOMsTrackerMeasurement) Dispose() {
m.stop()
}
func (m *clusterOOMsTrackerMeasurement) String() string {
return clusterOOMsTrackerName
}
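// getOOMsTrackerInformer builds a shared informer over cluster events. The list
// function intentionally returns an empty item list (only the resource version
// of the result is kept), so the informer reacts solely to events delivered by
// the watch; OOMs already present in the cluster are picked up separately by
// handlePriorOOMs.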
func (m *clusterOOMsTrackerMeasurement) getOOMsTrackerInformer(ctx context.Context, client clientset.Interface) cache.SharedInformer {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
o := metav1.ListOptions{
Limit: 1,
}
result, err := client.CoreV1().Events(metav1.NamespaceAll).List(ctx, o)
if err != nil {
return nil, err
}
result.Continue = ""
result.Items = nil
return result, nil
}
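// The watch is limited to OOMKilling events via the field selector built in initFields.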
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = m.selector.FieldSelector
return client.CoreV1().Events(metav1.NamespaceAll).Watch(ctx, options)
}
i := cache.NewSharedInformer(&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}, nil, 0)
i.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
m.handleOOMEvent(obj)
},
UpdateFunc: func(_, obj interface{}) {
m.handleOOMEvent(obj)
},
DeleteFunc: func(_ interface{}) {},
})
return i
}
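// handlePriorOOMs lists all events already present in the cluster, in pages of
// initialListPageSize, and runs the OOM handler on each of them so that OOMs
// predating the watch are recorded as well.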
func (m *clusterOOMsTrackerMeasurement) handlePriorOOMs(ctx context.Context, client clientset.Interface) error {
pg := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return client.CoreV1().Events(metav1.NamespaceAll).List(ctx, opts)
}))
pg.PageSize = initialListPageSize
if err := pg.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
m.handleOOMEvent(obj)
return nil
}); err != nil {
return err
}
return nil
}
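// start initializes the measurement state, starts an informer watching for new
// OOM events, and then scans events that already existed before the watch began.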
func (m *clusterOOMsTrackerMeasurement) start(config *measurement.Config) error {
if m.isRunning {
klog.V(2).Infof("%s: cluster OOMs tracking measurement already running", m)
return nil
}
klog.V(2).Infof("%s: starting cluster OOMs tracking measurement...", m)
if err := m.initFields(config); err != nil {
return fmt.Errorf("problem with OOMs tracking measurement fields initialization: %w", err)
}
ctx := context.Background()
client := config.ClusterFramework.GetClientSets().GetClient()
// Watching for OOM events from node-problem-detector below.
i := m.getOOMsTrackerInformer(ctx, client)
if err := informer.StartAndSync(i, m.stopCh, informerTimeout); err != nil {
return fmt.Errorf("problem with OOM events informer starting: %w", err)
}
// Searching for OOM events that happened before the measurement start.
// We're running this *after* initiating the informer above because doing
// the same in the reverse order might make us miss some OOMs.
if err := m.handlePriorOOMs(ctx, client); err != nil {
return fmt.Errorf("problem with handling prior OOMs: %w", err)
}
return nil
}
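// initFields sets up the measurement state: the start time, the stop channel, a
// field selector matching OOMKilling events, and the set of process names to
// ignore, parsed from the comma-separated clusterOOMsIgnoredProcesses param.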
func (m *clusterOOMsTrackerMeasurement) initFields(config *measurement.Config) error {
m.isRunning = true
m.startTime = time.Now()
m.stopCh = make(chan struct{})
m.selector = &util.ObjectSelector{
FieldSelector: fields.Set{"reason": oomEventReason}.AsSelector().String(),
Namespace: metav1.NamespaceAll,
}
m.msgRegex = oomEventMsgRegex
m.resourceVersionRecorded = make(map[string]bool)
ignoredProcessesString, err := util.GetStringOrDefault(config.Params, clusterOOMsIgnoredProcessesParamName, "")
if err != nil {
return err
}
m.processIgnored = make(map[string]bool)
if ignoredProcessesString != "" {
processNames := strings.Split(ignoredProcessesString, ",")
for _, processName := range processNames {
m.processIgnored[processName] = true
}
}
return nil
}
func (m *clusterOOMsTrackerMeasurement) stop() {
if m.isRunning {
m.isRunning = false
close(m.stopCh)
}
}
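// gather stops the measurement and returns a JSON summary that buckets the
// recorded OOMs into "past" (before the measurement started), "ignored"
// (processes on the ignore list), and "failures" (everything else). When any
// failures were recorded, an error is returned alongside the summary. Called
// with m.lock held (see Execute).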
func (m *clusterOOMsTrackerMeasurement) gather() ([]measurement.Summary, error) {
klog.V(2).Infof("%s: gathering cluster OOMs tracking measurement", clusterOOMsTrackerName)
if !m.isRunning {
return nil, fmt.Errorf("measurement %s has not been started", clusterOOMsTrackerName)
}
m.stop()
oomData := make(map[string][]oomEvent)
oomData["failures"] = make([]oomEvent, 0)
oomData["past"] = make([]oomEvent, 0)
oomData["ignored"] = make([]oomEvent, 0)
for _, oom := range m.ooms {
if m.startTime.After(oom.Time) {
oomData["past"] = append(oomData["past"], oom)
continue
}
if m.processIgnored[oom.Process] {
oomData["ignored"] = append(oomData["ignored"], oom)
continue
}
oomData["failures"] = append(oomData["failures"], oom)
}
content, err := util.PrettyPrintJSON(oomData)
if err != nil {
return nil, fmt.Errorf("OOMs PrettyPrintJSON problem: %w", err)
}
summary := measurement.CreateSummary(clusterOOMsTrackerName, "json", content)
if oomFailures := oomData["failures"]; len(oomFailures) > 0 {
err = fmt.Errorf("OOMs recorded: %+v", oomFailures)
}
return []measurement.Summary{summary}, err
}
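// handleOOMEvent records a single OOMKilling event, deduplicating by resource
// version and parsing the PID, process name, and total-vm size out of the event
// message when it matches the expected kernel OOM-killer format.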
func (m *clusterOOMsTrackerMeasurement) handleOOMEvent(obj interface{}) {
event, ok := obj.(*corev1.Event)
if !ok || event.Reason != oomEventReason {
return
}
m.lock.Lock()
defer m.lock.Unlock()
if m.resourceVersionRecorded[event.ObjectMeta.ResourceVersion] {
// This OOM event's resource version has already been recorded, which
// can happen when events are relisted after a broken watch. Skip it so
// that the same OOM is not registered more than once.
return
}
m.resourceVersionRecorded[event.ObjectMeta.ResourceVersion] = true
klog.V(2).Infof("OOM detected: %+v", event)
oom := oomEvent{
Node: event.InvolvedObject.Name,
}
if !event.EventTime.IsZero() {
oom.Time = event.EventTime.Time
} else {
oom.Time = event.FirstTimestamp.Time
}
if match := m.msgRegex.FindStringSubmatch(event.Message); len(match) == 4 {
oom.ProcessID = match[1]
oom.Process = match[2]
oom.ProcessMemory = match[3]
} else {
klog.Warningf(`unrecognized OOM event message pattern; event message contents: "%v"`, event.Message)
}
m.ooms = append(m.ooms, oom)
}