clusterloader2/pkg/measurement/util/wait_for_pods.go (90 lines of code) (raw):

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
	"context"
	"fmt"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
)

// Scaling directions tracked by WaitForPods between two consecutive polls.
// The direction is derived by comparing the number of pods seen in the
// previous poll with the currently desired pod count.
const (
	// uninitialized is the value before the first comparison has been made.
	uninitialized = iota
	// up means fewer pods were previously listed than are desired.
	up
	// down means more pods were previously listed than are desired.
	down
	// none means the previously listed pod count equals the desired count.
	none
)

// WaitForPodOptions holds the options used by WaitForPods.
type WaitForPodOptions struct {
	// DesiredPodCount returns the number of pods to wait for. WaitForPods
	// calls it on every poll and once more when the context is done.
	DesiredPodCount func() int
	// CountErrorMargin, when greater than zero, lets WaitForPods succeed even
	// if the number of running, updated pods is short of the desired count by
	// up to this many pods, as long as the number of inactive pods does not
	// exceed the margin and the remaining pods do not exceed the desired count.
	CountErrorMargin int
	// CallerName is used as a prefix in the log messages emitted by WaitForPods.
	CallerName string
	// WaitForPodsInterval is the delay between two consecutive polls.
	WaitForPodsInterval time.Duration

	// IsPodUpdated can be used to detect which pods have been already updated.
	// nil value means all pods are updated.
	// WaitForPods passes it unchanged to ComputePodsStartupStatus.
	IsPodUpdated func(*v1.Pod) error
}

// PodLister is an interface around listing pods.
type PodLister interface {
	// List returns the current set of pods.
	List() ([]*v1.Pod, error)
	// String returns a description of the lister; WaitForPods includes it in
	// its log and error messages.
	String() string
}

// WaitForPods waits till desired number of pods is running.
// The current set of pods is fetched by calling List() on the provided PodLister.
func WaitForPods(ctx context.Context, ps PodLister, options *WaitForPodOptions) error { var timeout time.Duration if deadline, hasDeadline := ctx.Deadline(); hasDeadline { timeout = time.Until(deadline) } klog.V(2).Infof("%s: %s: starting with timeout: %v", options.CallerName, ps.String(), timeout) oldPods, err := ps.List() if err != nil { return fmt.Errorf("failed to list pods: %w", err) } scaling := uninitialized var oldPodsStatus PodsStartupStatus var lastIsPodUpdatedError error for { select { case <-ctx.Done(): desiredPodCount := options.DesiredPodCount() pods := ComputePodsStatus(oldPods) klog.V(2).Infof("%s: %s: expected %d pods, got %d pods (not RunningAndReady pods: %v)", options.CallerName, ps.String(), desiredPodCount, len(oldPods), pods.NotRunningAndReady()) klog.V(2).Infof("%s: %s: all pods: %v", options.CallerName, ps.String(), pods) klog.V(2).Infof("%s: %s: last IsPodUpdated error: %v", options.CallerName, ps.String(), lastIsPodUpdatedError) return fmt.Errorf("timeout while waiting for %d pods to be running in %s - summary of pods : %s", desiredPodCount, ps.String(), oldPodsStatus.String()) case <-time.After(options.WaitForPodsInterval): desiredPodCount := options.DesiredPodCount() switch { case len(oldPods) == desiredPodCount: scaling = none case len(oldPods) < desiredPodCount: scaling = up case len(oldPods) > desiredPodCount: scaling = down } pods, err := ps.List() if err != nil { return fmt.Errorf("failed to list pods: %w", err) } podsStatus := ComputePodsStartupStatus(pods, desiredPodCount, options.IsPodUpdated) if podsStatus.LastIsPodUpdatedError != nil { lastIsPodUpdatedError = podsStatus.LastIsPodUpdatedError } diff := DiffPods(oldPods, pods) deletedPods := diff.DeletedPods() if scaling == up && len(deletedPods) > 0 { klog.Warningf("%s: %s: %d pods disappeared: %v", options.CallerName, ps.String(), len(deletedPods), strings.Join(deletedPods, ", ")) } addedPods := diff.AddedPods() if scaling == down && len(addedPods) > 0 { klog.Warningf("%s: %s: 
%d pods appeared: %v", options.CallerName, ps.String(), len(addedPods), strings.Join(addedPods, ", ")) } if podsStatus.String() != oldPodsStatus.String() { klog.V(2).Infof("%s: %s: %s", options.CallerName, ps.String(), podsStatus.String()) } // We allow inactive pods (e.g. eviction happened). // We wait until there is a desired number of pods running and all other pods are inactive. if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == podsStatus.RunningUpdated && podsStatus.RunningUpdated == desiredPodCount { return nil } // When using preemptibles on large scale, number of ready nodes is not stable and reaching DesiredPodCount could take a very long time. // Overall number of pods (especially Inactive pods) should not grow unchecked. if options.CountErrorMargin > 0 && podsStatus.RunningUpdated >= desiredPodCount-options.CountErrorMargin && len(pods)-podsStatus.Inactive <= desiredPodCount && podsStatus.Inactive <= options.CountErrorMargin { return nil } oldPods = pods oldPodsStatus = podsStatus } } }