// Copyright 2017, OpenCensus Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // package view import ( "math" "time" "go.opencensus.io/metric/metricdata" ) // AggregationData represents an aggregated value from a collection. // They are reported on the view data during exporting. // Mosts users won't directly access aggregration data. type AggregationData interface { isAggregationData() bool addSample(v float64, attachments map[string]interface{}, t time.Time) clone() AggregationData equal(other AggregationData) bool toPoint(t metricdata.Type, time time.Time) metricdata.Point StartTime() time.Time } const epsilon = 1e-9 // CountData is the aggregated data for the Count aggregation. // A count aggregation processes data and counts the recordings. // // Most users won't directly access count data. type CountData struct { Start time.Time Value int64 } func (a *CountData) isAggregationData() bool { return true } func (a *CountData) addSample(_ float64, _ map[string]interface{}, _ time.Time) { a.Value = a.Value + 1 } func (a *CountData) clone() AggregationData { return &CountData{Value: a.Value, Start: a.Start} } func (a *CountData) equal(other AggregationData) bool { a2, ok := other.(*CountData) if !ok { return false } return a.Start.Equal(a2.Start) && a.Value == a2.Value } func (a *CountData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { switch metricType { case metricdata.TypeCumulativeInt64: return metricdata.NewInt64Point(t, a.Value) default: panic("unsupported metricdata.Type") } } // StartTime returns the start time of the data being aggregated by CountData. func (a *CountData) StartTime() time.Time { return a.Start } // SumData is the aggregated data for the Sum aggregation. // A sum aggregation processes data and sums up the recordings. // // Most users won't directly access sum data. type SumData struct { Start time.Time Value float64 } func (a *SumData) isAggregationData() bool { return true } func (a *SumData) addSample(v float64, _ map[string]interface{}, _ time.Time) { a.Value += v } func (a *SumData) clone() AggregationData { return &SumData{Value: a.Value, Start: a.Start} } func (a *SumData) equal(other AggregationData) bool { a2, ok := other.(*SumData) if !ok { return false } return a.Start.Equal(a2.Start) && math.Pow(a.Value-a2.Value, 2) < epsilon } func (a *SumData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { switch metricType { case metricdata.TypeCumulativeInt64: return metricdata.NewInt64Point(t, int64(a.Value)) case metricdata.TypeCumulativeFloat64: return metricdata.NewFloat64Point(t, a.Value) default: panic("unsupported metricdata.Type") } } // StartTime returns the start time of the data being aggregated by SumData. func (a *SumData) StartTime() time.Time { return a.Start } // DistributionData is the aggregated data for the // Distribution aggregation. // // Most users won't directly access distribution data. // // For a distribution with N bounds, the associated DistributionData will have // N+1 buckets. type DistributionData struct { Count int64 // number of data points aggregated Min float64 // minimum value in the distribution Max float64 // max value in the distribution Mean float64 // mean of the distribution SumOfSquaredDev float64 // sum of the squared deviation from the mean CountPerBucket []int64 // number of occurrences per bucket // ExemplarsPerBucket is slice the same length as CountPerBucket containing // an exemplar for the associated bucket, or nil. ExemplarsPerBucket []*metricdata.Exemplar bounds []float64 // histogram distribution of the values Start time.Time } func newDistributionData(agg *Aggregation, t time.Time) *DistributionData { bucketCount := len(agg.Buckets) + 1 return &DistributionData{ CountPerBucket: make([]int64, bucketCount), ExemplarsPerBucket: make([]*metricdata.Exemplar, bucketCount), bounds: agg.Buckets, Min: math.MaxFloat64, Max: math.SmallestNonzeroFloat64, Start: t, } } // Sum returns the sum of all samples collected. func (a *DistributionData) Sum() float64 { return a.Mean * float64(a.Count) } func (a *DistributionData) variance() float64 { if a.Count <= 1 { return 0 } return a.SumOfSquaredDev / float64(a.Count-1) } func (a *DistributionData) isAggregationData() bool { return true } // TODO(songy23): support exemplar attachments. func (a *DistributionData) addSample(v float64, attachments map[string]interface{}, t time.Time) { if v < a.Min { a.Min = v } if v > a.Max { a.Max = v } a.Count++ a.addToBucket(v, attachments, t) if a.Count == 1 { a.Mean = v return } oldMean := a.Mean a.Mean = a.Mean + (v-a.Mean)/float64(a.Count) a.SumOfSquaredDev = a.SumOfSquaredDev + (v-oldMean)*(v-a.Mean) } func (a *DistributionData) addToBucket(v float64, attachments map[string]interface{}, t time.Time) { var count *int64 var i int var b float64 for i, b = range a.bounds { if v < b { count = &a.CountPerBucket[i] break } } if count == nil { // Last bucket. i = len(a.bounds) count = &a.CountPerBucket[i] } *count++ if exemplar := getExemplar(v, attachments, t); exemplar != nil { a.ExemplarsPerBucket[i] = exemplar } } func getExemplar(v float64, attachments map[string]interface{}, t time.Time) *metricdata.Exemplar { if len(attachments) == 0 { return nil } return &metricdata.Exemplar{ Value: v, Timestamp: t, Attachments: attachments, } } func (a *DistributionData) clone() AggregationData { c := *a c.CountPerBucket = append([]int64(nil), a.CountPerBucket...) c.ExemplarsPerBucket = append([]*metricdata.Exemplar(nil), a.ExemplarsPerBucket...) return &c } func (a *DistributionData) equal(other AggregationData) bool { a2, ok := other.(*DistributionData) if !ok { return false } if a2 == nil { return false } if len(a.CountPerBucket) != len(a2.CountPerBucket) { return false } for i := range a.CountPerBucket { if a.CountPerBucket[i] != a2.CountPerBucket[i] { return false } } return a.Start.Equal(a2.Start) && a.Count == a2.Count && a.Min == a2.Min && a.Max == a2.Max && math.Pow(a.Mean-a2.Mean, 2) < epsilon && math.Pow(a.variance()-a2.variance(), 2) < epsilon } func (a *DistributionData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { switch metricType { case metricdata.TypeCumulativeDistribution: buckets := []metricdata.Bucket{} for i := 0; i < len(a.CountPerBucket); i++ { buckets = append(buckets, metricdata.Bucket{ Count: a.CountPerBucket[i], Exemplar: a.ExemplarsPerBucket[i], }) } bucketOptions := &metricdata.BucketOptions{Bounds: a.bounds} val := &metricdata.Distribution{ Count: a.Count, Sum: a.Sum(), SumOfSquaredDeviation: a.SumOfSquaredDev, BucketOptions: bucketOptions, Buckets: buckets, } return metricdata.NewDistributionPoint(t, val) default: // TODO: [rghetia] when we have a use case for TypeGaugeDistribution. panic("unsupported metricdata.Type") } } // StartTime returns the start time of the data being aggregated by DistributionData. func (a *DistributionData) StartTime() time.Time { return a.Start } // LastValueData returns the last value recorded for LastValue aggregation. type LastValueData struct { Value float64 } func (l *LastValueData) isAggregationData() bool { return true } func (l *LastValueData) addSample(v float64, _ map[string]interface{}, _ time.Time) { l.Value = v } func (l *LastValueData) clone() AggregationData { return &LastValueData{l.Value} } func (l *LastValueData) equal(other AggregationData) bool { a2, ok := other.(*LastValueData) if !ok { return false } return l.Value == a2.Value } func (l *LastValueData) toPoint(metricType metricdata.Type, t time.Time) metricdata.Point { switch metricType { case metricdata.TypeGaugeInt64: return metricdata.NewInt64Point(t, int64(l.Value)) case metricdata.TypeGaugeFloat64: return metricdata.NewFloat64Point(t, l.Value) default: panic("unsupported metricdata.Type") } } // StartTime returns an empty time value as start time is not recorded when using last value // aggregation. func (l *LastValueData) StartTime() time.Time { return time.Time{} } // ClearStart clears the Start field from data if present. Useful for testing in cases where the // start time will be nondeterministic. func ClearStart(data AggregationData) { switch data := data.(type) { case *CountData: data.Start = time.Time{} case *SumData: data.Start = time.Time{} case *DistributionData: data.Start = time.Time{} } }