clusterloader2/pkg/prometheus/experimental.go (218 lines of code) (raw):
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package prometheus
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"os/signal"
"path"
"regexp"
"time"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
var (
// csiVolumeHandlePattern matches a CSI volume handle of the form
// projects/<Project>/zones/<Zone>/disks/<Name>. On a match,
// FindStringSubmatch yields four elements: the whole match followed by the
// Project, Zone and Name capture groups, in that order.
csiVolumeHandlePattern = regexp.MustCompile(`projects/(?P<Project>[^/]+)/zones/(?P<Zone>[^/]+)/disks/(?P<Name>[^/]+)`)
)
// prometheusDiskMetadata identifies the persistent disk backing Prometheus.
// It is filled in by tryRetrievePrometheusDiskMetadata and later passed to the
// gcloud snapshot and delete commands.
type prometheusDiskMetadata struct {
// name is the persistent disk name.
name string
// zone is the zone passed to gcloud via --zone.
zone string
}
const (
// gcloudRetryInterval is the polling interval between gcloud attempts, used
// for both snapshotting and deleting the disk.
gcloudRetryInterval = 20 * time.Second
// snapshotRetryTimeout bounds the total time spent retrying the disk snapshot.
snapshotRetryTimeout = 5 * time.Minute
// deleteRetryTimeout bounds the total time spent retrying the disk deletion.
deleteRetryTimeout = 2 * time.Minute
)
// Command-line flags controlling the experimental Prometheus snapshot features.
var (
// shouldSnapshotPrometheusDisk gates every disk-related operation in this
// file (metadata caching, snapshotting, deletion); see isEnabled.
shouldSnapshotPrometheusDisk = pflag.Bool("experimental-gcp-snapshot-prometheus-disk", false, "(experimental, provider=gce|gke only) whether to snapshot Prometheus disk before Prometheus stack is torn down")
// shouldSnapshotPrometheusToReportDir gates snapshotPrometheusIfEnabled.
shouldSnapshotPrometheusToReportDir = pflag.Bool("experimental-prometheus-snapshot-to-report-dir", false, "(experimental) whether to save prometheus snapshot to the report-dir")
// prometheusDiskSnapshotName optionally overrides the snapshot name; when it
// is empty or fails VerifySnapshotName, the disk name is used instead.
prometheusDiskSnapshotName = pflag.String("experimental-prometheus-disk-snapshot-name", "", "Name of the prometheus disk snapshot that will be created if snapshots are enabled. If not set, the prometheus disk name will be used.")
)
// isEnabled reports whether Prometheus disk snapshotting should be performed.
//
// It returns (false, nil) when --experimental-gcp-snapshot-prometheus-disk is
// not set, (true, nil) when it is set and the provider's feature set includes
// SupportSnapshotPrometheusDisk, and (false, err) when the flag is set but the
// provider lacks that feature.
func (pc *Controller) isEnabled() (bool, error) {
	switch {
	case !*shouldSnapshotPrometheusDisk:
		// Not requested: the provider is not consulted at all.
		return false, nil
	case pc.provider.Features().SupportSnapshotPrometheusDisk:
		return true, nil
	default:
		return false, fmt.Errorf(
			"snapshotting Prometheus' disk only available for GCP providers (gce, gke, kubemark), provider is: %s", pc.provider.Name())
	}
}
// cachePrometheusDiskMetadataIfEnabled polls tryRetrievePrometheusDiskMetadata
// every 10 seconds for up to 2 minutes so that the Prometheus disk name and
// zone get cached on the controller.
//
// When disk snapshotting is not enabled it does nothing and returns whatever
// error isEnabled reported (nil if the feature simply was not requested).
func (pc *Controller) cachePrometheusDiskMetadataIfEnabled() error {
	enabled, err := pc.isEnabled()
	if !enabled {
		return err
	}
	const (
		pollInterval = 10 * time.Second
		pollTimeout  = 2 * time.Minute
	)
	return wait.Poll(pollInterval, pollTimeout, pc.tryRetrievePrometheusDiskMetadata)
}
// tryRetrievePrometheusDiskMetadata is a poll condition that lists all
// PersistentVolumes, picks the bound one claimed by
// "prometheus-k8s-db-prometheus-k8s-0", and stores its disk name and zone in
// pc.diskMetadata.
//
// Return values, interpreted by wait.Poll:
//   - (false, nil): listing PVs failed; the poll should retry.
//   - (true, nil): polling is done, either because the metadata was cached or
//     because no matching PV with a name and zone was found (a warning is
//     logged and pc.diskMetadata is left untouched).
//   - (false, err): the matching PV could not be parsed by nameAndZone.
//   - (true, err): the PV list could not be marshalled for diagnostics.
func (pc *Controller) tryRetrievePrometheusDiskMetadata() (bool, error) {
	klog.V(2).Info("Retrieving Prometheus' persistent disk metadata...")
	k8sClient := pc.framework.GetClientSets().GetClient()
	list, err := k8sClient.CoreV1().PersistentVolumes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		klog.Errorf("Listing PVs failed: %v", err)
		// Poll() stops on error so returning nil
		return false, nil
	}
	var pdName, zone string
	for _, pv := range list.Items {
		// ClaimRef is a pointer and may be nil (e.g. a PV that no claim
		// references); such a PV cannot be Prometheus' volume, and
		// dereferencing it would panic.
		if pv.Spec.ClaimRef == nil || pv.Spec.ClaimRef.Name != "prometheus-k8s-db-prometheus-k8s-0" {
			continue
		}
		if pv.Status.Phase != corev1.VolumeBound {
			continue
		}
		klog.V(2).Infof("Found Prometheus' PV with name: %s", pv.Name)
		pdName, zone, err = nameAndZone(pv)
		if err != nil {
			return false, fmt.Errorf("failed to extract name and zone from pv: %w", err)
		}
		klog.V(2).Infof("PD name=%s, zone=%s", pdName, zone)
	}
	if pdName == "" || zone == "" {
		klog.Warningf("missing zone or PD name, aborting")
		klog.V(2).Info("PV list was:")
		// Dump the full list to help diagnose why no usable PV was found.
		s, err := json.MarshalIndent(list, "" /*=prefix*/, " " /*=indent*/)
		if err != nil {
			klog.Warningf("Error while marshalling response %v: %v", list, err)
			return true, err
		}
		klog.V(2).Info(string(s))
		return true, nil
	}
	pc.diskMetadata.name = pdName
	pc.diskMetadata.zone = zone
	return true, nil
}
// nameAndZone extracts the persistent disk name and its zone from pv.
//
// For a GCEPersistentDisk volume the name is the PDName and the zone is read
// from the "topology.kubernetes.io/zone" label, falling back to
// "failure-domain.beta.kubernetes.io/zone"; the zone may come back empty if
// neither label is set. For a CSI volume both values are parsed out of the
// volume handle with csiVolumeHandlePattern, and a handle that does not match
// yields an error. Any other volume source yields an error.
func nameAndZone(pv corev1.PersistentVolume) (name, zone string, err error) {
	if gce := pv.Spec.GCEPersistentDisk; gce != nil {
		labels := pv.ObjectMeta.Labels
		zone = labels["topology.kubernetes.io/zone"]
		if zone == "" {
			// Fallback to old label to make it work for old k8s versions.
			zone = labels["failure-domain.beta.kubernetes.io/zone"]
		}
		return gce.PDName, zone, nil
	}
	if csi := pv.Spec.CSI; csi != nil {
		// A successful match has 4 entries: whole match, Project, Zone, Name.
		groups := csiVolumeHandlePattern.FindStringSubmatch(csi.VolumeHandle)
		if len(groups) != 4 {
			return "", "", fmt.Errorf("unexpected format of volumeHandle: %q", csi.VolumeHandle)
		}
		return groups[3], groups[2], nil
	}
	return "", "", fmt.Errorf("unknown pv type: %+v", pv)
}
// snapshotPrometheusDiskIfEnabledSynchronized runs
// snapshotPrometheusDiskIfEnabled at most once per controller, under
// pc.snapshotLock. The first call performs the snapshot and records its
// result; every later call returns that recorded result without retrying.
func (pc *Controller) snapshotPrometheusDiskIfEnabledSynchronized() error {
	pc.snapshotLock.Lock()
	defer pc.snapshotLock.Unlock()

	if !pc.snapshotted {
		// Mark as done before running so the outcome (success or failure) is final.
		pc.snapshotted = true
		pc.snapshotError = pc.snapshotPrometheusDiskIfEnabled()
	}
	return pc.snapshotError
}
// snapshotPrometheusIfEnabled calls makeSnapshot with the target path
// "prometheus_snapshot.tar.gz" inside the configured report directory.
// It is a no-op returning nil unless
// --experimental-prometheus-snapshot-to-report-dir is set.
func (pc *Controller) snapshotPrometheusIfEnabled() error {
	if enabled := *shouldSnapshotPrometheusToReportDir; !enabled {
		return nil
	}
	return makeSnapshot(
		pc.framework.GetClientSets().GetClient(),
		pc.framework.GetRestClient(),
		path.Join(pc.clusterLoaderConfig.ReportDir, "prometheus_snapshot.tar.gz"),
	)
}
// snapshotPrometheusDiskIfEnabled snapshots the cached Prometheus persistent
// disk, retrying every gcloudRetryInterval for up to snapshotRetryTimeout.
//
// When disk snapshotting is not enabled it does nothing and returns the error
// from isEnabled (nil if the feature was not requested). It returns an error
// if the disk name or zone has not been cached, and otherwise returns the
// result of wait.Poll.
//
// The snapshot is named after the disk unless
// --experimental-prometheus-disk-snapshot-name holds a value accepted by
// VerifySnapshotName.
func (pc *Controller) snapshotPrometheusDiskIfEnabled() error {
	if enabled, err := pc.isEnabled(); !enabled {
		return err
	}
	if pc.diskMetadata.name == "" || pc.diskMetadata.zone == "" {
		klog.Errorf("Missing zone or PD name, aborting snapshot")
		klog.V(2).Infof("PD name=%s, zone=%s", pc.diskMetadata.name, pc.diskMetadata.zone)
		return fmt.Errorf("missing zone or PD name, aborting snapshot")
	}
	// Select snapshot name
	snapshotName := pc.diskMetadata.name
	if requested := *prometheusDiskSnapshotName; requested != "" {
		if err := VerifySnapshotName(requested); err != nil {
			// The rejected value is a snapshot name, not a disk name; say so.
			klog.Warningf("Incorrect disk snapshot name %v: %v. Using default name: %v", requested, err, snapshotName)
		} else {
			snapshotName = requested
		}
	}
	// Snapshot Prometheus disk
	return wait.Poll(
		gcloudRetryInterval,
		snapshotRetryTimeout,
		func() (bool, error) {
			err := pc.trySnapshotPrometheusDisk(pc.diskMetadata.name, snapshotName, pc.diskMetadata.zone)
			if err != nil {
				klog.Errorf("Trying to snapshot prometheus disk failed: %v", err)
			}
			// Poll() stops on error so returning nil
			return err == nil, nil
		})
}
// trySnapshotPrometheusDisk makes a single attempt to snapshot disk pdName in
// the given zone into a snapshot called snapshotName, by running
// `gcloud compute disks snapshot` against the configured snapshot project.
//
// It returns an error if no snapshot project is configured or if the gcloud
// command fails. In the latter case the error wraps the underlying command
// error and includes the combined stdout/stderr output.
func (pc *Controller) trySnapshotPrometheusDisk(pdName, snapshotName, zone string) error {
	klog.V(2).Info("Trying to snapshot Prometheus' persistent disk...")
	project := pc.clusterLoaderConfig.PrometheusConfig.SnapshotProject
	if project == "" {
		// This should never happen when run from kubetest with a GCE/GKE Kubernetes
		// provider - kubetest always propagates PROJECT env var in such situations.
		return fmt.Errorf("unknown project - please set --experimental-snapshot-project flag")
	}
	klog.V(2).Infof("Snapshotting PD %q into snapshot %q in project %q in zone %q", pdName, snapshotName, project, zone)
	cmd := exec.Command("gcloud", "compute", "disks", "snapshot", pdName, "--project", project, "--zone", zone, "--snapshot-names", snapshotName)
	output, err := cmd.CombinedOutput()
	if err != nil {
		// %w keeps the command error reachable via errors.Is / errors.As.
		return fmt.Errorf("creating disk snapshot failed: %w\nCommand output: %q", err, string(output))
	}
	klog.V(2).Infof("Creating disk snapshot finished with: %q", string(output))
	return nil
}
// deletePrometheusDiskIfEnabled deletes the cached Prometheus persistent disk,
// retrying every gcloudRetryInterval for up to deleteRetryTimeout.
//
// When disk snapshotting is not enabled it does nothing and returns the error
// from isEnabled (nil if the feature was not requested). It returns an error
// if the disk name or zone has not been cached, and otherwise returns the
// result of wait.Poll.
func (pc *Controller) deletePrometheusDiskIfEnabled() error {
	enabled, err := pc.isEnabled()
	if !enabled {
		return err
	}
	if name, zone := pc.diskMetadata.name, pc.diskMetadata.zone; name == "" || zone == "" {
		klog.Errorf("Missing zone or PD name, aborting deletion")
		klog.V(2).Infof("PD name=%s, zone=%s", name, zone)
		return fmt.Errorf("missing zone or PD name, aborting deletion")
	}
	// One deletion attempt; failures are logged and reported as "not done yet".
	attempt := func() (bool, error) {
		if err := pc.tryDeletePrometheusDisk(pc.diskMetadata.name, pc.diskMetadata.zone); err != nil {
			klog.Errorf("Trying to delete prometheus disk failed: %v", err)
			// Poll() stops on error so returning nil
			return false, nil
		}
		return true, nil
	}
	return wait.Poll(gcloudRetryInterval, deleteRetryTimeout, attempt)
}
// tryDeletePrometheusDisk makes a single attempt to delete disk pdName in the
// given zone by running `gcloud compute disks delete` against the configured
// snapshot project.
//
// It returns an error if no snapshot project is configured or if the gcloud
// command fails. In the latter case the error wraps the underlying command
// error and includes the combined stdout/stderr output.
func (pc *Controller) tryDeletePrometheusDisk(pdName, zone string) error {
	klog.V(2).Info("Trying to delete Prometheus' persistent disk...")
	project := pc.clusterLoaderConfig.PrometheusConfig.SnapshotProject
	if project == "" {
		// This should never happen when run from kubetest with a GCE/GKE Kubernetes
		// provider - kubetest always propagates PROJECT env var in such situations.
		return fmt.Errorf("unknown project - please set --experimental-snapshot-project flag")
	}
	klog.V(2).Infof("Deleting PD %q in project %q in zone %q", pdName, project, zone)
	cmd := exec.Command("gcloud", "compute", "disks", "delete", pdName, "--project", project, "--zone", zone)
	output, err := cmd.CombinedOutput()
	if err != nil {
		// %w keeps the command error reachable via errors.Is / errors.As.
		return fmt.Errorf("deleting disk failed: %w\nCommand output: %q", err, string(output))
	}
	klog.V(2).Infof("Deleting disk finished with: %q", string(output))
	return nil
}
// EnableTearDownPrometheusStackOnInterrupt registers a handler for
// os.Interrupt and returns immediately. When the signal arrives, a background
// goroutine runs snapshotPrometheusDiskIfEnabledSynchronized, logs a warning
// if that fails, and then terminates the process with exit code 1.
func (pc *Controller) EnableTearDownPrometheusStackOnInterrupt() {
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)

	go func(sigs <-chan os.Signal) {
		// Block until a signal is received.
		received := <-sigs
		klog.V(2).Infof("Received signal: %v", received)
		err := pc.snapshotPrometheusDiskIfEnabledSynchronized()
		if err != nil {
			klog.Warningf("Error while snapshotting prometheus disk: %v", err)
		}
		os.Exit(1)
	}(interrupts)
}