in clusterloader2/pkg/measurement/util/wait_for_pods.go [56:127]
// WaitForPods polls ps every options.WaitForPodsInterval until the listed pods
// reach the count returned by options.DesiredPodCount, or until ctx is done.
//
// It returns nil as soon as either condition holds:
//   - every listed pod is counted as Running or Inactive, all Running pods are
//     counted as RunningUpdated (as computed by ComputePodsStartupStatus with
//     options.IsPodUpdated), and RunningUpdated equals the desired count; or
//   - options.CountErrorMargin > 0, RunningUpdated is within the margin below
//     the desired count, the non-Inactive pods do not exceed the desired count,
//     and the Inactive pods do not exceed the margin.
//
// It returns an error if listing pods fails. It also returns an error when ctx
// is done before either condition is met; that error summarises the last
// observed pod status.
func WaitForPods(ctx context.Context, ps PodLister, options *WaitForPodOptions) error {
	var timeout time.Duration
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		timeout = time.Until(deadline)
	}
	klog.V(2).Infof("%s: %s: starting with timeout: %v", options.CallerName, ps.String(), timeout)
	oldPods, err := ps.List()
	if err != nil {
		return fmt.Errorf("failed to list pods: %w", err)
	}
	scaling := uninitialized
	var oldPodsStatus PodsStartupStatus
	var lastIsPodUpdatedError error

	// Reuse one timer for the whole poll loop instead of calling time.After on
	// every iteration. This avoids a timer allocation per poll, and the deferred
	// Stop releases the pending timer as soon as the function returns.
	pollTimer := time.NewTimer(options.WaitForPodsInterval)
	defer pollTimer.Stop()

	for {
		select {
		case <-ctx.Done():
			desiredPodCount := options.DesiredPodCount()
			pods := ComputePodsStatus(oldPods)
			klog.V(2).Infof("%s: %s: expected %d pods, got %d pods (not RunningAndReady pods: %v)", options.CallerName, ps.String(), desiredPodCount, len(oldPods), pods.NotRunningAndReady())
			klog.V(2).Infof("%s: %s: all pods: %v", options.CallerName, ps.String(), pods)
			klog.V(2).Infof("%s: %s: last IsPodUpdated error: %v", options.CallerName, ps.String(), lastIsPodUpdatedError)
			return fmt.Errorf("timeout while waiting for %d pods to be running in %s - summary of pods : %s",
				desiredPodCount, ps.String(), oldPodsStatus.String())
		case <-pollTimer.C:
			desiredPodCount := options.DesiredPodCount()
			// Infer the scaling direction from the previous observation. It is
			// used only to warn about pod churn in the "wrong" direction.
			switch {
			case len(oldPods) == desiredPodCount:
				scaling = none
			case len(oldPods) < desiredPodCount:
				scaling = up
			case len(oldPods) > desiredPodCount:
				scaling = down
			}
			pods, err := ps.List()
			if err != nil {
				return fmt.Errorf("failed to list pods: %w", err)
			}
			podsStatus := ComputePodsStartupStatus(pods, desiredPodCount, options.IsPodUpdated)
			// Remember the most recent non-nil error so it can be reported on timeout.
			if podsStatus.LastIsPodUpdatedError != nil {
				lastIsPodUpdatedError = podsStatus.LastIsPodUpdatedError
			}
			diff := DiffPods(oldPods, pods)
			deletedPods := diff.DeletedPods()
			if scaling == up && len(deletedPods) > 0 {
				klog.Warningf("%s: %s: %d pods disappeared: %v", options.CallerName, ps.String(), len(deletedPods), strings.Join(deletedPods, ", "))
			}
			addedPods := diff.AddedPods()
			if scaling == down && len(addedPods) > 0 {
				klog.Warningf("%s: %s: %d pods appeared: %v", options.CallerName, ps.String(), len(addedPods), strings.Join(addedPods, ", "))
			}
			// Only log the status when it changed since the previous poll.
			if podsStatus.String() != oldPodsStatus.String() {
				klog.V(2).Infof("%s: %s: %s", options.CallerName, ps.String(), podsStatus.String())
			}
			// We allow inactive pods (e.g. eviction happened).
			// We wait until there is a desired number of pods running and all other pods are inactive.
			if len(pods) == (podsStatus.Running+podsStatus.Inactive) && podsStatus.Running == podsStatus.RunningUpdated && podsStatus.RunningUpdated == desiredPodCount {
				return nil
			}
			// When using preemptibles on large scale, number of ready nodes is not stable and reaching DesiredPodCount could take a very long time.
			// Overall number of pods (especially Inactive pods) should not grow unchecked.
			if options.CountErrorMargin > 0 && podsStatus.RunningUpdated >= desiredPodCount-options.CountErrorMargin && len(pods)-podsStatus.Inactive <= desiredPodCount && podsStatus.Inactive <= options.CountErrorMargin {
				return nil
			}
			oldPods = pods
			oldPodsStatus = podsStatus
			// The timer fired and its channel was drained by this case, so
			// Reset is safe. Re-arming here starts the next wait after the
			// body finishes, matching the previous time.After behavior.
			pollTimer.Reset(options.WaitForPodsInterval)
		}
	}
}