func WaitForPVs()

in clusterloader2/pkg/measurement/util/wait_for_pvs.go [49:99]


func WaitForPVs(clientSet clientset.Interface, stopCh <-chan struct{}, options *WaitForPVOptions) error {
	ps, err := NewPVStore(clientSet, options.Selector, options.Provisioner)
	if err != nil {
		return fmt.Errorf("PV store creation error: %v", err)
	}
	defer ps.Stop()

	oldPVs := ps.List()
	scaling := uninitialized
	var pvStatus PVsStartupStatus

	switch {
	case len(oldPVs) == options.DesiredPVCount:
		scaling = none
	case len(oldPVs) < options.DesiredPVCount:
		scaling = up
	case len(oldPVs) > options.DesiredPVCount:
		scaling = down
	}

	for {
		select {
		case <-stopCh:
			example := ""
			if len(oldPVs) > 0 {
				example = fmt.Sprintf(", first one is %+v", oldPVs[0])
			}
			return fmt.Errorf("timeout while waiting for %d PVs with selector '%s' - only %d found provisioned%s",
				options.DesiredPVCount, options.filter(), pvStatus.Bound+pvStatus.Available, example)
		case <-time.After(options.WaitForPVsInterval):
			pvs := ps.List()
			pvStatus = ComputePVsStartupStatus(pvs, options.DesiredPVCount)

			diff := DiffPVs(oldPVs, pvs)
			deletedPVs := diff.DeletedPVs()
			if scaling != down && len(deletedPVs) > 0 {
				klog.Errorf("%s: %s: %d PVs disappeared: %v", options.CallerName, options.Selector.String(), len(deletedPVs), strings.Join(deletedPVs, ", "))
			}
			addedPVs := diff.AddedPVs()
			if scaling != up && len(addedPVs) > 0 {
				klog.Errorf("%s: %s: %d PVs appeared: %v", options.CallerName, options.filter(), len(addedPVs), strings.Join(addedPVs, ", "))
			}
			klog.V(2).Infof("%s: %s: %s", options.CallerName, options.filter(), pvStatus.String())
			// We wait until there is a desired number of PVs provisioned and all other PVs are pending.
			if len(pvs) == (pvStatus.Bound+pvStatus.Available+pvStatus.Pending) && pvStatus.Bound+pvStatus.Available == options.DesiredPVCount {
				return nil
			}
			oldPVs = pvs
		}
	}
}