in clusterloader2/pkg/measurement/util/wait_for_pvs.go [49:99]
// WaitForPVs blocks until the PVs matched by options reach the desired state.
//
// It builds a PV store from options.Selector and options.Provisioner and then
// re-lists that store every options.WaitForPVsInterval. It returns nil once
// exactly options.DesiredPVCount PVs are counted as Bound or Available and
// every remaining listed PV is counted as Pending. PVs that appear or
// disappear against the expected scaling direction are logged as errors but
// do not abort the wait.
//
// It returns an error if the PV store cannot be created, or if stopCh fires
// before the desired state has been observed.
func WaitForPVs(clientSet clientset.Interface, stopCh <-chan struct{}, options *WaitForPVOptions) error {
	ps, err := NewPVStore(clientSet, options.Selector, options.Provisioner)
	if err != nil {
		// Wrap with %w so callers can inspect the underlying cause.
		return fmt.Errorf("PV store creation error: %w", err)
	}
	defer ps.Stop()

	oldPVs := ps.List()
	// Seed the status from the initial listing so that a stop signal arriving
	// before the first poll still reports the provisioned count that matches
	// the PVs shown in the error message.
	pvStatus := ComputePVsStartupStatus(oldPVs, options.DesiredPVCount)

	// The scaling direction is fixed once, from the initial listing; it decides
	// which kind of PV churn (additions or deletions) is considered unexpected.
	scaling := uninitialized
	switch {
	case len(oldPVs) == options.DesiredPVCount:
		scaling = none
	case len(oldPVs) < options.DesiredPVCount:
		scaling = up
	case len(oldPVs) > options.DesiredPVCount:
		scaling = down
	}

	for {
		select {
		case <-stopCh:
			example := ""
			if len(oldPVs) > 0 {
				example = fmt.Sprintf(", first one is %+v", oldPVs[0])
			}
			return fmt.Errorf("timeout while waiting for %d PVs with selector '%s' - only %d found provisioned%s",
				options.DesiredPVCount, options.filter(), pvStatus.Bound+pvStatus.Available, example)
		case <-time.After(options.WaitForPVsInterval):
			pvs := ps.List()
			pvStatus = ComputePVsStartupStatus(pvs, options.DesiredPVCount)

			diff := DiffPVs(oldPVs, pvs)
			// Deletions are only expected while scaling down.
			deletedPVs := diff.DeletedPVs()
			if scaling != down && len(deletedPVs) > 0 {
				klog.Errorf("%s: %s: %d PVs disappeared: %v", options.CallerName, options.filter(), len(deletedPVs), strings.Join(deletedPVs, ", "))
			}
			// Additions are only expected while scaling up.
			addedPVs := diff.AddedPVs()
			if scaling != up && len(addedPVs) > 0 {
				klog.Errorf("%s: %s: %d PVs appeared: %v", options.CallerName, options.filter(), len(addedPVs), strings.Join(addedPVs, ", "))
			}
			klog.V(2).Infof("%s: %s: %s", options.CallerName, options.filter(), pvStatus.String())

			// We wait until there is a desired number of PVs provisioned and all other PVs are pending.
			provisioned := pvStatus.Bound + pvStatus.Available
			if len(pvs) == provisioned+pvStatus.Pending && provisioned == options.DesiredPVCount {
				return nil
			}
			oldPVs = pvs
		}
	}
}