clusterloader2/pkg/measurement/util/prometheus.go (135 lines of code) (raw):

/* Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package util import ( "encoding/json" "fmt" "io" "math" "strings" "time" "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" prom "k8s.io/perf-tests/clusterloader2/pkg/prometheus/clients" ) const ( queryTimeout = 15 * time.Minute queryInterval = 30 * time.Second ) // ExtractMetricSamples unpacks metric blob into prometheus model structures. func ExtractMetricSamples(metricsBlob string) ([]*model.Sample, error) { dec := expfmt.NewDecoder(strings.NewReader(metricsBlob), expfmt.FmtText) decoder := expfmt.SampleDecoder{ Dec: dec, Opts: &expfmt.DecodeOptions{}, } var samples []*model.Sample for { var v model.Vector if err := decoder.Decode(&v); err != nil { if err == io.EOF { // Expected loop termination condition. return samples, nil } return nil, err } samples = append(samples, v...) } } // ExtractMetricSamples2 unpacks metric blob into prometheus model structures. func ExtractMetricSamples2(response []byte) ([]*model.Sample, error) { var pqr promQueryResponse if err := json.Unmarshal(response, &pqr); err != nil { return nil, err } if pqr.Status != "success" { return nil, fmt.Errorf("non-success response status: %v", pqr.Status) } vector, ok := pqr.Data.v.(model.Vector) if !ok { return nil, fmt.Errorf("incorrect response type: %v", pqr.Data.v.Type()) } return []*model.Sample(vector), nil } // promQueryResponse stores the response from the Prometheus server. // This struct follows the format described in the Prometheus documentation: // https://prometheus.io/docs/prometheus/latest/querying/api/#format-overview. type promQueryResponse struct { Status string `json:"status"` Data promResponseData `json:"data"` ErrorType string `json:"errorType"` Error string `json:"error"` Warnings []string `json:"warnings"` } type promResponseData struct { v model.Value } // NewQueryExecutor creates instance of PrometheusQueryExecutor. func NewQueryExecutor(pc prom.Client) *PrometheusQueryExecutor { return &PrometheusQueryExecutor{client: pc} } // PrometheusQueryExecutor executes queries against Prometheus. type PrometheusQueryExecutor struct { client prom.Client } // Query executes given prometheus query at given point in time. func (e *PrometheusQueryExecutor) Query(query string, queryTime time.Time) ([]*model.Sample, error) { if queryTime.IsZero() { return nil, fmt.Errorf("query time can't be zero") } var body []byte var queryErr error klog.V(2).Infof("Executing %q at %v", query, queryTime.Format(time.RFC3339)) if err := wait.PollImmediate(queryInterval, queryTimeout, func() (bool, error) { body, queryErr = e.client.Query(query, queryTime) if queryErr != nil { return false, nil } return true, nil }); err != nil { if queryErr != nil { resp := "(empty)" if body != nil { resp = string(body) } return nil, fmt.Errorf("query error: %v [body: %s]", queryErr, resp) } return nil, fmt.Errorf("error: %v", err) } samples, err := ExtractMetricSamples2(body) if err != nil { return nil, fmt.Errorf("extracting error: %v", err) } var resultSamples []*model.Sample for _, sample := range samples { if !math.IsNaN(float64(sample.Value)) { resultSamples = append(resultSamples, sample) } } klog.V(4).Infof("Got %d samples", len(resultSamples)) return resultSamples, nil } // UnmarshalJSON unmarshals json into promResponseData structure. func (qr *promResponseData) UnmarshalJSON(b []byte) error { v := struct { Type model.ValueType `json:"resultType"` Result json.RawMessage `json:"result"` }{} err := json.Unmarshal(b, &v) if err != nil { return err } switch v.Type { case model.ValScalar: var sv model.Scalar err = json.Unmarshal(v.Result, &sv) qr.v = &sv case model.ValVector: var vv model.Vector err = json.Unmarshal(v.Result, &vv) qr.v = vv case model.ValMatrix: var mv model.Matrix err = json.Unmarshal(v.Result, &mv) qr.v = mv default: err = fmt.Errorf("unexpected value type %q", v.Type) } return err } // ToPrometheusTime returns prometheus string representation of given time. func ToPrometheusTime(t time.Duration) string { if t < time.Minute { return fmt.Sprintf("%ds", int64(t)/int64(time.Second)) } return fmt.Sprintf("%dm", int64(t)/int64(time.Minute)) }