/* * Copyright 2022 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package orca_test import ( "context" "fmt" "sync" "testing" "time" "github.com/golang/protobuf/proto" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/orca" "google.golang.org/grpc/orca/internal" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/status" v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" ) // customLBB wraps a round robin LB policy but provides a ClientConn wrapper to // add an ORCA OOB report producer for all created SubConns. type customLBB struct{} func (customLBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { return balancer.Get(roundrobin.Name).Build(&ccWrapper{ClientConn: cc}, opts) } func (customLBB) Name() string { return "customLB" } func init() { balancer.Register(customLBB{}) } type ccWrapper struct { balancer.ClientConn } func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { if len(addrs) != 1 { panic(fmt.Sprintf("got addrs=%v; want len(addrs) == 1", addrs)) } sc, err := w.ClientConn.NewSubConn(addrs, opts) if err != nil { return sc, err } l := getListenerInfo(addrs[0]) l.listener.cleanup = orca.RegisterOOBListener(sc, l.listener, l.opts) l.sc = sc return sc, nil } // listenerInfo is stored in an address's attributes to allow ORCA // listeners to be registered on subconns created for that address. type listenerInfo struct { listener *testOOBListener opts orca.OOBListenerOptions sc balancer.SubConn // Set by the LB policy } type listenerInfoKey struct{} func setListenerInfo(addr resolver.Address, l *listenerInfo) resolver.Address { addr.Attributes = addr.Attributes.WithValue(listenerInfoKey{}, l) return addr } func getListenerInfo(addr resolver.Address) *listenerInfo { return addr.Attributes.Value(listenerInfoKey{}).(*listenerInfo) } // testOOBListener is a simple listener that pushes load reports to a channel. type testOOBListener struct { cleanup func() loadReportCh chan *v3orcapb.OrcaLoadReport } func newTestOOBListener() *testOOBListener { return &testOOBListener{cleanup: func() {}, loadReportCh: make(chan *v3orcapb.OrcaLoadReport)} } func (t *testOOBListener) Stop() { t.cleanup() } func (t *testOOBListener) OnLoadReport(r *v3orcapb.OrcaLoadReport) { t.loadReportCh <- r } // TestProducer is a basic, end-to-end style test of an LB policy with an // OOBListener communicating with a server with an ORCA service. func (s) TestProducer(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Use a fixed backoff for stream recreation. oldBackoff := internal.DefaultBackoffFunc internal.DefaultBackoffFunc = func(int) time.Duration { return 10 * time.Millisecond } defer func() { internal.DefaultBackoffFunc = oldBackoff }() // Initialize listener for our ORCA server. lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) } // Register the OpenRCAService with a very short metrics reporting interval. const shortReportingInterval = 50 * time.Millisecond opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval} internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts) s := grpc.NewServer() orcaSrv, err := orca.Register(s, opts) if err != nil { t.Fatalf("orca.Register failed: %v", err) } go s.Serve(lis) defer s.Stop() // Create our client with an OOB listener in the LB policy it selects. r := manual.NewBuilderWithScheme("whatever") oobLis := newTestOOBListener() lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond} li := &listenerInfo{listener: oobLis, opts: lisOpts} addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li) r.InitialState(resolver.State{Addresses: []resolver.Address{addr}}) cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("grpc.Dial failed: %v", err) } defer cc.Close() // Ensure the OOB listener is stopped before the client is closed to avoid // a potential irrelevant error in the logs. defer oobLis.Stop() // Set a few metrics and wait for them on the client side. orcaSrv.SetCPUUtilization(10) orcaSrv.SetMemoryUtilization(100) orcaSrv.SetUtilization("bob", 555) loadReportWant := &v3orcapb.OrcaLoadReport{ CpuUtilization: 10, MemUtilization: 100, Utilization: map[string]float64{"bob": 555}, } testReport: for { select { case r := <-oobLis.loadReportCh: t.Log("Load report received: ", r) if proto.Equal(r, loadReportWant) { // Success! break testReport } case <-ctx.Done(): t.Fatalf("timed out waiting for load report: %v", loadReportWant) } } // Change and add metrics and wait for them on the client side. orcaSrv.SetCPUUtilization(50) orcaSrv.SetMemoryUtilization(200) orcaSrv.SetUtilization("mary", 321) loadReportWant = &v3orcapb.OrcaLoadReport{ CpuUtilization: 50, MemUtilization: 200, Utilization: map[string]float64{"bob": 555, "mary": 321}, } for { select { case r := <-oobLis.loadReportCh: t.Log("Load report received: ", r) if proto.Equal(r, loadReportWant) { // Success! return } case <-ctx.Done(): t.Fatalf("timed out waiting for load report: %v", loadReportWant) } } } // fakeORCAService is a simple implementation of an ORCA service that pushes // requests it receives from clients to a channel and sends responses from a // channel back. This allows tests to verify the client is sending requests // and processing responses properly. type fakeORCAService struct { v3orcaservicegrpc.UnimplementedOpenRcaServiceServer reqCh chan *v3orcaservicepb.OrcaLoadReportRequest respCh chan interface{} // either *v3orcapb.OrcaLoadReport or error } func newFakeORCAService() *fakeORCAService { return &fakeORCAService{ reqCh: make(chan *v3orcaservicepb.OrcaLoadReportRequest), respCh: make(chan interface{}), } } func (f *fakeORCAService) close() { close(f.respCh) } func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { f.reqCh <- req for resp := range f.respCh { if err, ok := resp.(error); ok { return err } if err := stream.Send(resp.(*v3orcapb.OrcaLoadReport)); err != nil { // In the event that a stream error occurs, a new stream will have // been created that was waiting for this response message. Push // it back onto the channel and return. // // This happens because we range over respCh. If we changed to // instead select on respCh + stream.Context(), the same situation // could still occur due to a race between noticing the two events, // so such a workaround would still be needed to prevent flakiness. f.respCh <- resp return err } } return nil } // TestProducerBackoff verifies that the ORCA producer applies the proper // backoff after stream failures. func (s) TestProducerBackoff(t *testing.T) { grpctest.TLogger.ExpectErrorN("injected error", 4) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Provide a convenient way to expect backoff calls and return a minimal // value. const backoffShouldNotBeCalled = 9999 // Use to assert backoff function is not called. const backoffAllowAny = -1 // Use to ignore any backoff calls. expectedBackoff := backoffAllowAny oldBackoff := internal.DefaultBackoffFunc internal.DefaultBackoffFunc = func(got int) time.Duration { if expectedBackoff == backoffShouldNotBeCalled { t.Errorf("Unexpected backoff call; parameter = %v", got) } else if expectedBackoff != backoffAllowAny { if got != expectedBackoff { t.Errorf("Unexpected backoff received; got %v want %v", got, expectedBackoff) } } return time.Millisecond } defer func() { internal.DefaultBackoffFunc = oldBackoff }() // Initialize listener for our ORCA server. lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) } // Register our fake ORCA service. s := grpc.NewServer() fake := newFakeORCAService() defer fake.close() v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake) go s.Serve(lis) defer s.Stop() // Define the report interval and a function to wait for it to be sent to // the server. const reportInterval = 123 * time.Second awaitRequest := func(interval time.Duration) { select { case req := <-fake.reqCh: if got := req.GetReportInterval().AsDuration(); got != interval { t.Errorf("Unexpected report interval; got %v want %v", got, interval) } case <-ctx.Done(): t.Fatalf("Did not receive client request") } } // Create our client with an OOB listener in the LB policy it selects. r := manual.NewBuilderWithScheme("whatever") oobLis := newTestOOBListener() lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval} li := &listenerInfo{listener: oobLis, opts: lisOpts} r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}}) cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("grpc.Dial failed: %v", err) } defer cc.Close() // Ensure the OOB listener is stopped before the client is closed to avoid // a potential irrelevant error in the logs. defer oobLis.Stop() // Define a load report to send and expect the client to see. loadReportWant := &v3orcapb.OrcaLoadReport{ CpuUtilization: 10, MemUtilization: 100, Utilization: map[string]float64{"bob": 555}, } // Unblock the fake. awaitRequest(reportInterval) fake.respCh <- loadReportWant select { case r := <-oobLis.loadReportCh: t.Log("Load report received: ", r) if proto.Equal(r, loadReportWant) { // Success! break } case <-ctx.Done(): t.Fatalf("timed out waiting for load report: %v", loadReportWant) } // The next request should be immediate, since there was a message // received. expectedBackoff = backoffShouldNotBeCalled fake.respCh <- status.Errorf(codes.Internal, "injected error") awaitRequest(reportInterval) // The next requests will need to backoff. expectedBackoff = 0 fake.respCh <- status.Errorf(codes.Internal, "injected error") awaitRequest(reportInterval) expectedBackoff = 1 fake.respCh <- status.Errorf(codes.Internal, "injected error") awaitRequest(reportInterval) expectedBackoff = 2 fake.respCh <- status.Errorf(codes.Internal, "injected error") awaitRequest(reportInterval) // The next request should be immediate, since there was a message // received. expectedBackoff = backoffShouldNotBeCalled // Send another valid response and wait for it on the client. fake.respCh <- loadReportWant select { case r := <-oobLis.loadReportCh: t.Log("Load report received: ", r) if proto.Equal(r, loadReportWant) { // Success! break } case <-ctx.Done(): t.Fatalf("timed out waiting for load report: %v", loadReportWant) } } // TestProducerMultipleListeners tests that multiple listeners works as // expected in a producer: requesting the proper interval and delivering the // update to all listeners. func (s) TestProducerMultipleListeners(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Provide a convenient way to expect backoff calls and return a minimal // value. oldBackoff := internal.DefaultBackoffFunc internal.DefaultBackoffFunc = func(got int) time.Duration { return time.Millisecond } defer func() { internal.DefaultBackoffFunc = oldBackoff }() // Initialize listener for our ORCA server. lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) } // Register our fake ORCA service. s := grpc.NewServer() fake := newFakeORCAService() defer fake.close() v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake) go s.Serve(lis) defer s.Stop() // Define the report interval and a function to wait for it to be sent to // the server. const reportInterval1 = 123 * time.Second const reportInterval2 = 234 * time.Second const reportInterval3 = 56 * time.Second awaitRequest := func(interval time.Duration) { select { case req := <-fake.reqCh: if got := req.GetReportInterval().AsDuration(); got != interval { t.Errorf("Unexpected report interval; got %v want %v", got, interval) } case <-ctx.Done(): t.Fatalf("Did not receive client request") } } // Create our client with an OOB listener in the LB policy it selects. r := manual.NewBuilderWithScheme("whatever") oobLis1 := newTestOOBListener() lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1} li := &listenerInfo{listener: oobLis1, opts: lisOpts1} r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}}) cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("grpc.Dial failed: %v", err) } defer cc.Close() // Ensure the OOB listener is stopped before the client is closed to avoid // a potential irrelevant error in the logs. defer oobLis1.Stop() oobLis2 := newTestOOBListener() lisOpts2 := orca.OOBListenerOptions{ReportInterval: reportInterval2} oobLis3 := newTestOOBListener() lisOpts3 := orca.OOBListenerOptions{ReportInterval: reportInterval3} // Define a load report to send and expect the client to see. loadReportWant := &v3orcapb.OrcaLoadReport{ CpuUtilization: 10, MemUtilization: 100, Utilization: map[string]float64{"bob": 555}, } // Receive reports and update counts for the three listeners. var reportsMu sync.Mutex var reportsReceived1, reportsReceived2, reportsReceived3 int go func() { for { select { case r := <-oobLis1.loadReportCh: t.Log("Load report 1 received: ", r) if !proto.Equal(r, loadReportWant) { t.Errorf("Unexpected report received: %+v", r) } reportsMu.Lock() reportsReceived1++ reportsMu.Unlock() case r := <-oobLis2.loadReportCh: t.Log("Load report 2 received: ", r) if !proto.Equal(r, loadReportWant) { t.Errorf("Unexpected report received: %+v", r) } reportsMu.Lock() reportsReceived2++ reportsMu.Unlock() case r := <-oobLis3.loadReportCh: t.Log("Load report 3 received: ", r) if !proto.Equal(r, loadReportWant) { t.Errorf("Unexpected report received: %+v", r) } reportsMu.Lock() reportsReceived3++ reportsMu.Unlock() case <-ctx.Done(): // Test has ended; exit return } } }() // checkReports is a helper function to check the report counts for the three listeners. checkReports := func(r1, r2, r3 int) { t.Helper() for ctx.Err() == nil { reportsMu.Lock() if r1 == reportsReceived1 && r2 == reportsReceived2 && r3 == reportsReceived3 { // Success! reportsMu.Unlock() return } if reportsReceived1 > r1 || reportsReceived2 > r2 || reportsReceived3 > r3 { reportsMu.Unlock() t.Fatalf("received excess reports. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3) return } reportsMu.Unlock() time.Sleep(10 * time.Millisecond) } t.Fatalf("timed out waiting for reports received. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3) } // Only 1 listener; expect reportInterval1 to be used and expect the report // to be sent to the listener. awaitRequest(reportInterval1) fake.respCh <- loadReportWant checkReports(1, 0, 0) // Register listener 2 with a less frequent interval; no need to recreate // stream. Report should go to both listeners. oobLis2.cleanup = orca.RegisterOOBListener(li.sc, oobLis2, lisOpts2) fake.respCh <- loadReportWant checkReports(2, 1, 0) // Register listener 3 with a more frequent interval; stream is recreated // with this interval after the next report is received. The first report // will go to all three listeners. oobLis3.cleanup = orca.RegisterOOBListener(li.sc, oobLis3, lisOpts3) fake.respCh <- loadReportWant checkReports(3, 2, 1) awaitRequest(reportInterval3) // Another report without a change in listeners should go to all three listeners. fake.respCh <- loadReportWant checkReports(4, 3, 2) // Stop listener 2. This does not affect the interval as listener 3 is // still the shortest. The next update goes to listeners 1 and 3. oobLis2.Stop() fake.respCh <- loadReportWant checkReports(5, 3, 3) // Stop listener 3. This makes the interval longer, with stream recreation // delayed until the next report is received. Reports should only go to // listener 1 now. oobLis3.Stop() fake.respCh <- loadReportWant checkReports(6, 3, 3) awaitRequest(reportInterval1) // Another report without a change in listeners should go to the first listener. fake.respCh <- loadReportWant checkReports(7, 3, 3) }