/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package buffer import ( "fmt" "sync" "testing" "time" "google.golang.org/grpc/internal/grpctest" ) type s struct { grpctest.Tester } func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } func (s) TestCircularBufferSerial(t *testing.T) { var size, i uint32 var result []interface{} size = 1 << 15 cb, err := NewCircularBuffer(size) if err != nil { t.Fatalf("error allocating CircularBuffer: %v", err) } for i = 0; i < size/2; i++ { cb.Push(i) } result = cb.Drain() if uint32(len(result)) != size/2 { t.Fatalf("len(result) = %d; want %d", len(result), size/2) } // The returned result isn't necessarily sorted. seen := make(map[uint32]bool) for _, r := range result { seen[r.(uint32)] = true } for i = 0; i < uint32(len(result)); i++ { if !seen[i] { t.Fatalf("seen[%d] = false; want true", i) } } for i = 0; i < size; i++ { cb.Push(i) } result = cb.Drain() if uint32(len(result)) != size { t.Fatalf("len(result) = %d; want %d", len(result), size/2) } } func (s) TestCircularBufferOverflow(t *testing.T) { var size, i uint32 var result []interface{} size = 1 << 10 cb, err := NewCircularBuffer(size) if err != nil { t.Fatalf("error allocating CircularBuffer: %v", err) } for i = 0; i < 10*size; i++ { cb.Push(i) } result = cb.Drain() if uint32(len(result)) != size { t.Fatalf("len(result) = %d; want %d", len(result), size) } for idx, x := range result { if x.(uint32) < size { t.Fatalf("result[%d] = %d; want it to be >= %d", idx, x, size) } } } func (s) TestCircularBufferConcurrent(t *testing.T) { for tn := 0; tn < 2; tn++ { var size uint32 var result []interface{} size = 1 << 6 cb, err := NewCircularBuffer(size) if err != nil { t.Fatalf("error allocating CircularBuffer: %v", err) } type item struct { R uint32 N uint32 T time.Time } var wg sync.WaitGroup for r := uint32(0); r < 1024; r++ { wg.Add(1) go func(r uint32) { for n := uint32(0); n < size; n++ { cb.Push(item{R: r, N: n, T: time.Now()}) } wg.Done() }(r) } // Wait for all goroutines to finish only in one test. Draining // concurrently while Pushes are still happening will test for races in the // Draining lock. if tn == 0 { wg.Wait() } result = cb.Drain() // Can't expect the buffer to be full if the Pushes aren't necessarily done. if tn == 0 { if uint32(len(result)) != size { t.Fatalf("len(result) = %d; want %d", len(result), size) } } // There can be absolutely no expectation on the order of the data returned // by Drain because: (a) everything is happening concurrently (b) a // round-robin is used to write to different queues (and therefore // different cachelines) for less write contention. // Wait for all goroutines to complete before moving on to other tests. If // the benchmarks run after this, it might affect performance unfairly. wg.Wait() } } func BenchmarkCircularBuffer(b *testing.B) { x := 1 for size := 1 << 16; size <= 1<<20; size <<= 1 { for routines := 1; routines <= 1<<8; routines <<= 1 { b.Run(fmt.Sprintf("goroutines:%d/size:%d", routines, size), func(b *testing.B) { cb, err := NewCircularBuffer(uint32(size)) if err != nil { b.Fatalf("error allocating CircularBuffer: %v", err) } perRoutine := b.N / routines var wg sync.WaitGroup for r := 0; r < routines; r++ { wg.Add(1) go func() { for i := 0; i < perRoutine; i++ { cb.Push(&x) } wg.Done() }() } wg.Wait() }) } } }