func()

in clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go [542:625]


// waitForRuntimeObject prepares and launches a wait for the pods owned by obj and
// returns the objectChecker that will carry the outcome of that wait.
//
// Parameters:
//   - obj: the controlling runtime object whose pods are awaited.
//   - isDeleted: when true, the desired replica count is replaced by a constant 0
//     and a successful wait is recorded as deleted instead of running.
//   - operationTimeout: when non-zero, bounds the wait with a deadline; when zero,
//     the wait is only bounded by the cancel function registered on the checker.
//
// An error is returned (and no wait is launched) if the replicas source, the
// pod-updated predicate, the object key or the pod store cannot be created.
// Errors that happen after launch are stored on the returned checker.
func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runtime.Object, isDeleted bool, operationTimeout time.Duration) (*objectChecker, error) {
	ctx := context.TODO()

	runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), obj)
	if err != nil {
		return nil, err
	}
	var isPodUpdated func(*v1.Pod) error
	if w.checkIfPodsAreUpdated {
		isPodUpdated, err = runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(obj)
		if err != nil {
			return nil, err
		}
	}
	if isDeleted {
		// A deleted object is expected to end up with no pods at all.
		runtimeObjectReplicas = &runtimeobjects.ConstReplicas{0}
	}
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		return nil, fmt.Errorf("meta key creation error: %w", err)
	}

	podStore, err := measurementutil.NewOwnerReferenceBasedPodStore(w.podsIndexer, obj)
	if err != nil {
		return nil, fmt.Errorf("failed to create pod store: %w", err)
	}

	o := newObjectChecker(key)
	// The checker stays locked until this method returns; the function handed to
	// handlingGroup.Start takes the same lock before it records any result.
	o.lock.Lock()
	defer o.lock.Unlock()
	w.handlingGroup.Start(func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		o.SetCancel(cancel)
		if operationTimeout != time.Duration(0) {
			// Layer the deadline on top of the cancellable context so that both the
			// registered cancel function and the timeout end the wait.
			ctx, cancel = context.WithTimeout(ctx, operationTimeout)
			defer cancel()
		}
		if err := runtimeObjectReplicas.Start(ctx.Done()); err != nil {
			klog.Errorf("%s: error while starting runtimeObjectReplicas: %v", key, err)
			// Guard the write with the checker lock, like every other result write
			// below; the checker has already been handed to the caller.
			o.lock.Lock()
			o.err = fmt.Errorf("failed to start runtimeObjectReplicas: %w", err)
			o.lock.Unlock()
			return
		}
		options := &measurementutil.WaitForPodOptions{
			DesiredPodCount:     runtimeObjectReplicas.Replicas,
			CountErrorMargin:    w.countErrorMargin,
			CallerName:          w.String(),
			WaitForPodsInterval: defaultWaitForPodsInterval,
			IsPodUpdated:        isPodUpdated,
		}

		// This function sets the status (and error message) for the object checker.
		// The handling of bad statuses and errors is done by gather() function of the measurement.
		start := time.Now()
		err := measurementutil.WaitForPods(ctx, podStore, options)
		o.lock.Lock()
		defer o.lock.Unlock()
		o.duration = time.Since(start)

		if err != nil {
			klog.Errorf("%s: error for %v: %v", w, key, err)
			o.status = failed
			o.err = fmt.Errorf("%s: %w", key, err)

			// NOTE(review): ctx.Err() is also non-nil when the context was cancelled
			// explicitly, so a cancelled wait is reported as a timeout as well —
			// confirm this is intended.
			hasTimedOut := ctx.Err() != nil
			if hasTimedOut {
				if isDeleted {
					o.status = deleteTimeout
				} else {
					o.status = timeout
				}
				klog.Errorf("%s: %s timed out", w, key)
			}
			return
		}
		if isDeleted {
			o.status = deleted
			return
		}

		o.status = running
	})
	return o, nil
}