clusterloader2/pkg/chaos/nodes.go

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package chaos

import (
	"fmt"
	"math"
	"math/rand"
	"strings"
	"sync"
	"time"

	"k8s.io/perf-tests/clusterloader2/api"
	"k8s.io/perf-tests/clusterloader2/pkg/framework/client"
	"k8s.io/perf-tests/clusterloader2/pkg/util"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	"k8s.io/perf-tests/clusterloader2/pkg/provider"
)

const (
	monitoringNamespace = "monitoring"
	prometheusLabel     = "prometheus=k8s"
)

// NodeKiller is a utility to simulate node failures.
type NodeKiller struct {
	config api.NodeFailureConfig
	client clientset.Interface
	// killedNodes stores names of the nodes that have been killed by NodeKiller.
	killedNodes sets.String
	recorder    *eventRecorder
	ssh         util.SSHExecutor
}

type nodeAction string

const (
	stopServices nodeAction = "stopService"
	rebootNode   nodeAction = "rebootNode"
)

type event struct {
	time     time.Time
	action   nodeAction
	nodeName string
}

type eventRecorder struct {
	events []event
	mux    sync.Mutex
}

func newEventRecorder() *eventRecorder {
	return &eventRecorder{[]event{}, sync.Mutex{}}
}

func (r *eventRecorder) record(a nodeAction, nodeName string) {
	e := event{time.Now(), a, nodeName}
	r.mux.Lock()
	r.events = append(r.events, e)
	r.mux.Unlock()
}

// NewNodeKiller creates a new NodeKiller.
func NewNodeKiller(config api.NodeFailureConfig, client clientset.Interface, killedNodes sets.String, provider provider.Provider) (*NodeKiller, error) {
	// TODO(#1399): node-killing code is provider specific, move it into provider
	if !provider.Features().SupportNodeKiller {
		return nil, fmt.Errorf("provider %q is not supported by NodeKiller", provider)
	}
	sshExecutor := &util.GCloudSSHExecutor{}
	return &NodeKiller{config, client, killedNodes, newEventRecorder(), sshExecutor}, nil
}

// Run starts NodeKiller until stopCh is closed.
func (k *NodeKiller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	// wait.JitterUntil starts work immediately, so wait first.
	sleepInterrupt(wait.Jitter(time.Duration(k.config.Interval), k.config.JitterFactor), stopCh)
	wait.JitterUntil(func() {
		nodes, err := k.pickNodes()
		if err != nil {
			klog.Errorf("%s: Unable to pick nodes to kill: %v", k, err)
			return
		}
		k.kill(nodes, stopCh)
	}, time.Duration(k.config.Interval), k.config.JitterFactor, true, stopCh)
}

// pickNodes selects a random subset of schedulable nodes to fail, skipping
// nodes that run Prometheus pods and nodes that have already been killed.
func (k *NodeKiller) pickNodes() ([]v1.Node, error) {
	allNodes, err := util.GetSchedulableUntainedNodes(k.client)
	if err != nil {
		return nil, err
	}
	prometheusPods, err := client.ListPodsWithOptions(k.client, monitoringNamespace, metav1.ListOptions{
		LabelSelector: prometheusLabel,
	})
	if err != nil {
		return nil, err
	}
	nodesHasPrometheusPod := sets.NewString()
	for i := range prometheusPods {
		if prometheusPods[i].Spec.NodeName != "" {
			nodesHasPrometheusPod.Insert(prometheusPods[i].Spec.NodeName)
			klog.V(2).Infof("%s: Node %s removed from killing. Runs pod %s", k, prometheusPods[i].Spec.NodeName, prometheusPods[i].Name)
		}
	}

	nodes := allNodes[:0]
	for _, node := range allNodes {
		if !nodesHasPrometheusPod.Has(node.Name) && !k.killedNodes.Has(node.Name) {
			nodes = append(nodes, node)
		}
	}
	rand.Shuffle(len(nodes), func(i, j int) {
		nodes[i], nodes[j] = nodes[j], nodes[i]
	})
	numNodes := int(math.Ceil(k.config.FailureRate * float64(len(nodes))))
	klog.V(2).Infof("%s: %d nodes available, wants to fail %d nodes", k, len(nodes), numNodes)
	if len(nodes) > numNodes {
		nodes = nodes[:numNodes]
	}
	for _, node := range nodes {
		klog.V(2).Infof("%s: Node %q scheduled for failure", k, node.Name)
	}
	return nodes, nil
}

// kill simulates a failure on each given node by stopping docker and kubelet,
// waiting for the configured downtime and then rebooting the node.
func (k *NodeKiller) kill(nodes []v1.Node, stopCh <-chan struct{}) {
	wg := sync.WaitGroup{}
	wg.Add(len(nodes))
	for _, node := range nodes {
		k.killedNodes.Insert(node.Name)
		node := node
		go func() {
			defer wg.Done()

			klog.V(2).Infof("%s: Stopping docker and kubelet on %q to simulate failure", k, node.Name)
			k.addStopServicesEvent(node.Name)
			err := k.ssh.Exec("sudo systemctl stop docker kubelet", &node, nil)
			if err != nil {
				klog.Errorf("%s: ERROR while stopping node %q: %v", k, node.Name, err)
				return
			}

			// Listen for interruptions on stopCh or wait for the simulated downtime.
			sleepInterrupt(time.Duration(k.config.SimulatedDowntime), stopCh)

			klog.V(2).Infof("%s: Rebooting %q to repair the node", k, node.Name)
			k.addRebootEvent(node.Name)
			err = k.ssh.Exec("sudo reboot", &node, nil)
			if err != nil {
				klog.Errorf("%s: Error while rebooting node %q: %v", k, node.Name, err)
				return
			}
		}()
	}
	wg.Wait()
}

func (k *NodeKiller) addStopServicesEvent(nodeName string) {
	k.recorder.record(stopServices, nodeName)
}

func (k *NodeKiller) addRebootEvent(nodeName string) {
	k.recorder.record(rebootNode, nodeName)
}

// Summary logs NodeKiller execution.
func (k *NodeKiller) Summary() string {
	var sb strings.Builder
	sb.WriteString(fmt.Sprintf("%s: Recorded the following events\n", k))
	for _, e := range k.recorder.events {
		sb.WriteString(fmt.Sprintf("%s: At %v %v happened for node %s\n", k, e.time.Format(time.UnixDate), e.action, e.nodeName))
	}
	return sb.String()
}

func (k *NodeKiller) String() string {
	return "NodeKiller"
}

// sleepInterrupt puts the current goroutine to sleep for the given duration,
// or returns early if stopCh closes.
// Note of warning: if stopCh is already closed the process may not sleep at all.
func sleepInterrupt(duration time.Duration, stopCh <-chan struct{}) {
	select {
	case <-stopCh:
	case <-time.After(duration):
	}
}