clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go

/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"

	"k8s.io/perf-tests/clusterloader2/pkg/errors"
	"k8s.io/perf-tests/clusterloader2/pkg/framework"
	"k8s.io/perf-tests/clusterloader2/pkg/measurement"
	measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
	"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/checker"
	"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/informer"
	"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/runtimeobjects"
	"k8s.io/perf-tests/clusterloader2/pkg/measurement/util/workerqueue"
	"k8s.io/perf-tests/clusterloader2/pkg/util"
)

const (
	defaultSyncTimeout          = 60 * time.Second
	defaultOperationTimeout     = 10 * time.Minute
	checkControlledPodsInterval = 5 * time.Second
	informerSyncTimeout         = time.Minute

	waitForControlledPodsRunningName = "WaitForControlledPodsRunning"

	// In this measurement we rely on the fact that handlers are processed
	// in order - in particular, gather() checks whether all objects up to
	// a given resource version have already been processed. To guarantee
	// processing order we can't have more than a single worker. Fortunately
	// this doesn't change much, because almost the entire handler function
	// runs under lock anyway.
	waitForControlledPodsWorkers = 1
)
var podIndexerFactory = &sharedPodIndexerFactory{}

func init() {
	if err := measurement.Register(waitForControlledPodsRunningName, createWaitForControlledPodsRunningMeasurement); err != nil {
		klog.Fatalf("Cannot register %s: %v", waitForControlledPodsRunningName, err)
	}
}

type sharedPodIndexerFactory struct {
	podsIndexer *measurementutil.ControlledPodsIndexer
	err         error
	once        sync.Once
}

func (s *sharedPodIndexerFactory) PodsIndexer(c clientset.Interface) (*measurementutil.ControlledPodsIndexer, error) {
	s.once.Do(func() {
		s.podsIndexer, s.err = s.start(c)
	})
	return s.podsIndexer, s.err
}

func (s *sharedPodIndexerFactory) start(c clientset.Interface) (*measurementutil.ControlledPodsIndexer, error) {
	ctx := context.Background()
	informerFactory := informers.NewSharedInformerFactory(c, 0)
	podsIndexer, err := measurementutil.NewControlledPodsIndexer(
		informerFactory.Core().V1().Pods(),
		informerFactory.Apps().V1().ReplicaSets(),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize controlledPodsIndexer: %w", err)
	}
	informerFactory.Start(ctx.Done())
	if !podsIndexer.WaitForCacheSync(ctx) {
		return nil, fmt.Errorf("failed to sync informers")
	}
	return podsIndexer, nil
}

func createWaitForControlledPodsRunningMeasurement() measurement.Measurement {
	return &waitForControlledPodsRunningMeasurement{
		selector:   util.NewObjectSelector(),
		queue:      workerqueue.NewWorkerQueue(waitForControlledPodsWorkers),
		objectKeys: sets.NewString(),
		checkerMap: checker.NewMap(),
	}
}

type waitForControlledPodsRunningMeasurement struct {
	apiVersion       string
	kind             string
	selector         *util.ObjectSelector
	operationTimeout time.Duration
	// countErrorMargin makes the measurement wait for the number of pods to fall
	// into the <desired count - countErrorMargin, desired count> range.
	// When using preemptibles at large scale, the number of ready nodes is not
	// stable and reaching DesiredPodCount could take a very long time.
	countErrorMargin      int
	stopCh                chan struct{}
	isRunning             bool
	queue                 workerqueue.Interface
	handlingGroup         wait.Group
	lock                  sync.Mutex
	objectKeys            sets.String
	opResourceVersion     uint64
	gvr                   schema.GroupVersionResource
	checkerMap            checker.Map
	clusterFramework      *framework.Framework
	checkIfPodsAreUpdated bool
	// podsIndexer is an indexer kept up to date by informers observing
	// changes to all pods in the whole cluster.
	podsIndexer *measurementutil.ControlledPodsIndexer
}
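// Execution model, summarized for orientation (derived from the code below):
// "start" registers a dynamic informer on the controlling objects and creates
// one objectChecker per observed object; "gather" blocks until every checker
// reaches a terminal status and aggregates the results; "stop" disposes of the
// informer, the worker queue, and all checkers.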
// Execute waits until all specified controlling objects have all pods running or until timeout happens.
// Controlling objects can be specified by field and/or label selectors.
// If namespace is not passed as a parameter, all-namespace scope is assumed.
// The "start" action starts observation of the controlling objects, while "gather" waits until the
// specified number of controlling objects have all pods running.
func (w *waitForControlledPodsRunningMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) {
	w.clusterFramework = config.ClusterFramework

	action, err := util.GetString(config.Params, "action")
	if err != nil {
		return nil, err
	}
	switch action {
	case "start":
		w.apiVersion, err = util.GetString(config.Params, "apiVersion")
		if err != nil {
			return nil, err
		}
		w.kind, err = util.GetString(config.Params, "kind")
		if err != nil {
			return nil, err
		}
		if err = w.selector.Parse(config.Params); err != nil {
			return nil, err
		}
		w.operationTimeout, err = util.GetDurationOrDefault(config.Params, "operationTimeout", defaultOperationTimeout)
		if err != nil {
			return nil, err
		}
		w.checkIfPodsAreUpdated, err = util.GetBoolOrDefault(config.Params, "checkIfPodsAreUpdated", true)
		if err != nil {
			return nil, err
		}
		w.countErrorMargin, err = util.GetIntOrDefault(config.Params, "countErrorMargin", 0)
		if err != nil {
			return nil, err
		}
		return nil, w.start()
	case "gather":
		syncTimeout, err := util.GetDurationOrDefault(config.Params, "syncTimeout", defaultSyncTimeout)
		if err != nil {
			return nil, err
		}
		return nil, w.gather(syncTimeout)
	case "stop":
		w.Dispose()
		return nil, nil
	default:
		return nil, fmt.Errorf("unknown action %v", action)
	}
}

// Dispose cleans up after the measurement.
func (w *waitForControlledPodsRunningMeasurement) Dispose() {
	if !w.isRunning {
		return
	}
	w.isRunning = false
	close(w.stopCh)
	w.queue.Stop()
	w.lock.Lock()
	defer w.lock.Unlock()
	w.checkerMap.Dispose()
}

// String returns a string representation of the metric.
func (*waitForControlledPodsRunningMeasurement) String() string {
	return waitForControlledPodsRunningName
}

func (w *waitForControlledPodsRunningMeasurement) start() error {
	if w.isRunning {
		klog.V(2).Infof("%v: wait for controlled pods measurement already running", w)
		return nil
	}
	klog.V(2).Infof("%v: starting wait for controlled pods measurement...", w)

	gv, err := schema.ParseGroupVersion(w.apiVersion)
	if err != nil {
		return err
	}
	gvk := gv.WithKind(w.kind)
	w.gvr, _ = meta.UnsafeGuessKindToResource(gvk)

	w.isRunning = true
	w.stopCh = make(chan struct{})
	podsIndexer, err := podIndexerFactory.PodsIndexer(w.clusterFramework.GetClientSets().GetClient())
	if err != nil {
		return err
	}
	w.podsIndexer = podsIndexer
	i := informer.NewDynamicInformer(
		w.clusterFramework.GetDynamicClients().GetClient(),
		w.gvr,
		w.selector,
		func(oldObj, newObj interface{}) {
			f := func() {
				w.handleObject(oldObj, newObj)
			}
			w.queue.Add(&f)
		},
	)
	return informer.StartAndSync(i, w.stopCh, informerSyncTimeout)
}
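// Sketch of the synchronization performed by gather() below, with made-up
// resource versions. gather() lists the objects once, then polls until the
// single-worker handler has caught up with both the version watermark and
// the set of existing objects:
//
//	LIST result:      {a, b, c}, maxResourceVersion = 42
//	handler progress: opResourceVersion = 40, objectKeys = {a, b}
//	=> keep polling; done once opResourceVersion >= 42 and objectKeys = {a, b, c}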
func (w *waitForControlledPodsRunningMeasurement) gather(syncTimeout time.Duration) error {
	klog.V(2).Infof("%v: waiting for controlled pods measurement...", w)
	if !w.isRunning {
		return fmt.Errorf("metric %s has not been started", w)
	}
	objectKeys, maxResourceVersion, err := w.getObjectKeysAndMaxVersion()
	if err != nil {
		return err
	}

	// Wait until checkers for all objects are registered:
	// - when an object is created/updated, it's enough to wait for its resourceVersion
	//   to be processed by our handler; thus we wait until all events up to
	//   maxResourceVersion are processed before proceeding;
	// - when an object is deleted, by definition it will not be returned by the LIST
	//   request, so the resourceVersion of the deletion may be higher than
	//   maxResourceVersion; we solve that by waiting until the set of currently existing
	//   objects (which we propagate via our handler) is equal to the expected one.
	// NOTE: we're not resilient to situations where an object is created/deleted after
	// the LIST call happened. But since measurements and phases don't interfere with
	// each other, it can't be clusterloader that deleted it. Thus we accept this limitation.
	// NOTE: we could try waiting for the informer state to be the same and use the
	// resourceVersion from there, but the existence of bookmarks and the fact that our
	// informer doesn't necessarily follow all objects of a given type can break that.
	// See #1259 for more details.
	cond := func() (bool, error) {
		w.lock.Lock()
		defer w.lock.Unlock()
		return w.opResourceVersion >= maxResourceVersion && objectKeys.Equal(w.objectKeys), nil
	}
	if err := wait.Poll(checkControlledPodsInterval, syncTimeout, cond); err != nil {
		return fmt.Errorf("timed out while waiting for controlled pods: %v", err)
	}

	w.handlingGroup.Wait()
	w.lock.Lock()
	defer w.lock.Unlock()
	var numberRunning, numberDeleted, numberTimeout, numberFailed int
	failedErrList := errors.NewErrorList()
	timedOutObjects := []string{}
	var maxDuration time.Duration
	for _, checker := range w.checkerMap {
		objChecker := checker.(*objectChecker)
		status, err := objChecker.getStatus()
		if objChecker.duration > maxDuration {
			maxDuration = objChecker.duration
		}
		switch status {
		case running:
			numberRunning++
		case deleted:
			numberDeleted++
		case timeout:
			timedOutObjects = append(timedOutObjects, objChecker.key)
			numberTimeout++
		case deleteTimeout:
			timedOutObjects = append(timedOutObjects, objChecker.key)
			numberTimeout++
			// Force-delete the pods still matched by the measurement's selector,
			// so that they don't linger after the delete timeout.
			podsClient := w.clusterFramework.GetClientSets().GetClient().CoreV1().Pods(w.selector.Namespace)
			err := podsClient.DeleteCollection(context.Background(), forceDeleteOptions(), w.listOptions())
			if err != nil {
				klog.Errorf("Error: %s while force-deleting pods for %s", err, objChecker.key)
			}
		case failed:
			numberFailed++
			if err != nil {
				failedErrList.Append(err)
			}
		default:
			// Most likely an implementation bug.
			return fmt.Errorf("got unknown status for %v: status=%v, err=%v", objChecker.key, status, err)
		}
	}
	klog.V(2).Infof("%s: running %d, deleted %d, timeout: %d, failed: %d", w, numberRunning, numberDeleted, numberTimeout, numberFailed)
	var ratio float64
	if w.operationTimeout != 0 {
		ratio = float64(maxDuration) / float64(w.operationTimeout)
	}
	klog.V(2).Infof("%s: maxDuration=%v, operationTimeout=%v, ratio=%.2f", w, maxDuration, w.operationTimeout, ratio)
	if numberTimeout > 0 {
		klog.Errorf("Timed out %ss: %s", w.kind, strings.Join(timedOutObjects, ", "))
		return fmt.Errorf("%d objects timed out: %ss: %s", numberTimeout, w.kind, strings.Join(timedOutObjects, ", "))
	}
	if objectKeys.Len() != numberRunning {
		klog.Errorf("%s: incorrect objects number: %d/%d %ss are running with all pods", w, numberRunning, objectKeys.Len(), w.kind)
		return fmt.Errorf("incorrect objects number: %d/%d %ss are running with all pods", numberRunning, objectKeys.Len(), w.kind)
	}
	if numberFailed > 0 {
		klog.Errorf("%s: failed status for %d %ss: %s", w, numberFailed, w.kind, failedErrList.String())
		return fmt.Errorf("failed objects statuses: %v", failedErrList.String())
	}

	klog.V(2).Infof("%s: %d/%d %ss are running with all pods", w, numberRunning, objectKeys.Len(), w.kind)
	return nil
}
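// The two option builders below back the force-deletion path taken in gather()
// when an object hits deleteTimeout: pods matching the measurement's label/field
// selectors are deleted with grace period 0 and background propagation.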
func (w *waitForControlledPodsRunningMeasurement) listOptions() metav1.ListOptions {
	return metav1.ListOptions{
		LabelSelector: w.selector.LabelSelector,
		FieldSelector: w.selector.FieldSelector,
	}
}

func forceDeleteOptions() metav1.DeleteOptions {
	gracePeriod := int64(0)
	propagationPolicy := metav1.DeletePropagationBackground
	return metav1.DeleteOptions{
		GracePeriodSeconds: &gracePeriod,
		PropagationPolicy:  &propagationPolicy,
	}
}

// handleObject manages the checker for the given controlling object.
// This function does not return errors; it only logs them. All possible errors
// are caught in the gather function: if this function does not execute correctly,
// verifying the number of running pods will fail, causing an "incorrect objects
// number" error to be returned.
func (w *waitForControlledPodsRunningMeasurement) handleObject(oldObj, newObj interface{}) {
	var oldRuntimeObj runtime.Object
	var newRuntimeObj runtime.Object
	var ok bool
	oldRuntimeObj, ok = oldObj.(runtime.Object)
	if oldObj != nil && !ok {
		klog.Errorf("%s: uncastable old object: %v", w, oldObj)
		return
	}
	newRuntimeObj, ok = newObj.(runtime.Object)
	if newObj != nil && !ok {
		klog.Errorf("%s: uncastable new object: %v", w, newObj)
		return
	}

	// Acquire the lock before defining the deferred function to ensure it
	// is called under the same lock.
	w.lock.Lock()
	defer w.lock.Unlock()
	defer func() {
		if err := w.updateCacheLocked(oldRuntimeObj, newRuntimeObj); err != nil {
			klog.Errorf("%s: error when updating cache: %v", w, err)
		}
	}()

	isEqual, err := runtimeobjects.IsEqualRuntimeObjectsSpec(oldRuntimeObj, newRuntimeObj)
	if err != nil {
		klog.Errorf("%s: comparing specs error: %v", w, err)
		return
	}
	if isEqual {
		// Skip updates without changes in the spec.
		return
	}

	if !w.isRunning {
		return
	}

	if err := w.deleteObjectLocked(oldRuntimeObj); err != nil {
		klog.Errorf("%s: delete checker error: %v", w, err)
	}
	if err := w.handleObjectLocked(oldRuntimeObj, newRuntimeObj); err != nil {
		klog.Errorf("%s: create checker error: %v", w, err)
	}
}
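// checkScaledown reports whether the transition from oldObj to newObj lowers
// the desired replica count; replica counts are read once from each object's
// replicas watcher.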
func (w *waitForControlledPodsRunningMeasurement) checkScaledown(oldObj, newObj runtime.Object) (bool, error) {
	oldReplicasWatcher, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), oldObj)
	if err != nil {
		return false, err
	}
	oldReplicas, err := runtimeobjects.GetReplicasOnce(oldReplicasWatcher)
	if err != nil {
		return false, err
	}
	newReplicasWatcher, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), newObj)
	if err != nil {
		return false, err
	}
	newReplicas, err := runtimeobjects.GetReplicasOnce(newReplicasWatcher)
	if err != nil {
		return false, err
	}
	return newReplicas < oldReplicas, nil
}

func (w *waitForControlledPodsRunningMeasurement) handleObjectLocked(oldObj, newObj runtime.Object) error {
	isObjDeleted := newObj == nil
	handledObj := newObj
	if isObjDeleted {
		handledObj = oldObj
	}
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(handledObj)
	if err != nil {
		return fmt.Errorf("meta key creation error: %v", err)
	}
	checker, err := w.waitForRuntimeObject(handledObj, isObjDeleted, w.operationTimeout)
	if err != nil {
		return fmt.Errorf("waiting for %v error: %v", key, err)
	}
	w.checkerMap.Add(key, checker)
	return nil
}

func (w *waitForControlledPodsRunningMeasurement) deleteObjectLocked(obj runtime.Object) error {
	if obj == nil {
		return nil
	}
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		return fmt.Errorf("meta key creation error: %v", err)
	}
	w.checkerMap.DeleteAndStop(key)
	return nil
}

func (w *waitForControlledPodsRunningMeasurement) updateCacheLocked(oldObj, newObj runtime.Object) error {
	errList := errors.NewErrorList()

	if oldObj != nil {
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(oldObj)
		if err != nil {
			errList.Append(fmt.Errorf("%s: retrieving key error: %v", w, err))
		} else {
			w.objectKeys.Delete(key)
		}
		if err := w.updateOpResourceVersionLocked(oldObj); err != nil {
			errList.Append(err)
		}
	}
	if newObj != nil {
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(newObj)
		if err != nil {
			errList.Append(fmt.Errorf("%s: retrieving key error: %v", w, err))
		} else {
			w.objectKeys.Insert(key)
		}
		if err := w.updateOpResourceVersionLocked(newObj); err != nil {
			errList.Append(err)
		}
	}

	if errList.IsEmpty() {
		return nil
	}
	return fmt.Errorf(errList.Error())
}

func (w *waitForControlledPodsRunningMeasurement) updateOpResourceVersionLocked(runtimeObj runtime.Object) error {
	version, err := runtimeobjects.GetResourceVersionFromRuntimeObject(runtimeObj)
	if err != nil {
		return fmt.Errorf("retrieving resource version error: %v", err)
	}
	if version > w.opResourceVersion {
		w.opResourceVersion = version
	}
	return nil
}
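// Worked example of the bookkeeping above for a single update event, with
// made-up resource versions: for updateCacheLocked(old@rv=7, new@rv=9), the
// object's key is deleted and re-inserted into objectKeys (a net no-op for
// updates), and opResourceVersion advances to max(7, 9) = 9.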
// getObjectKeysAndMaxVersion returns the keys of objects that satisfy the
// measurement parameters and the maximal resource version among these objects.
func (w *waitForControlledPodsRunningMeasurement) getObjectKeysAndMaxVersion() (sets.String, uint64, error) {
	objects, err := runtimeobjects.ListRuntimeObjectsForKind(
		w.clusterFramework.GetDynamicClients().GetClient(),
		w.gvr, w.kind, w.selector.Namespace, w.selector.LabelSelector, w.selector.FieldSelector)
	if err != nil {
		return nil, 0, fmt.Errorf("listing objects error: %v", err)
	}

	objectKeys := sets.NewString()
	var maxResourceVersion uint64
	for i := range objects {
		runtimeObj, ok := objects[i].(runtime.Object)
		if !ok {
			klog.Errorf("%s: cannot cast to runtime.Object: %v", w, objects[i])
			continue
		}
		version, err := runtimeobjects.GetResourceVersionFromRuntimeObject(runtimeObj)
		if err != nil {
			klog.Errorf("%s: retrieving resource version error: %v", w, err)
			continue
		}
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(runtimeObj)
		if err != nil {
			klog.Errorf("%s: retrieving key error: %v", w, err)
			continue
		}
		objectKeys.Insert(key)
		if version > maxResourceVersion {
			maxResourceVersion = version
		}
	}
	return objectKeys, maxResourceVersion, nil
}

func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runtime.Object, isDeleted bool, operationTimeout time.Duration) (*objectChecker, error) {
	ctx := context.TODO()
	runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), obj)
	if err != nil {
		return nil, err
	}
	var isPodUpdated func(*v1.Pod) error
	if w.checkIfPodsAreUpdated {
		isPodUpdated, err = runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(obj)
		if err != nil {
			return nil, err
		}
	}
	if isDeleted {
		runtimeObjectReplicas = &runtimeobjects.ConstReplicas{0}
	}
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		return nil, fmt.Errorf("meta key creation error: %v", err)
	}

	podStore, err := measurementutil.NewOwnerReferenceBasedPodStore(w.podsIndexer, obj)
	if err != nil {
		return nil, fmt.Errorf("failed to create pod store: %w", err)
	}

	o := newObjectChecker(key)
	o.lock.Lock()
	defer o.lock.Unlock()
	w.handlingGroup.Start(func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		o.SetCancel(cancel)
		if operationTimeout != time.Duration(0) {
			ctx, cancel = context.WithTimeout(ctx, operationTimeout)
			defer cancel()
		}
		if err := runtimeObjectReplicas.Start(ctx.Done()); err != nil {
			klog.Errorf("%s: error while starting runtimeObjectReplicas: %v", key, err)
			o.err = fmt.Errorf("failed to start runtimeObjectReplicas: %v", err)
			return
		}
		options := &measurementutil.WaitForPodOptions{
			DesiredPodCount:     runtimeObjectReplicas.Replicas,
			CountErrorMargin:    w.countErrorMargin,
			CallerName:          w.String(),
			WaitForPodsInterval: defaultWaitForPodsInterval,
			IsPodUpdated:        isPodUpdated,
		}

		// This closure sets the status (and error message) on the object checker.
		// Bad statuses and errors are handled by the measurement's gather() function.
		start := time.Now()
		err := measurementutil.WaitForPods(ctx, podStore, options)
		o.lock.Lock()
		defer o.lock.Unlock()
		o.duration = time.Since(start)

		if err != nil {
			klog.Errorf("%s: error for %v: %v", w, key, err)
			o.status = failed
			o.err = fmt.Errorf("%s: %v", key, err)

			hasTimedOut := ctx.Err() != nil
			if hasTimedOut {
				if isDeleted {
					o.status = deleteTimeout
				} else {
					o.status = timeout
				}
				klog.Errorf("%s: %s timed out", w, key)
			}
			return
		}
		if isDeleted {
			o.status = deleted
			return
		}
		o.status = running
	})
	return o, nil
}
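// Status transitions of an objectChecker, as set by waitForRuntimeObject above
// (terminal states are consumed by gather):
//
//	unknown -> running        WaitForPods succeeded, target scale > 0
//	unknown -> deleted        WaitForPods succeeded, target scale == 0
//	unknown -> failed         WaitForPods failed without a context timeout
//	unknown -> timeout        context timed out, target scale > 0
//	unknown -> deleteTimeout  context timed out, target scale == 0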
type objectStatus int

const (
	unknown       objectStatus = iota // WaitForPods hasn't finished yet; the result isn't determined.
	running                           // WaitForPods finished and scaling to a count other than 0 succeeded.
	deleted                           // WaitForPods finished and scaling down to 0 succeeded.
	failed                            // WaitForPods finished, but failed; o.err must be set.
	timeout                           // WaitForPods was interrupted by a timeout and the target count was other than 0.
	deleteTimeout                     // WaitForPods was interrupted by a timeout and the target count was 0.
)

type objectChecker struct {
	lock   sync.Mutex
	status objectStatus
	err    error
	// key of the object being checked. In the current implementation it's
	// a namespaced name, but that may change in the future.
	key      string
	cancel   context.CancelFunc
	duration time.Duration
}

func newObjectChecker(key string) *objectChecker {
	return &objectChecker{
		status: unknown,
		key:    key,
	}
}

func (o *objectChecker) SetCancel(cancel context.CancelFunc) {
	o.lock.Lock()
	defer o.lock.Unlock()
	o.cancel = cancel
}

func (o *objectChecker) Stop() {
	o.lock.Lock()
	defer o.lock.Unlock()
	o.cancel()
}

func (o *objectChecker) getStatus() (objectStatus, error) {
	o.lock.Lock()
	defer o.lock.Unlock()
	return o.status, o.err
}