// Excerpt from clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go (lines 245-340).

// gather blocks until all checkers for the controlled objects have been
// registered and finished, then aggregates their statuses into a single
// success/error result for the measurement.
//
// It first polls (for up to syncTimeout) until the informer handlers have
// caught up with the resourceVersion observed by the initial LIST and the
// set of tracked object keys matches the listed one. It then waits for all
// in-flight handler goroutines and classifies every checker:
//   - running / deleted objects count as healthy;
//   - timed-out objects fail the measurement (delete timeouts additionally
//     trigger a force-delete of the leftover pods);
//   - failed objects fail the measurement with their collected errors.
//
// Returns nil only when every tracked object is running with all its pods.
func (w *waitForControlledPodsRunningMeasurement) gather(syncTimeout time.Duration) error {
	klog.V(2).Infof("%v: waiting for controlled pods measurement...", w)
	if !w.isRunning {
		return fmt.Errorf("metric %s has not been started", w)
	}
	objectKeys, maxResourceVersion, err := w.getObjectKeysAndMaxVersion()
	if err != nil {
		return err
	}

	// Wait until checkers for all objects are registered:
	// - when object is created/updated, it's enough to wait for its resourceVersion to
	//   be processed by our handler; thus we wait until all events up to maxResourceVersion
	//   are processed before proceeding
	// - when object is deleted, by definition it will not be returned by the LIST request,
	//   thus resourceVersion of the deletion may be higher than the maxResourceVersion;
	//   we solve that by waiting until list of currently existing objects (that we propagate
	//   via our handler) is equal to the expected one;
	//   NOTE: we're not resilient to situations where an object will be created/deleted
	//   after the LIST call happened. But given measurement and phases don't interfere with
	//   each other, it can't be clusterloader that deleted it. Thus we accept this limitation.
	//   NOTE: we could try waiting for the informer state to be the same and use the
	//   resourceVersion from there, but then existence of bookmarks and the fact that our
	//   informer doesn't necessarily follow all objects of a given type can break that.
	//   See #1259 for more details.

	cond := func() (bool, error) {
		w.lock.Lock()
		defer w.lock.Unlock()
		return w.opResourceVersion >= maxResourceVersion && objectKeys.Equal(w.objectKeys), nil
	}
	if err := wait.Poll(checkControlledPodsInterval, syncTimeout, cond); err != nil {
		// %w keeps the underlying wait error inspectable via errors.Is/As.
		return fmt.Errorf("timed out while waiting for controlled pods: %w", err)
	}

	// All checkers are registered; wait for the handler goroutines that own
	// them to finish before reading their results under the lock.
	w.handlingGroup.Wait()
	w.lock.Lock()
	defer w.lock.Unlock()
	var numberRunning, numberDeleted, numberTimeout, numberFailed int
	failedErrList := errors.NewErrorList()
	timedOutObjects := []string{}
	var maxDuration time.Duration
	for _, checker := range w.checkerMap {
		objChecker := checker.(*objectChecker)
		status, err := objChecker.getStatus()
		if objChecker.duration > maxDuration {
			maxDuration = objChecker.duration
		}
		switch status {
		case running:
			numberRunning++
		case deleted:
			numberDeleted++
		case timeout, deleteTimeout:
			timedOutObjects = append(timedOutObjects, objChecker.key)
			numberTimeout++
			if status == deleteTimeout {
				// The object should be gone but its pods are still around;
				// force-delete them so they don't linger past the measurement.
				podsClient := w.clusterFramework.GetClientSets().GetClient().CoreV1().Pods(w.selector.Namespace)
				if err := podsClient.DeleteCollection(context.Background(), forceDeleteOptions(), w.listOptions()); err != nil {
					klog.Errorf("Error: %s while Force Deleting Pod, %s", err, objChecker.key)
				}
			}
		case failed:
			numberFailed++
			if err != nil {
				failedErrList.Append(err)
			}
		default:
			// Probably implementation bug.
			return fmt.Errorf("got unknown status for %v: status=%v, err=%v", objChecker.key, status, err)
		}
	}
	klog.V(2).Infof("%s: running %d, deleted %d, timeout: %d, failed: %d", w, numberRunning, numberDeleted, numberTimeout, numberFailed)
	// Report how close the slowest object came to the configured operation
	// timeout (0 timeout means the ratio is not meaningful).
	var ratio float64
	if w.operationTimeout != 0 {
		ratio = float64(maxDuration) / float64(w.operationTimeout)
	}
	klog.V(2).Infof("%s: maxDuration=%v, operationTimeout=%v, ratio=%.2f", w, maxDuration, w.operationTimeout, ratio)
	if numberTimeout > 0 {
		klog.Errorf("Timed out %ss: %s", w.kind, strings.Join(timedOutObjects, ", "))
		return fmt.Errorf("%d objects timed out: %ss: %s", numberTimeout, w.kind, strings.Join(timedOutObjects, ", "))
	}
	if objectKeys.Len() != numberRunning {
		klog.Errorf("%s: incorrect objects number: %d/%d %ss are running with all pods", w, numberRunning, objectKeys.Len(), w.kind)
		return fmt.Errorf("incorrect objects number: %d/%d %ss are running with all pods", numberRunning, objectKeys.Len(), w.kind)
	}
	if numberFailed > 0 {
		klog.Errorf("%s: failed status for %d %ss: %s", w, numberFailed, w.kind, failedErrList.String())
		return fmt.Errorf("failed objects statuses: %v", failedErrList.String())
	}

	klog.V(2).Infof("%s: %d/%d %ss are running with all pods", w, numberRunning, objectKeys.Len(), w.kind)
	return nil
}