common/stats/stats.go (387 lines of code) (raw):
// Package stats provides a set of minimal interfaces which both build on and
// are by default backed by go-metrics. We wrap go-metrics in order to provide
// a few pieces of additional functionality and to make sure we don't leak our
// dependencies to anyone pulling in scoot as a library.
//
// Specifically, we provide the following:
// - Flexibility to override stat recording and formatting, ex: internal Twitter format.
// - An interface similar in design to Finagle Metrics
// - A StatsReceiver object that can be passed down a call tree and scoped to each level.
// - The ability to specify a time.Duration precision when rendering instruments.
// - A latched update mechanism which takes snapshots at regular intervals.
// - A new Latency instrument to more easily record callsite latency.
// - Pretty printing of instrument output.
//
// Original license: github.com/rcrowley/go-metrics/blob/master/LICENSE
//
package stats
import (
"encoding/json"
"strings"
"sync"
"time"
"github.com/rcrowley/go-metrics"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
// Package-level knobs, exposed mainly so tests can swap them out.
var (
	// Time is the clock used for latching and for latency measurements.
	Time StatsTime = DefaultStatsTime()
	// StatReportIntvl is how often StartUptimeReporting refreshes its gauge.
	StatReportIntvl = 500 * time.Millisecond
	// DefaultStartupGaugeSpikeLen is presumably the default value to pass as
	// startupGaugeSpikeLen to the reporting helpers; verify against callers.
	DefaultStartupGaugeSpikeLen = 1 * time.Minute
)

// CurrentStatsReceiver is a global receiver that stats users may reference
// instead of constructing their own. It starts out as a no-op receiver.
var CurrentStatsReceiver StatsReceiver = NilStatsReceiver()

// Instrument factories. Each may be reassigned to override how the
// corresponding instrument is created.
var (
	NewCounter    func() Counter    = newMetricCounter
	NewGauge      func() Gauge      = newMetricGauge
	NewGaugeFloat func() GaugeFloat = newMetricGaugeFloat
	NewHistogram  func() Histogram  = newMetricHistogram
	NewLatency    func() Latency    = newLatency
)
// MarshalerPretty is implemented by registries that can render themselves as
// human-readable JSON. The default receiver's render step checks for it when
// pretty output is requested and falls back to json.Marshal otherwise.
type MarshalerPretty interface {
// MarshalJSONPretty returns the pretty-printed JSON encoding of the registry.
MarshalJSONPretty() ([]byte, error)
}
//
// StatsRegistry is similar to the go-metrics registry but with most methods removed.
//
// Note: the default StatsRegistry (from rcrowley) doesn't support the Latency metric,
// only finagleStatsRegistry has logic to check for and marshal latency.
type StatsRegistry interface {
// GetOrRegister gets an existing metric or registers the given one.
// The second argument can be the metric to register if the name is not found
// in the registry, or a function returning the metric for lazy instantiation.
// The returned value is the metric now associated with the name.
GetOrRegister(string, interface{}) interface{}
// Unregister removes the metric with the given name.
Unregister(string)
// Each calls the given function once for each registered metric, passing
// the metric's name and the metric itself.
Each(func(string, interface{}))
}
//
// StatsReceiver is a registry wrapper for metrics that will be collected about
// the runtime performance of an application.
//
// A quick note about name elements: hierarchical names are stored using a '/'
// path separator. To avoid confusion, variadic name elements passed to any
// method will have '/' characters in their names replaced by the string
// "_SLASH_" before they are used internally. This is instead of failing,
// because sometimes counters are dynamically generated (i.e. with error
// names), and it is better to strip the path elements than to, for example,
// panic.
//
type StatsReceiver interface {
// Scope returns a stats receiver that will automatically namespace elements
// with the given scope args.
//
// statsReceiver.Scope("foo", "bar").Stat("baz") // is equivalent to
// statsReceiver.Stat("foo", "bar", "baz")
//
Scope(scope ...string) StatsReceiver
// If StatsRegistry supports the latency instrument:
//
// Precision returns a copy that can in turn create a Latency instrument that
// will use the given precision as its display precision when the stats are
// rendered as JSON. For example:
//
// statsReceiver.Precision(time.Millisecond).Stat("foo_ms")
//
// means that the 'foo_ms' stat will have its nanosecond data points displayed
// as milliseconds when rendered. Note that this does _not_ affect the
// captured data in any way, only its display.
//
// If the given duration is < 1ns, we will default to ns.
Precision(time.Duration) StatsReceiver
// Counter provides an event counter.
Counter(name ...string) Counter
// Latency provides a histogram of sampled stats over time. Times output in
// nanoseconds by default, but can be adjusted by using the Precision()
// function.
// NOTE(review): receivers built by NewCustomStatsReceiver start with a
// precision of time.Millisecond, not nanoseconds; confirm which default is
// intended.
Latency(name ...string) Latency
// Gauge adds a gauge, which holds an int64 value that can be set arbitrarily.
Gauge(name ...string) Gauge
// GaugeFloat adds a gauge, which holds a float64 value that can be set arbitrarily.
GaugeFloat(name ...string) GaugeFloat
// Histogram provides a histogram of sampled stats.
Histogram(name ...string) Histogram
// Remove removes the given named stats item if it exists.
Remove(name ...string)
// Render constructs a JSON string by marshaling the registry. When pretty is
// true an indented form is requested; the default receiver only honors this
// if its registry implements MarshalerPretty.
Render(pretty bool) []byte
}
// DefaultStatsReceiver returns an unlatched StatsReceiver backed by the default
// go-metrics registry.
//
// With a latch interval of zero no background goroutine is started, and
// histogram-backed instruments are reset on every call to Render().
func DefaultStatsReceiver() StatsReceiver {
	// The cancel func of an unlatched receiver is a no-op, so it is dropped.
	receiver, _ := NewCustomStatsReceiver(nil, 0)
	return receiver
}
// NewLatchedStatsReceiver is like DefaultStatsReceiver() but with an explicit
// latch interval.
//
// A positive interval starts a goroutine that periodically captures all
// instruments and clears select ones; cancelFn stops that goroutine. An
// interval <= 0 disables latching, so rendering and resetting happen on demand.
//
// Note: it is up to the main app to prevent calls to Render() after canceling
// the latched receiver.
func NewLatchedStatsReceiver(latched time.Duration) (stat StatsReceiver, cancelFn func()) {
	stat, cancelFn = NewCustomStatsReceiver(nil, latched)
	return stat, cancelFn
}
// NewCustomStatsReceiver is like DefaultStatsReceiver() but with the registry
// factory and latch interval made explicit.
//
// makeRegistry creates both the live registry and every captured copy; nil
// selects the go-metrics registry. When latched > 0 a latch goroutine is
// started and cancelFn stops it; otherwise cancelFn is a no-op.
func NewCustomStatsReceiver(makeRegistry func() StatsRegistry, latched time.Duration) (stat StatsReceiver, cancelFn func()) {
	if makeRegistry == nil {
		makeRegistry = func() StatsRegistry { return metrics.NewRegistry() }
	}
	receiver := &defaultStatsReceiver{
		makeRegistry: makeRegistry,
		registry:     makeRegistry(),
		precision:    time.Millisecond,
	}

	// Unlatched: nothing runs in the background, so there is nothing to cancel.
	if latched <= 0 {
		return receiver, func() {}
	}

	receiver.latchCh = make(chan chan CapturedRegistry)
	ctx, stop := context.WithCancel(context.Background())
	// Ticks that arrive before this instant are ignored by latch().
	firstSnapshotAt := Time.Now().Add(latched).Truncate(latched)
	// Seed the latch with a capture so Render() has data before the first tick.
	initial := capture(receiver.registry, makeRegistry())
	ticker := Time.NewTicker(latched)
	go latch(receiver, initial, receiver.latchCh, ticker, firstSnapshotAt, ctx)
	return receiver, stop
}
// latch runs as a goroutine started by the stats constructor and loops until
// ctx is canceled.
//
// On each tick at or after firstSnapshotAt it replaces the held capture with a
// fresh one and selectively clears the live registry. Requests arriving on
// latchCh are answered with the most recent capture and the time it was taken.
func latch(stat *defaultStatsReceiver, captured StatsRegistry, latchCh chan chan CapturedRegistry,
	ticker StatsTicker, firstSnapshotAt time.Time, ctx context.Context,
) {
	lastCapture := Time.Now()
	for {
		select {
		case <-ctx.Done():
			ticker.Stop()
			return
		case now := <-ticker.C():
			// Ticks before the first aligned snapshot time are skipped.
			if !now.Before(firstSnapshotAt) {
				captured = capture(stat.registry, stat.makeRegistry())
				lastCapture = now
				clear(stat.registry)
			}
		case reply := <-latchCh:
			reply <- CapturedRegistry{captured: captured, time: lastCapture}
		}
	}
}
// capture registers a Capture() of every recognized instrument in src into
// 'captured' under the same name and returns 'captured'. Called by latch().
// Instruments of an unrecognized type are logged and skipped.
func capture(src StatsRegistry, captured StatsRegistry) StatsRegistry {
	snapshot := func(name string, instrument interface{}) {
		var frozen interface{}
		switch m := instrument.(type) {
		case Counter:
			frozen = m.Capture()
		case Gauge:
			frozen = m.Capture()
		case GaugeFloat:
			frozen = m.Capture()
		case Histogram:
			frozen = m.Capture()
		case Latency:
			frozen = m.Capture()
		default:
			log.Info("Unrecognized capture instrument: ", name, instrument)
			return
		}
		captured.GetOrRegister(name, frozen)
	}
	src.Each(snapshot)
	return captured
}
// requestCapture asks the latch goroutine listening on latchCh for its most
// recent capture and blocks until the reply arrives.
// Note: it is up to the main app to prevent calls to requestCapture after
// closing a latched receiver.
func requestCapture(latchCh chan chan CapturedRegistry) CapturedRegistry {
	reply := make(chan CapturedRegistry)
	latchCh <- reply
	snapshot := <-reply
	return snapshot
}
// clear selectively resets the given registry: only instruments that satisfy
// metrics.Histogram are cleared, everything else is left untouched.
func clear(reg StatsRegistry) {
	reg.Each(func(_ string, instrument interface{}) {
		if h, ok := instrument.(metrics.Histogram); ok {
			h.Clear()
		}
	})
}
// CapturedRegistry is the reply the latch goroutine sends for a capture
// request: the most recent captured registry and the time it was taken.
type CapturedRegistry struct {
// captured is the registry copy produced by capture().
captured StatsRegistry
time time.Time // This will either be incorporated into health checks or taken out.
}
// defaultStatsReceiver is the registry-backed StatsReceiver implementation.
// Receivers derived through Scope() and Precision() share the same registry
// and latch channel as their parent.
type defaultStatsReceiver struct {
// makeRegistry creates the registries that captures are written into.
makeRegistry func() StatsRegistry
// registry holds the live instruments.
registry StatsRegistry
// latchCh carries capture requests to the latch goroutine; nil when unlatched.
latchCh chan chan CapturedRegistry
// precision is the display precision given to Latency instruments this receiver creates.
precision time.Duration
// scope holds the name elements prefixed to every instrument name.
scope []string
}
// Scope returns a receiver sharing this receiver's registry, latch channel and
// precision, whose scope is this receiver's scope extended by the given
// elements (with slashes scrubbed).
func (s *defaultStatsReceiver) Scope(scope ...string) StatsReceiver {
	child := *s
	child.scope = s.scoped(scope...)
	return &child
}
// Precision returns a receiver identical to this one except for the display
// precision it gives to Latency instruments. Values below one nanosecond are
// raised to one nanosecond.
func (s *defaultStatsReceiver) Precision(precision time.Duration) StatsReceiver {
	clone := *s
	clone.precision = precision
	if clone.precision < time.Nanosecond {
		clone.precision = time.Nanosecond
	}
	return &clone
}
// Counter returns the Counter registered under the scoped name, creating it
// through NewCounter if the name is not registered yet.
func (s *defaultStatsReceiver) Counter(name ...string) Counter {
	key := s.scopedName(name...)
	instrument := s.registry.GetOrRegister(key, NewCounter)
	return instrument.(Counter)
}
// Gauge returns the Gauge registered under the scoped name, creating it
// through NewGauge if the name is not registered yet.
func (s *defaultStatsReceiver) Gauge(name ...string) Gauge {
	key := s.scopedName(name...)
	instrument := s.registry.GetOrRegister(key, NewGauge)
	return instrument.(Gauge)
}
// GaugeFloat returns the GaugeFloat registered under the scoped name, creating
// it through NewGaugeFloat if the name is not registered yet.
func (s *defaultStatsReceiver) GaugeFloat(name ...string) GaugeFloat {
	key := s.scopedName(name...)
	instrument := s.registry.GetOrRegister(key, NewGaugeFloat)
	return instrument.(GaugeFloat)
}
// Histogram returns the Histogram registered under the scoped name, creating
// it through NewHistogram if the name is not registered yet.
func (s *defaultStatsReceiver) Histogram(name ...string) Histogram {
	key := s.scopedName(name...)
	instrument := s.registry.GetOrRegister(key, NewHistogram)
	return instrument.(Histogram)
}
// Latency returns the Latency registered under the scoped name, registering a
// new one configured with this receiver's precision if needed.
//
// The candidate is built eagerly rather than passed as a factory: we may be
// using metric.Registry, which can't cast a factory's return value, so lazy
// instantiation is not an option here.
func (s *defaultStatsReceiver) Latency(name ...string) Latency {
	key := s.scopedName(name...)
	candidate := NewLatency().Precision(s.precision)
	return s.registry.GetOrRegister(key, candidate).(Latency)
}
// Remove unregisters the instrument stored under the scoped name.
func (s *defaultStatsReceiver) Remove(name ...string) {
	key := s.scopedName(name...)
	s.registry.Unregister(key)
}
// render marshals the receiver's stats to JSON without clearing anything.
//
// A latched receiver renders the latest capture obtained from the latch
// goroutine; an unlatched one renders the live registry. Pretty output is used
// only when requested and the registry implements MarshalerPretty. A marshal
// failure is treated as a registry bug and panics.
func (s *defaultStatsReceiver) render(pretty bool) []byte {
	source := s.registry
	if s.latchCh != nil {
		source = requestCapture(s.latchCh).captured
	}

	marshal := func() ([]byte, error) { return json.Marshal(source) }
	if pretty {
		if mp, ok := source.(MarshalerPretty); ok {
			marshal = mp.MarshalJSONPretty
		}
	}

	out, err := marshal()
	if err != nil {
		panic("StatsRegistry bug, cannot be marshaled")
	}
	return out
}
// Render returns the JSON rendering of the receiver's stats. When the receiver
// is not latched, the registry is selectively cleared after every render; a
// latched receiver leaves clearing to the latch goroutine.
func (s *defaultStatsReceiver) Render(pretty bool) []byte {
	rendered := s.render(pretty)
	if s.latchCh != nil {
		return rendered
	}
	clear(s.registry)
	return rendered
}
// scoped returns the receiver's scope followed by the given elements, with
// every '/' in the new elements replaced by "_SLASH_".
//
// The result is always a freshly allocated slice. Appending directly onto
// s.scope could reuse spare capacity in its backing array, letting two scopes
// derived from the same parent overwrite each other's last element; and
// scrubbing the argument in place would modify a slice owned by the caller
// (e.g. stat.Counter(names...)). Neither s.scope nor the argument is modified.
func (s *defaultStatsReceiver) scoped(scope ...string) []string {
	combined := make([]string, 0, len(s.scope)+len(scope))
	combined = append(combined, s.scope...)
	for _, elem := range scope {
		combined = append(combined, strings.Replace(elem, "/", "_SLASH_", -1))
	}
	return combined
}
// scopedName extends the receiver's scope with the given elements and joins
// the result into a single slash-delimited name.
func (s *defaultStatsReceiver) scopedName(scope ...string) string {
	parts := s.scoped(scope...)
	return strings.Join(parts, "/")
}
//
// NilStatsReceiver returns a receiver that ignores all stats operations.
// The scope arguments are accepted but unused.
//
func NilStatsReceiver(scope ...string) StatsReceiver {
	return new(nilStatsReceiver)
}
// nilStatsReceiver is the StatsReceiver behind NilStatsReceiver. It holds no
// state: scoping and precision changes return the receiver itself, and every
// instrument it hands out wraps a go-metrics Nil instrument (or a nilLatency).
type nilStatsReceiver struct{}

// Scope ignores the scope and returns the receiver unchanged.
func (s *nilStatsReceiver) Scope(scope ...string) StatsReceiver {
	return s
}

// Precision ignores the precision and returns the receiver unchanged.
func (s *nilStatsReceiver) Precision(precision time.Duration) StatsReceiver {
	return s
}

// Counter returns a new counter wrapping metrics.NilCounter.
func (s *nilStatsReceiver) Counter(name ...string) Counter {
	return &metricCounter{Counter: &metrics.NilCounter{}}
}

// Gauge returns a new gauge wrapping metrics.NilGauge.
func (s *nilStatsReceiver) Gauge(name ...string) Gauge {
	return &metricGauge{Gauge: &metrics.NilGauge{}}
}

// GaugeFloat returns a new gauge wrapping metrics.NilGaugeFloat64.
func (s *nilStatsReceiver) GaugeFloat(name ...string) GaugeFloat {
	return &metricGaugeFloat{GaugeFloat64: &metrics.NilGaugeFloat64{}}
}

// Histogram returns a new histogram wrapping metrics.NilHistogram.
func (s *nilStatsReceiver) Histogram(name ...string) Histogram {
	return &metricHistogram{Histogram: &metrics.NilHistogram{}}
}

// Latency returns a no-op latency instrument.
func (s *nilStatsReceiver) Latency(name ...string) Latency {
	return newNilLatency()
}

// Remove does nothing.
func (s *nilStatsReceiver) Remove(name ...string) {
}

// Render returns an empty, non-nil byte slice.
func (s *nilStatsReceiver) Render(pretty bool) []byte {
	return []byte{}
}

// RenderNoClear returns an empty, non-nil byte slice.
func (s *nilStatsReceiver) RenderNoClear(pretty bool) []byte {
	return []byte{}
}
//
// Minimally mirror go-metrics instruments.
//
// Counter is an event counter holding an int64 count.
type Counter interface {
// Capture returns a Counter wrapping a snapshot of the current state.
Capture() Counter
// Clear resets the counter; provided by the wrapped go-metrics counter in
// the default implementation.
Clear()
// Count returns the current count.
Count() int64
// Inc adds the given amount to the count.
Inc(int64)
// Update sets the count to the given value. The default implementation does
// this by incrementing with the difference from the current count.
Update(int64)
}
// metricCounter adapts a go-metrics Counter to the Counter interface.
type metricCounter struct{ metrics.Counter }

// Capture wraps the go-metrics snapshot of this counter in a new metricCounter.
func (m *metricCounter) Capture() Counter {
	return &metricCounter{Counter: m.Snapshot()}
}

// Update moves the count to i by incrementing with the difference between i
// and the current count. The read and the increment are two separate calls.
func (m *metricCounter) Update(i int64) {
	delta := i - m.Count()
	m.Inc(delta)
}

// newMetricCounter is the default NewCounter factory.
func newMetricCounter() Counter {
	return &metricCounter{Counter: metrics.NewCounter()}
}
// Gauge holds an int64 value that can be set arbitrarily.
type Gauge interface {
// Capture returns a Gauge wrapping a snapshot of the current state.
Capture() Gauge
// Update sets the gauge's value.
Update(int64)
// Value returns the gauge's current value.
Value() int64
}
// metricGauge adapts a go-metrics Gauge to the Gauge interface.
type metricGauge struct{ metrics.Gauge }

// Capture wraps the go-metrics snapshot of this gauge in a new metricGauge.
func (m *metricGauge) Capture() Gauge {
	return &metricGauge{Gauge: m.Snapshot()}
}

// newMetricGauge is the default NewGauge factory.
func newMetricGauge() Gauge {
	return &metricGauge{Gauge: metrics.NewGauge()}
}
// GaugeFloat holds a float64 value that can be set arbitrarily.
type GaugeFloat interface {
// Capture returns a GaugeFloat wrapping a snapshot of the current state.
Capture() GaugeFloat
// Update sets the gauge's value.
Update(float64)
// Value returns the gauge's current value.
Value() float64
}
// metricGaugeFloat adapts a go-metrics GaugeFloat64 to the GaugeFloat interface.
type metricGaugeFloat struct{ metrics.GaugeFloat64 }

// Capture wraps the go-metrics snapshot of this gauge in a new metricGaugeFloat.
func (m *metricGaugeFloat) Capture() GaugeFloat {
	return &metricGaugeFloat{GaugeFloat64: m.Snapshot()}
}

// newMetricGaugeFloat is the default NewGaugeFloat factory.
func newMetricGaugeFloat() GaugeFloat {
	return &metricGaugeFloat{GaugeFloat64: metrics.NewGaugeFloat64()}
}
// HistogramView is a viewable histogram without updates or capture. It is the
// read-only surface that finagleStatsRegistry needs to marshal a histogram.
type HistogramView interface {
// Mean returns the mean of the sampled values.
Mean() float64
// Count returns the number of recorded values.
Count() int64
// Max returns the largest sampled value.
Max() int64
// Min returns the smallest sampled value.
Min() int64
// Sum returns the sum of the sampled values.
Sum() int64
// Percentiles returns one value per requested percentile; marshalHistogram
// relies on result i corresponding to ps[i].
Percentiles(ps []float64) []float64
}
// Histogram is a HistogramView that can also record values and be captured.
type Histogram interface {
HistogramView
// Capture returns a Histogram wrapping a snapshot of the current state.
Capture() Histogram
// Update records a new value in the histogram.
Update(int64)
}
// metricHistogram adapts a go-metrics Histogram to the Histogram interface.
type metricHistogram struct{ metrics.Histogram }

// Capture wraps the go-metrics snapshot of this histogram in a new metricHistogram.
func (m *metricHistogram) Capture() Histogram {
	return &metricHistogram{Histogram: m.Snapshot()}
}

// newMetricHistogram is the default NewHistogram factory. The histogram is
// backed by a go-metrics uniform sample created with size 1000.
func newMetricHistogram() Histogram {
	sample := metrics.NewUniformSample(1000)
	return &metricHistogram{Histogram: metrics.NewHistogram(sample)}
}
// Latency records callsite latency. The default implementation uses a
// Histogram as its base and records elapsed time in nanoseconds.
type Latency interface {
// Capture returns a Latency holding a snapshot of the current state.
Capture() Latency
// Time marks the start of a measurement.
Time() Latency //returns self.
// Stop records the time elapsed since the last call to Time.
Stop()
// GetPrecision returns the display precision used when rendering.
GetPrecision() time.Duration
// Precision sets the display precision used when rendering.
Precision(time.Duration) Latency //returns self.
}
// metricLatency is the default Latency implementation: a go-metrics histogram
// of elapsed nanoseconds plus the bookkeeping needed to time a callsite.
type metricLatency struct {
metrics.Histogram
// start is the time recorded by the most recent call to Time().
start time.Time
// precision is the display precision reported by GetPrecision().
precision time.Duration
// mutex guards start and precision, and is held while Stop() and Capture()
// touch the histogram.
mutex sync.RWMutex
}
// Time records the current time as the start of a measurement and returns the
// receiver so the call can be chained.
func (l *metricLatency) Time() Latency {
	l.mutex.Lock()
	l.start = Time.Now()
	l.mutex.Unlock()
	return l
}
// Stop adds the nanoseconds elapsed since the recorded start time to the
// underlying histogram.
func (l *metricLatency) Stop() {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	elapsed := Time.Since(l.start)
	l.Update(elapsed.Nanoseconds())
}
// Capture returns a new metricLatency holding a snapshot of the histogram
// together with the current start time and precision. The copy gets its own
// zero-valued mutex.
func (l *metricLatency) Capture() Latency {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	return &metricLatency{
		Histogram: l.Histogram.Snapshot(),
		start:     l.start,
		precision: l.precision,
	}
}
// GetPrecision returns the display precision, read under the read lock.
func (l *metricLatency) GetPrecision() time.Duration {
	l.mutex.RLock()
	current := l.precision
	l.mutex.RUnlock()
	return current
}
// Precision sets the display precision and returns the receiver so the call
// can be chained. Values below one nanosecond are raised to one nanosecond.
func (l *metricLatency) Precision(p time.Duration) Latency {
	if p < time.Nanosecond {
		p = time.Nanosecond
	}
	l.mutex.Lock()
	l.precision = p
	l.mutex.Unlock()
	return l
}
// newLatency is the default NewLatency factory. The instrument starts with
// nanosecond precision and a histogram backed by a go-metrics uniform sample
// created with size 1000.
func newLatency() Latency {
	sample := metrics.NewUniformSample(1000)
	return &metricLatency{
		Histogram: metrics.NewHistogram(sample),
		precision: time.Nanosecond,
	}
}
// nilLatency is a Latency that records nothing. It does not implement
// HistogramView.
type nilLatency struct{}

// Time does nothing and returns the receiver.
func (l *nilLatency) Time() Latency {
	return l
}

// Stop does nothing.
func (l *nilLatency) Stop() {
}

// Capture returns the receiver itself.
func (l *nilLatency) Capture() Latency {
	return l
}

// GetPrecision always reports a zero precision.
func (l *nilLatency) GetPrecision() time.Duration {
	return 0
}

// Precision ignores the given precision and returns the receiver.
func (l *nilLatency) Precision(time.Duration) Latency {
	return l
}

// newNilLatency returns a new no-op Latency.
func newNilLatency() Latency {
	return new(nilLatency)
}
//
// Twitter/Finagle style metrics
//

// finagleStatsRegistry is a go-metrics registry with custom JSON marshaling
// that flattens every instrument into name -> value entries.
type finagleStatsRegistry struct {
	metrics.Registry
}

// NewFinagleStatsRegistry returns a StatsRegistry that wraps a new go-metrics
// registry and marshals in the flattened Finagle style.
func NewFinagleStatsRegistry() StatsRegistry {
	return &finagleStatsRegistry{Registry: metrics.NewRegistry()}
}
// jsonMap is the flat name -> value map that finagleStatsRegistry marshals to JSON.
type jsonMap map[string]interface{}
// MarshalJSON returns a byte slice containing the compact JSON encoding of the
// flattened map built by MarshalAll from the metrics in the registry.
func (r *finagleStatsRegistry) MarshalJSON() ([]byte, error) {
	flattened := r.MarshalAll()
	return json.Marshal(flattened)
}
// MarshalJSONPretty returns the indented JSON encoding of the flattened map
// built by MarshalAll. It is what makes the registry a MarshalerPretty.
func (r *finagleStatsRegistry) MarshalJSONPretty() ([]byte, error) {
	flattened := r.MarshalAll()
	return json.MarshalIndent(flattened, "", " ")
}
// MarshalAll flattens every instrument in the registry into a jsonMap.
//
// Counters and gauges contribute a single entry under their own name.
// Histograms and latencies are captured and expanded by marshalHistogram into
// several name-suffixed entries; histograms are rendered with nanosecond
// precision, latencies with their own display precision.
//
// Instruments of an unrecognized type are logged and skipped. The same applies
// to a Latency whose capture does not implement HistogramView (nilLatency, for
// one): an unchecked type assertion there would panic and take the whole
// render down over a single instrument.
func (r *finagleStatsRegistry) MarshalAll() jsonMap {
	data := make(jsonMap)
	r.Each(func(name string, i interface{}) {
		switch stat := i.(type) {
		case Counter:
			data[name] = stat.Count()
		case Gauge:
			data[name] = stat.Value()
		case GaugeFloat:
			data[name] = stat.Value()
		case Histogram:
			r.marshalHistogram(data, name, stat.Capture(), time.Nanosecond)
		case Latency:
			captured := stat.Capture()
			view, ok := captured.(HistogramView)
			if !ok {
				log.Info("Latency instrument is not a histogram view: ", name, i)
				return
			}
			r.marshalHistogram(data, name, view, captured.GetPrecision())
		default:
			log.Info("Unrecognized marshal instrument: ", name, i)
		}
	})
	return data
}
// marshalHistogram writes the summary of hist into data under keys prefixed
// with name: ".avg", ".count", ".max", ".min", ".sum" and one key per entry of
// defaultPercentileLabels.
//
// Every value except the count is divided by precision so that, for example,
// nanosecond samples can be displayed as milliseconds. A precision below one
// nanosecond is treated as one nanosecond, matching the Precision() setters;
// without that guard a zero precision would panic on the integer divisions and
// produce Inf/NaN floats that encoding/json refuses to marshal.
func (r *finagleStatsRegistry) marshalHistogram(
	data jsonMap,
	name string,
	hist HistogramView,
	precision time.Duration,
) {
	if precision < time.Nanosecond {
		precision = time.Nanosecond
	}
	f64p := float64(precision)
	i64p := int64(precision)
	data[name+".avg"] = hist.Mean() / f64p
	data[name+".count"] = hist.Count()
	data[name+".max"] = hist.Max() / i64p
	data[name+".min"] = hist.Min() / i64p
	data[name+".sum"] = hist.Sum() / i64p
	// Percentile i is reported under label i; the two slices are index-aligned.
	pctls := hist.Percentiles(defaultPercentiles)
	for i, pctl := range pctls {
		data[name+"."+defaultPercentileLabels[i]] = pctl / f64p
	}
}
// Percentiles reported for every histogram-like instrument, and the key suffix
// used for each. The two slices are index-aligned.
var (
	defaultPercentiles      = []float64{0.5, 0.9, 0.95, 0.99, 0.999, 0.9999}
	defaultPercentileLabels = []string{"p50", "p90", "p95", "p99", "p999", "p9999"}
)
// StartUptimeReporting signals a server restart via ReportServerRestart and
// then, every StatReportIntvl, sets the gauge named statName to the number of
// milliseconds elapsed since this function was called.
//
// The loop never exits, so this function does not return; callers presumably
// run it in its own goroutine.
func StartUptimeReporting(stat StatsReceiver, statName string, serverStartGaugeName string, startupGaugeSpikeLen time.Duration) {
	ReportServerRestart(stat, serverStartGaugeName, startupGaugeSpikeLen)

	startedAt := time.Now()
	ticker := time.NewTicker(StatReportIntvl)
	for range ticker.C {
		uptimeMs := time.Since(startedAt) / time.Millisecond
		stat.Gauge(statName).Update(int64(uptimeMs))
	}
}
// ReportServerRestart sets the gauge named statName to 1 and starts a
// goroutine that sets it back to 0 after startupGaugeSpikeLen, producing a
// temporary spike that marks the restart. It returns immediately.
func ReportServerRestart(stat StatsReceiver, statName string, startupGaugeSpikeLen time.Duration) {
	resetAfterSpike := func() {
		time.Sleep(startupGaugeSpikeLen)
		stat.Gauge(statName).Update(0)
	}

	stat.Gauge(statName).Update(1)
	go resetAfterSpike()
}