/* * * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package xdsclient import ( "context" "testing" "time" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/testutils/fakeserver" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/testing/protocmp" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3" durationpb "github.com/golang/protobuf/ptypes/duration" ) const ( defaultClientWatchExpiryTimeout = 15 * time.Second ) func (s) TestLRSClient(t *testing.T) { fs, sCleanup, err := fakeserver.StartServer() if err != nil { t.Fatalf("failed to start fake xDS server: %v", err) } defer sCleanup() xdsC, close, err := NewWithConfigForTesting(&bootstrap.Config{ XDSServer: &bootstrap.ServerConfig{ ServerURI: fs.Address, Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), TransportAPI: version.TransportV3, NodeProto: &v3corepb.Node{}, }, }, defaultClientWatchExpiryTimeout, time.Duration(0)) if err != nil { t.Fatalf("failed to create xds client: %v", err) } defer close() // Report to the same address should not create new ClientConn. store1, lrsCancel1 := xdsC.ReportLoad( &bootstrap.ServerConfig{ ServerURI: fs.Address, Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), CredsType: "insecure", TransportAPI: version.TransportV3, NodeProto: &v3corepb.Node{}, }, ) defer lrsCancel1() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if u, err := fs.NewConnChan.Receive(ctx); err != nil { t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) } sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded { t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err) } fs2, sCleanup2, err := fakeserver.StartServer() if err != nil { t.Fatalf("failed to start fake xDS server: %v", err) } defer sCleanup2() // Report to a different address should create new ClientConn. store2, lrsCancel2 := xdsC.ReportLoad( &bootstrap.ServerConfig{ ServerURI: fs2.Address, Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), CredsType: "insecure", TransportAPI: version.TransportV2, NodeProto: &v3corepb.Node{}, }, ) defer lrsCancel2() if u, err := fs2.NewConnChan.Receive(ctx); err != nil { t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) } if store1 == store2 { t.Fatalf("got same store for different servers, want different") } if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil { t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err) } store2.PerCluster("cluster", "eds").CallDropped("test") // Send one resp to the client. fs2.LRSResponseChan <- &fakeserver.Response{ Resp: &v3lrspb.LoadStatsResponse{ SendAllClusters: true, LoadReportingInterval: &durationpb.Duration{Nanos: 50000000}, }, } // Server should receive a req with the loads. u, err := fs2.LRSRequestChan.Receive(ctx) if err != nil { t.Fatalf("unexpected LRS request: %v, %v, want error canceled", u, err) } receivedLoad := u.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest).ClusterStats if len(receivedLoad) <= 0 { t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test") } receivedLoad[0].LoadReportInterval = nil want := &v3endpointpb.ClusterStats{ ClusterName: "cluster", ClusterServiceName: "eds", TotalDroppedRequests: 1, DroppedRequests: []*v3endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}}, } if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" { t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d) } // Cancel this load reporting stream, server should see error canceled. lrsCancel2() // Server should receive a stream canceled error. if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled { t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err) } }