// Package influx provides an InfluxDB implementation for metrics. The model is // similar to other push-based instrumentation systems. Observations are // aggregated locally and emitted to the Influx server on regular intervals. package influx import ( "time" influxdb "github.com/influxdata/influxdb/client/v2" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/generic" "github.com/go-kit/kit/metrics/internal/lv" ) // Influx is a store for metrics that will be emitted to an Influx database. // // Influx is a general purpose time-series database, and has no native concepts // of counters, gauges, or histograms. Counters are modeled as a timeseries with // one data point per flush, with a "count" field that reflects all adds since // the last flush. Gauges are modeled as a timeseries with one data point per // flush, with a "value" field that reflects the current state of the gauge. // Histograms are modeled as a timeseries with one data point per combination of tags, // with a set of quantile fields that reflects the p50, p90, p95 & p99. // // Influx tags are attached to the Influx object, can be given to each // metric at construction and can be updated anytime via With function. Influx fields // are mapped to Go kit label values directly by this collector. Actual metric // values are provided as fields with specific names depending on the metric. // // All observations are collected in memory locally, and flushed on demand. type Influx struct { counters *lv.Space gauges *lv.Space histograms *lv.Space tags map[string]string conf influxdb.BatchPointsConfig logger log.Logger } // New returns an Influx, ready to create metrics and collect observations. Tags // are applied to all metrics created from this object. The BatchPointsConfig is // used during flushing. 
func New(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
	// WriteLoop reports failed writes through the logger. Fall back to a
	// no-op logger so a nil argument cannot cause a nil dereference on the
	// first failed flush.
	if logger == nil {
		logger = log.NewNopLogger()
	}
	return &Influx{
		counters:   lv.NewSpace(),
		gauges:     lv.NewSpace(),
		histograms: lv.NewSpace(),
		tags:       tags,
		conf:       conf,
		logger:     logger,
	}
}

// NewCounter returns an Influx counter with the given name. Its observations
// are recorded in the counter space of this Influx object.
func (in *Influx) NewCounter(name string) *Counter {
	return &Counter{
		name: name,
		obs:  in.counters.Observe,
	}
}

// NewGauge returns an Influx gauge with the given name. Its observations are
// recorded in the gauge space of this Influx object.
func (in *Influx) NewGauge(name string) *Gauge {
	return &Gauge{
		name: name,
		obs:  in.gauges.Observe,
		add:  in.gauges.Add,
	}
}

// NewHistogram returns an Influx histogram with the given name. Its
// observations are recorded in the histogram space of this Influx object.
func (in *Influx) NewHistogram(name string) *Histogram {
	return &Histogram{
		name: name,
		obs:  in.histograms.Observe,
	}
}

// BatchPointsWriter captures a subset of the influxdb.Client methods necessary
// for emitting metrics observations.
type BatchPointsWriter interface {
	Write(influxdb.BatchPoints) error
}

// WriteLoop is a helper method that invokes WriteTo on the passed writer every
// time the passed channel fires. A failed WriteTo is logged and the loop
// continues. This method blocks until the channel is closed, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
	for range c {
		if err := in.WriteTo(w); err != nil {
			in.logger.Log("during", "WriteTo", "err", err)
		}
	}
}

// WriteTo flushes the buffered content of the metrics to the writer, in an
// Influx BatchPoints format. WriteTo abides best-effort semantics, so
// observations are lost if there is a problem with the write. Clients should
// be sure to call WriteTo regularly, ideally through the WriteLoop helper
// method.
func (in *Influx) WriteTo(w BatchPointsWriter) (err error) { bp, err := influxdb.NewBatchPoints(in.conf) if err != nil { return err } now := time.Now() in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { tags := mergeTags(in.tags, lvs) var p *influxdb.Point fields := map[string]interface{}{"count": sum(values)} p, err = influxdb.NewPoint(name, tags, fields, now) if err != nil { return false } bp.AddPoint(p) return true }) if err != nil { return err } in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { tags := mergeTags(in.tags, lvs) var p *influxdb.Point fields := map[string]interface{}{"value": last(values)} p, err = influxdb.NewPoint(name, tags, fields, now) if err != nil { return false } bp.AddPoint(p) return true }) if err != nil { return err } in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { histogram := generic.NewHistogram(name, 50) tags := mergeTags(in.tags, lvs) var p *influxdb.Point for _, v := range values { histogram.Observe(v) } fields := map[string]interface{}{ "p50": histogram.Quantile(0.50), "p90": histogram.Quantile(0.90), "p95": histogram.Quantile(0.95), "p99": histogram.Quantile(0.99), } p, err = influxdb.NewPoint(name, tags, fields, now) if err != nil { return false } bp.AddPoint(p) return true }) if err != nil { return err } return w.Write(bp) } func mergeTags(tags map[string]string, labelValues []string) map[string]string { if len(labelValues)%2 != 0 { panic("mergeTags received a labelValues with an odd number of strings") } ret := make(map[string]string, len(tags)+len(labelValues)/2) for k, v := range tags { ret[k] = v } for i := 0; i < len(labelValues); i += 2 { ret[labelValues[i]] = labelValues[i+1] } return ret } func sum(a []float64) float64 { var v float64 for _, f := range a { v += f } return v } func last(a []float64) float64 { return a[len(a)-1] } type observeFunc func(name string, lvs lv.LabelValues, value float64) // 
Counter is an Influx counter. Observations are forwarded to an Influx // object, and aggregated (summed) per timeseries. type Counter struct { name string lvs lv.LabelValues obs observeFunc } // With implements metrics.Counter. func (c *Counter) With(labelValues ...string) metrics.Counter { return &Counter{ name: c.name, lvs: c.lvs.With(labelValues...), obs: c.obs, } } // Add implements metrics.Counter. func (c *Counter) Add(delta float64) { c.obs(c.name, c.lvs, delta) } // Gauge is an Influx gauge. Observations are forwarded to a Dogstatsd // object, and aggregated (the last observation selected) per timeseries. type Gauge struct { name string lvs lv.LabelValues obs observeFunc add observeFunc } // With implements metrics.Gauge. func (g *Gauge) With(labelValues ...string) metrics.Gauge { return &Gauge{ name: g.name, lvs: g.lvs.With(labelValues...), obs: g.obs, add: g.add, } } // Set implements metrics.Gauge. func (g *Gauge) Set(value float64) { g.obs(g.name, g.lvs, value) } // Add implements metrics.Gauge. func (g *Gauge) Add(delta float64) { g.add(g.name, g.lvs, delta) } // Histogram is an Influx histrogram. Observations are aggregated into a // generic.Histogram and emitted as per-quantile gauges to the Influx server. type Histogram struct { name string lvs lv.LabelValues obs observeFunc } // With implements metrics.Histogram. func (h *Histogram) With(labelValues ...string) metrics.Histogram { return &Histogram{ name: h.name, lvs: h.lvs.With(labelValues...), obs: h.obs, } } // Observe implements metrics.Histogram. func (h *Histogram) Observe(value float64) { h.obs(h.name, h.lvs, value) }