in clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go [245:340]
// gather waits until the measurement's handler state has caught up with a fresh
// listing of the controlled objects and then aggregates the status reported by
// every registered object checker.
//
// syncTimeout bounds how long gather polls for the handler state to catch up.
//
// It returns an error when:
//   - the measurement has not been started,
//   - getObjectKeysAndMaxVersion fails,
//   - the handler state does not catch up within syncTimeout (the underlying
//     poll error is wrapped and can be inspected with errors.Is/errors.As),
//   - at least one checker reports timeout or deleteTimeout,
//   - the number of running objects differs from the number of listed objects,
//   - at least one checker reports failed,
//   - a checker reports a status this function does not know.
func (w *waitForControlledPodsRunningMeasurement) gather(syncTimeout time.Duration) error {
	klog.V(2).Infof("%v: waiting for controlled pods measurement...", w)
	if !w.isRunning {
		return fmt.Errorf("metric %s has not been started", w)
	}
	objectKeys, maxResourceVersion, err := w.getObjectKeysAndMaxVersion()
	if err != nil {
		return err
	}

	// Wait until checkers for all objects are registered:
	// - when an object is created/updated, it's enough to wait for its resourceVersion to
	//   be processed by our handler; thus we wait until all events up to maxResourceVersion
	//   are processed before proceeding
	// - when an object is deleted, by definition it will not be returned by the LIST request,
	//   thus resourceVersion of the deletion may be higher than the maxResourceVersion;
	//   we solve that by waiting until the list of currently existing objects (that we
	//   propagate via our handler) is equal to the expected one;
	//   NOTE: we're not resilient to situations where an object is created/deleted
	//   after the LIST call happened. But given measurements and phases don't interfere
	//   with each other, it can't be clusterloader that deleted it. Thus we accept this
	//   limitation.
	//   NOTE: we could try waiting for the informer state to be the same and use the
	//   resourceVersion from there, but then the existence of bookmarks and the fact that
	//   our informer doesn't necessarily follow all objects of a given type can break that.
	//   See #1259 for more details.
	cond := func() (bool, error) {
		w.lock.Lock()
		defer w.lock.Unlock()
		return w.opResourceVersion >= maxResourceVersion && objectKeys.Equal(w.objectKeys), nil
	}
	if err := wait.Poll(checkControlledPodsInterval, syncTimeout, cond); err != nil {
		return fmt.Errorf("timed out while waiting for controlled pods: %w", err)
	}

	// NOTE(review): presumably this waits for in-flight event handlers to finish
	// before the checker map is read - confirm against handlingGroup's definition.
	w.handlingGroup.Wait()

	w.lock.Lock()
	defer w.lock.Unlock()

	var (
		numberRunning, numberDeleted, numberTimeout, numberFailed int
		timedOutObjects                                           []string
		maxDuration                                               time.Duration
		// forceDeleted latches once a force-delete request has succeeded so that
		// the same request is not re-issued for every remaining timed-out object.
		forceDeleted bool
	)
	failedErrList := errors.NewErrorList()

	for _, checker := range w.checkerMap {
		objChecker := checker.(*objectChecker)
		status, err := objChecker.getStatus()
		if objChecker.duration > maxDuration {
			maxDuration = objChecker.duration
		}
		switch status {
		case running:
			numberRunning++
		case deleted:
			numberDeleted++
		case timeout:
			timedOutObjects = append(timedOutObjects, objChecker.key)
			numberTimeout++
		case deleteTimeout:
			timedOutObjects = append(timedOutObjects, objChecker.key)
			numberTimeout++
			// The request below is built only from measurement-wide values (namespace,
			// delete options, list options), so it is identical for every object.
			// Issue it until it succeeds once; a failed attempt is retried on the
			// next deleteTimeout object.
			if !forceDeleted {
				podsClient := w.clusterFramework.GetClientSets().GetClient().CoreV1().Pods(w.selector.Namespace)
				deleteErr := podsClient.DeleteCollection(context.Background(), forceDeleteOptions(), w.listOptions())
				if deleteErr != nil {
					// Best effort: the timeout itself is reported after the loop.
					klog.Errorf("Error: %s while Force Deleting Pod, %s", deleteErr, objChecker.key)
				} else {
					forceDeleted = true
				}
			}
		case failed:
			numberFailed++
			if err != nil {
				failedErrList.Append(err)
			}
		default:
			// Probably implementation bug.
			return fmt.Errorf("got unknown status for %v: status=%v, err=%v", objChecker.key, status, err)
		}
	}
	klog.V(2).Infof("%s: running %d, deleted %d, timeout: %d, failed: %d", w, numberRunning, numberDeleted, numberTimeout, numberFailed)

	// ratio is the longest observed checker duration relative to the operation
	// timeout; it stays 0 when no operation timeout is configured.
	var ratio float64
	if w.operationTimeout != 0 {
		ratio = float64(maxDuration) / float64(w.operationTimeout)
	}
	klog.V(2).Infof("%s: maxDuration=%v, operationTimeout=%v, ratio=%.2f", w, maxDuration, w.operationTimeout, ratio)

	if numberTimeout > 0 {
		klog.Errorf("Timed out %ss: %s", w.kind, strings.Join(timedOutObjects, ", "))
		return fmt.Errorf("%d objects timed out: %ss: %s", numberTimeout, w.kind, strings.Join(timedOutObjects, ", "))
	}
	if objectKeys.Len() != numberRunning {
		klog.Errorf("%s: incorrect objects number: %d/%d %ss are running with all pods", w, numberRunning, objectKeys.Len(), w.kind)
		return fmt.Errorf("incorrect objects number: %d/%d %ss are running with all pods", numberRunning, objectKeys.Len(), w.kind)
	}
	if numberFailed > 0 {
		klog.Errorf("%s: failed status for %d %ss: %s", w, numberFailed, w.kind, failedErrList.String())
		return fmt.Errorf("failed objects statuses: %v", failedErrList.String())
	}

	klog.V(2).Infof("%s: %d/%d %ss are running with all pods", w, numberRunning, objectKeys.Len(), w.kind)
	return nil
}