in clusterloader2/pkg/measurement/common/wait_for_controlled_pods.go [542:625]
func (w *waitForControlledPodsRunningMeasurement) waitForRuntimeObject(obj runtime.Object, isDeleted bool, operationTimeout time.Duration) (*objectChecker, error) {
ctx := context.TODO()
runtimeObjectReplicas, err := runtimeobjects.GetReplicasFromRuntimeObject(w.clusterFramework.GetClientSets().GetClient(), obj)
if err != nil {
return nil, err
}
var isPodUpdated func(*v1.Pod) error
if w.checkIfPodsAreUpdated {
isPodUpdated, err = runtimeobjects.GetIsPodUpdatedPredicateFromRuntimeObject(obj)
if err != nil {
return nil, err
}
}
if isDeleted {
runtimeObjectReplicas = &runtimeobjects.ConstReplicas{0}
}
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return nil, fmt.Errorf("meta key creation error: %v", err)
}
podStore, err := measurementutil.NewOwnerReferenceBasedPodStore(w.podsIndexer, obj)
if err != nil {
return nil, fmt.Errorf("failed to create pod store: %w", err)
}
o := newObjectChecker(key)
o.lock.Lock()
defer o.lock.Unlock()
w.handlingGroup.Start(func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
o.SetCancel(cancel)
if operationTimeout != time.Duration(0) {
ctx, cancel = context.WithTimeout(ctx, operationTimeout)
defer cancel()
}
if err := runtimeObjectReplicas.Start(ctx.Done()); err != nil {
klog.Errorf("%s: error while starting runtimeObjectReplicas: %v", key, err)
o.err = fmt.Errorf("failed to start runtimeObjectReplicas: %v", err)
return
}
options := &measurementutil.WaitForPodOptions{
DesiredPodCount: runtimeObjectReplicas.Replicas,
CountErrorMargin: w.countErrorMargin,
CallerName: w.String(),
WaitForPodsInterval: defaultWaitForPodsInterval,
IsPodUpdated: isPodUpdated,
}
// This function sets the status (and error message) for the object checker.
// The handling of bad statuses and errors is done by gather() function of the measurement.
start := time.Now()
err := measurementutil.WaitForPods(ctx, podStore, options)
o.lock.Lock()
defer o.lock.Unlock()
o.duration = time.Since(start)
if err != nil {
klog.Errorf("%s: error for %v: %v", w, key, err)
o.status = failed
o.err = fmt.Errorf("%s: %v", key, err)
hasTimedOut := ctx.Err() != nil
if hasTimedOut {
if isDeleted {
o.status = deleteTimeout
} else {
o.status = timeout
}
klog.Errorf("%s: %s timed out", w, key)
}
return
}
if isDeleted {
o.status = deleted
return
}
o.status = running
})
return o, nil
}