// Copyright 2019, OpenCensus Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // package metricexport import ( "context" "sync" "testing" "time" "go.opencensus.io/metric" "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricproducer" ) var ( ir1 *IntervalReader ir2 *IntervalReader reader1 = NewReader(WithSpanName("test-export-span")) exporter1 = &metricExporter{} exporter2 = &metricExporter{} gaugeEntry *metric.Int64GaugeEntry duration1 = 1000 * time.Millisecond duration2 = 2000 * time.Millisecond ) type metricExporter struct { sync.Mutex metrics []*metricdata.Metric } func (e *metricExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { e.Lock() defer e.Unlock() e.metrics = append(e.metrics, metrics...) return nil } func init() { r := metric.NewRegistry() metricproducer.GlobalManager().AddProducer(r) g, _ := r.AddInt64Gauge("active_request", metric.WithDescription("Number of active requests, per method."), metric.WithUnit(metricdata.UnitDimensionless), metric.WithLabelKeys("method")) gaugeEntry, _ = g.GetEntry(metricdata.NewLabelValue("foo")) } func TestNewReaderWitDefaultOptions(t *testing.T) { r := NewReader() if r.spanName != defaultSpanName { t.Errorf("span name: got %v, want %v\n", r.spanName, defaultSpanName) } } func TestNewReaderWitSpanName(t *testing.T) { spanName := "test-span" r := NewReader(WithSpanName(spanName)) if r.spanName != spanName { t.Errorf("span name: got %+v, want %v\n", r.spanName, spanName) } } func TestNewReader(t *testing.T) { r := NewReader() gaugeEntry.Add(1) r.ReadAndExport(exporter1) checkExportedCount(exporter1, 1, t) checkExportedMetricDesc(exporter1, "active_request", t) resetExporter(exporter1) } func TestNewIntervalReader(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) gaugeEntry.Add(1) time.Sleep(1500 * time.Millisecond) checkExportedCount(exporter1, 1, t) checkExportedMetricDesc(exporter1, "active_request", t) ir1.Stop() resetExporter(exporter1) } func TestManualReadForIntervalReader(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) gaugeEntry.Set(1) reader1.ReadAndExport(exporter1) gaugeEntry.Set(4) time.Sleep(1500 * time.Millisecond) checkExportedCount(exporter1, 2, t) checkExportedValues(exporter1, []int64{1, 4}, t) // one for manual read other for time based. checkExportedMetricDesc(exporter1, "active_request", t) ir1.Stop() resetExporter(exporter1) } func TestFlushNoOpForIntervalReader(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) gaugeEntry.Set(1) // since IR is not stopped, flush does nothing ir1.Flush() // expect no data points checkExportedCount(exporter1, 0, t) checkExportedMetricDesc(exporter1, "active_request", t) ir1.Stop() resetExporter(exporter1) } func TestFlushAllowMultipleForIntervalReader(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) gaugeEntry.Set(1) ir1.Stop() ir1.Flush() // metric is still coming in gaugeEntry.Add(1) // one more flush after IR stopped ir1.Flush() // expect 2 data point, one from each flush checkExportedCount(exporter1, 2, t) checkExportedValues(exporter1, []int64{1, 2}, t) checkExportedMetricDesc(exporter1, "active_request", t) resetExporter(exporter1) } func TestFlushRestartForIntervalReader(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) gaugeEntry.Set(1) ir1.Stop() ir1.Flush() // restart the IR err := ir1.Start() if err != nil { t.Fatalf("error starting reader %v\n", err) } gaugeEntry.Add(1) ir1.Stop() ir1.Flush() // expect 2 data point, one from each flush checkExportedCount(exporter1, 2, t) checkExportedValues(exporter1, []int64{1, 2}, t) checkExportedMetricDesc(exporter1, "active_request", t) resetExporter(exporter1) } func TestProducerWithIntervalReaderStop(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) ir1.Stop() gaugeEntry.Add(1) time.Sleep(1500 * time.Millisecond) checkExportedCount(exporter1, 0, t) checkExportedMetricDesc(exporter1, "active_request", t) resetExporter(exporter1) } func TestProducerWithMultipleIntervalReaders(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) ir2 = createAndStart(exporter2, duration2, t) gaugeEntry.Add(1) time.Sleep(2500 * time.Millisecond) checkExportedCount(exporter1, 2, t) checkExportedMetricDesc(exporter1, "active_request", t) checkExportedCount(exporter2, 1, t) checkExportedMetricDesc(exporter2, "active_request", t) ir1.Stop() ir2.Stop() resetExporter(exporter1) resetExporter(exporter1) } func TestIntervalReaderMultipleStop(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) stop := make(chan bool, 1) go func() { ir1.Stop() ir1.Stop() stop <- true }() select { case _ = <-stop: case <-time.After(1 * time.Second): t.Fatalf("ir1 stop got blocked") } } func TestIntervalReaderMultipleStart(t *testing.T) { ir1 = createAndStart(exporter1, duration1, t) err := ir1.Start() if err == nil { t.Fatalf("expected error but got nil\n") } gaugeEntry.Add(1) time.Sleep(1500 * time.Millisecond) checkExportedCount(exporter1, 1, t) checkExportedMetricDesc(exporter1, "active_request", t) ir1.Stop() resetExporter(exporter1) } func TestNewIntervalReaderWithNilReader(t *testing.T) { _, err := NewIntervalReader(nil, exporter1) if err == nil { t.Fatalf("expected error but got nil\n") } } func TestNewIntervalReaderWithNilExporter(t *testing.T) { _, err := NewIntervalReader(reader1, nil) if err == nil { t.Fatalf("expected error but got nil\n") } } func TestNewIntervalReaderStartWithInvalidInterval(t *testing.T) { ir, err := NewIntervalReader(reader1, exporter1) ir.ReportingInterval = 500 * time.Millisecond err = ir.Start() if err == nil { t.Fatalf("expected error but got nil\n") } } func checkExportedCount(exporter *metricExporter, wantCount int, t *testing.T) { exporter.Lock() defer exporter.Unlock() gotCount := len(exporter.metrics) if gotCount != wantCount { t.Fatalf("exported metric count: got %d, want %d\n", gotCount, wantCount) } } func checkExportedValues(exporter *metricExporter, wantValues []int64, t *testing.T) { exporter.Lock() defer exporter.Unlock() gotCount := len(exporter.metrics) wantCount := len(wantValues) if gotCount != wantCount { t.Errorf("exported metric count: got %d, want %d\n", gotCount, wantCount) return } for i, wantValue := range wantValues { var gotValue int64 switch v := exporter.metrics[i].TimeSeries[0].Points[0].Value.(type) { case int64: gotValue = v default: t.Errorf("expected float64 value but found other %T", exporter.metrics[i].TimeSeries[0].Points[0].Value) } if gotValue != wantValue { t.Errorf("values idx %d, got: %v, want %v", i, gotValue, wantValue) } } } func checkExportedMetricDesc(exporter *metricExporter, wantMdName string, t *testing.T) { exporter.Lock() defer exporter.Unlock() for _, metric := range exporter.metrics { gotMdName := metric.Descriptor.Name if gotMdName != wantMdName { t.Errorf("got %s, want %s\n", gotMdName, wantMdName) } } exporter.metrics = nil } func resetExporter(exporter *metricExporter) { exporter.Lock() defer exporter.Unlock() exporter.metrics = nil } // createAndStart stops the current processors and creates a new one. func createAndStart(exporter *metricExporter, d time.Duration, t *testing.T) *IntervalReader { ir, _ := NewIntervalReader(reader1, exporter) ir.ReportingInterval = d err := ir.Start() if err != nil { t.Fatalf("error creating reader %v\n", err) } return ir }