/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package v2 import ( "context" "errors" "reflect" "testing" "time" "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils/fakeserver" "google.golang.org/grpc/xds/internal/version" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v2" anypb "github.com/golang/protobuf/ptypes/any" structpb "github.com/golang/protobuf/ptypes/struct" ) type s struct { grpctest.Tester } func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } const ( goodLDSTarget1 = "lds.target.good:1111" goodLDSTarget2 = "lds.target.good:2222" goodRouteName1 = "GoodRouteConfig1" goodRouteName2 = "GoodRouteConfig2" goodEDSName = "GoodClusterAssignment1" uninterestingRouteName = "UninterestingRouteName" uninterestingDomain = "uninteresting.domain" goodClusterName1 = "GoodClusterName1" goodClusterName2 = "GoodClusterName2" uninterestingClusterName = "UninterestingClusterName" httpConnManagerURL = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager" ) var ( goodNodeProto = &basepb.Node{ Id: "ENVOY_NODE_ID", Metadata: &structpb.Struct{ Fields: map[string]*structpb.Value{ "TRAFFICDIRECTOR_GRPC_HOSTNAME": { Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"}, }, }, }, } goodLDSRequest = &xdspb.DiscoveryRequest{ Node: goodNodeProto, TypeUrl: version.V2ListenerURL, ResourceNames: []string{goodLDSTarget1}, } goodRDSRequest = &xdspb.DiscoveryRequest{ Node: goodNodeProto, TypeUrl: version.V2RouteConfigURL, ResourceNames: []string{goodRouteName1}, } goodCDSRequest = &xdspb.DiscoveryRequest{ Node: goodNodeProto, TypeUrl: version.V2ClusterURL, ResourceNames: []string{goodClusterName1}, } goodEDSRequest = &xdspb.DiscoveryRequest{ Node: goodNodeProto, TypeUrl: version.V2EndpointsURL, ResourceNames: []string{goodEDSName}, } goodHTTPConnManager1 = &httppb.HttpConnectionManager{ RouteSpecifier: &httppb.HttpConnectionManager_Rds{ Rds: &httppb.Rds{ ConfigSource: &basepb.ConfigSource{ ConfigSourceSpecifier: &basepb.ConfigSource_Ads{Ads: &basepb.AggregatedConfigSource{}}, }, RouteConfigName: goodRouteName1, }, }, } marshaledConnMgr1, _ = proto.Marshal(goodHTTPConnManager1) goodListener1 = &xdspb.Listener{ Name: goodLDSTarget1, ApiListener: &listenerpb.ApiListener{ ApiListener: &anypb.Any{ TypeUrl: httpConnManagerURL, Value: marshaledConnMgr1, }, }, } marshaledListener1, _ = proto.Marshal(goodListener1) goodListener2 = &xdspb.Listener{ Name: goodLDSTarget2, ApiListener: &listenerpb.ApiListener{ ApiListener: &anypb.Any{ TypeUrl: httpConnManagerURL, Value: marshaledConnMgr1, }, }, } marshaledListener2, _ = proto.Marshal(goodListener2) noAPIListener = &xdspb.Listener{Name: goodLDSTarget1} marshaledNoAPIListener, _ = proto.Marshal(noAPIListener) badAPIListener2 = &xdspb.Listener{ Name: goodLDSTarget2, ApiListener: &listenerpb.ApiListener{ ApiListener: &anypb.Any{ TypeUrl: httpConnManagerURL, Value: []byte{1, 2, 3, 4}, }, }, } badlyMarshaledAPIListener2, _ = proto.Marshal(badAPIListener2) goodLDSResponse1 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2ListenerURL, Value: marshaledListener1, }, }, TypeUrl: version.V2ListenerURL, } goodLDSResponse2 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2ListenerURL, Value: marshaledListener2, }, }, TypeUrl: version.V2ListenerURL, } emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: version.V2ListenerURL} badlyMarshaledLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2ListenerURL, Value: []byte{1, 2, 3, 4}, }, }, TypeUrl: version.V2ListenerURL, } badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: httpConnManagerURL, Value: marshaledConnMgr1, }, }, TypeUrl: version.V2ListenerURL, } ldsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2ListenerURL, Value: marshaledListener2, }, { TypeUrl: version.V2ListenerURL, Value: marshaledListener1, }, }, TypeUrl: version.V2ListenerURL, } noAPIListenerLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2ListenerURL, Value: marshaledNoAPIListener, }, }, TypeUrl: version.V2ListenerURL, } goodBadUglyLDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2ListenerURL, Value: marshaledListener2, }, { TypeUrl: version.V2ListenerURL, Value: marshaledListener1, }, { TypeUrl: version.V2ListenerURL, Value: badlyMarshaledAPIListener2, }, }, TypeUrl: version.V2ListenerURL, } badlyMarshaledRDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2RouteConfigURL, Value: []byte{1, 2, 3, 4}, }, }, TypeUrl: version.V2RouteConfigURL, } badResourceTypeInRDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: httpConnManagerURL, Value: marshaledConnMgr1, }, }, TypeUrl: version.V2RouteConfigURL, } emptyRouteConfig = &xdspb.RouteConfiguration{} marshaledEmptyRouteConfig, _ = proto.Marshal(emptyRouteConfig) noVirtualHostsInRDSResponse = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2RouteConfigURL, Value: marshaledEmptyRouteConfig, }, }, TypeUrl: version.V2RouteConfigURL, } goodRouteConfig1 = &xdspb.RouteConfiguration{ Name: goodRouteName1, VirtualHosts: []*routepb.VirtualHost{ { Domains: []string{uninterestingDomain}, Routes: []*routepb.Route{ { Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, Action: &routepb.Route_Route{ Route: &routepb.RouteAction{ ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, }, }, }, }, }, { Domains: []string{goodLDSTarget1}, Routes: []*routepb.Route{ { Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, Action: &routepb.Route_Route{ Route: &routepb.RouteAction{ ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName1}, }, }, }, }, }, }, } marshaledGoodRouteConfig1, _ = proto.Marshal(goodRouteConfig1) goodRouteConfig2 = &xdspb.RouteConfiguration{ Name: goodRouteName2, VirtualHosts: []*routepb.VirtualHost{ { Domains: []string{uninterestingDomain}, Routes: []*routepb.Route{ { Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, Action: &routepb.Route_Route{ Route: &routepb.RouteAction{ ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, }, }, }, }, }, { Domains: []string{goodLDSTarget1}, Routes: []*routepb.Route{ { Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, Action: &routepb.Route_Route{ Route: &routepb.RouteAction{ ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName2}, }, }, }, }, }, }, } marshaledGoodRouteConfig2, _ = proto.Marshal(goodRouteConfig2) goodRDSResponse1 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2RouteConfigURL, Value: marshaledGoodRouteConfig1, }, }, TypeUrl: version.V2RouteConfigURL, } goodRDSResponse2 = &xdspb.DiscoveryResponse{ Resources: []*anypb.Any{ { TypeUrl: version.V2RouteConfigURL, Value: marshaledGoodRouteConfig2, }, }, TypeUrl: version.V2RouteConfigURL, } ) type watchHandleTestcase struct { rType xdsclient.ResourceType resourceName string responseToHandle *xdspb.DiscoveryResponse wantHandleErr bool wantUpdate interface{} wantUpdateErr bool } type testUpdateReceiver struct { f func(rType xdsclient.ResourceType, d map[string]interface{}) } func (t *testUpdateReceiver) NewListeners(d map[string]xdsclient.ListenerUpdate) { dd := make(map[string]interface{}) for k, v := range d { dd[k] = v } t.newUpdate(xdsclient.ListenerResource, dd) } func (t *testUpdateReceiver) NewRouteConfigs(d map[string]xdsclient.RouteConfigUpdate) { dd := make(map[string]interface{}) for k, v := range d { dd[k] = v } t.newUpdate(xdsclient.RouteConfigResource, dd) } func (t *testUpdateReceiver) NewClusters(d map[string]xdsclient.ClusterUpdate) { dd := make(map[string]interface{}) for k, v := range d { dd[k] = v } t.newUpdate(xdsclient.ClusterResource, dd) } func (t *testUpdateReceiver) NewEndpoints(d map[string]xdsclient.EndpointsUpdate) { dd := make(map[string]interface{}) for k, v := range d { dd[k] = v } t.newUpdate(xdsclient.EndpointsResource, dd) } func (t *testUpdateReceiver) newUpdate(rType xdsclient.ResourceType, d map[string]interface{}) { t.f(rType, d) } // testWatchHandle is called to test response handling for each xDS. // // It starts the xDS watch as configured in test, waits for the fake xds server // to receive the request (so watch callback is installed), and calls // handleXDSResp with responseToHandle (if it's set). It then compares the // update received by watch callback with the expected results. func testWatchHandle(t *testing.T, test *watchHandleTestcase) { fakeServer, cc, cleanup := startServerAndGetCC(t) defer cleanup() type updateErr struct { u interface{} err error } gotUpdateCh := testutils.NewChannel() v2c, err := newV2Client(&testUpdateReceiver{ f: func(rType xdsclient.ResourceType, d map[string]interface{}) { if rType == test.rType { if u, ok := d[test.resourceName]; ok { gotUpdateCh.Send(updateErr{u, nil}) } } }, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) } defer v2c.Close() // RDS needs an existing LDS watch for the hostname. if test.rType == xdsclient.RouteConfigResource { doLDS(t, v2c, fakeServer) } // Register the watcher, this will also trigger the v2Client to send the xDS // request. v2c.AddWatch(test.rType, test.resourceName) // Wait till the request makes it to the fakeServer. This ensures that // the watch request has been processed by the v2Client. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := fakeServer.XDSRequestChan.Receive(ctx); err != nil { t.Fatalf("Timeout waiting for an xDS request: %v", err) } // Directly push the response through a call to handleXDSResp. This bypasses // the fakeServer, so it's only testing the handle logic. Client response // processing is covered elsewhere. // // Also note that this won't trigger ACK, so there's no need to clear the // request channel afterwards. var handleXDSResp func(response *xdspb.DiscoveryResponse) error switch test.rType { case xdsclient.ListenerResource: handleXDSResp = v2c.handleLDSResponse case xdsclient.RouteConfigResource: handleXDSResp = v2c.handleRDSResponse case xdsclient.ClusterResource: handleXDSResp = v2c.handleCDSResponse case xdsclient.EndpointsResource: handleXDSResp = v2c.handleEDSResponse } if err := handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr { t.Fatalf("v2c.handleRDSResponse() returned err: %v, wantErr: %v", err, test.wantHandleErr) } // If the test doesn't expect the callback to be invoked, verify that no // update or error is pushed to the callback. // // Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil: // https://golang.org/doc/faq#nil_error). if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) { update, err := gotUpdateCh.Receive(ctx) if err == context.DeadlineExceeded { return } t.Fatalf("Unexpected update: +%v", update) } wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface() uErr, err := gotUpdateCh.Receive(ctx) if err == context.DeadlineExceeded { t.Fatal("Timeout expecting xDS update") } gotUpdate := uErr.(updateErr).u if diff := cmp.Diff(gotUpdate, wantUpdate); diff != "" { t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff) } gotUpdateErr := uErr.(updateErr).err if (gotUpdateErr != nil) != test.wantUpdateErr { t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr) } } // startServerAndGetCC starts a fake XDS server and also returns a ClientConn // connected to it. func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) { t.Helper() fs, sCleanup, err := fakeserver.StartServer() if err != nil { t.Fatalf("Failed to start fake xDS server: %v", err) } cc, ccCleanup, err := fs.XDSClientConn() if err != nil { sCleanup() t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err) } return fs, cc, func() { sCleanup() ccCleanup() } } func newV2Client(p xdsclient.UpdateHandler, cc *grpc.ClientConn, n *basepb.Node, b func(int) time.Duration, l *grpclog.PrefixLogger) (*client, error) { c, err := newClient(cc, xdsclient.BuildOptions{ Parent: p, NodeProto: n, Backoff: b, Logger: l, }) if err != nil { return nil, err } return c.(*client), nil } // TestV2ClientBackoffAfterRecvError verifies if the v2Client backs off when it // encounters a Recv error while receiving an LDS response. func (s) TestV2ClientBackoffAfterRecvError(t *testing.T) { fakeServer, cc, cleanup := startServerAndGetCC(t) defer cleanup() // Override the v2Client backoff function with this, so that we can verify // that a backoff actually was triggered. boCh := make(chan int, 1) clientBackoff := func(v int) time.Duration { boCh <- v return 0 } callbackCh := make(chan struct{}) v2c, err := newV2Client(&testUpdateReceiver{ f: func(xdsclient.ResourceType, map[string]interface{}) { close(callbackCh) }, }, cc, goodNodeProto, clientBackoff, nil) if err != nil { t.Fatal(err) } defer v2c.Close() t.Log("Started xds v2Client...") v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := fakeServer.XDSRequestChan.Receive(ctx); err != nil { t.Fatalf("Timeout expired when expecting an LDS request") } t.Log("FakeServer received request...") fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} t.Log("Bad LDS response pushed to fakeServer...") timer := time.NewTimer(1 * time.Second) select { case <-timer.C: t.Fatal("Timeout when expecting LDS update") case <-boCh: timer.Stop() t.Log("v2Client backed off before retrying...") case <-callbackCh: t.Fatal("Received unexpected LDS callback") } if _, err := fakeServer.XDSRequestChan.Receive(ctx); err != nil { t.Fatalf("Timeout expired when expecting an LDS request") } t.Log("FakeServer received request after backoff...") } // TestV2ClientRetriesAfterBrokenStream verifies the case where a stream // encountered a Recv() error, and is expected to send out xDS requests for // registered watchers once it comes back up again. func (s) TestV2ClientRetriesAfterBrokenStream(t *testing.T) { fakeServer, cc, cleanup := startServerAndGetCC(t) defer cleanup() callbackCh := testutils.NewChannel() v2c, err := newV2Client(&testUpdateReceiver{ f: func(rType xdsclient.ResourceType, d map[string]interface{}) { if rType == xdsclient.ListenerResource { if u, ok := d[goodLDSTarget1]; ok { t.Logf("Received LDS callback with ldsUpdate {%+v}", u) callbackCh.Send(struct{}{}) } } }, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) } defer v2c.Close() t.Log("Started xds v2Client...") v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := fakeServer.XDSRequestChan.Receive(ctx); err != nil { t.Fatalf("Timeout expired when expecting an LDS request") } t.Log("FakeServer received request...") fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} t.Log("Good LDS response pushed to fakeServer...") if _, err := callbackCh.Receive(ctx); err != nil { t.Fatal("Timeout when expecting LDS update") } // Read the ack, so the next request is sent after stream re-creation. if _, err := fakeServer.XDSRequestChan.Receive(ctx); err != nil { t.Fatalf("Timeout expired when expecting an LDS ACK") } fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} t.Log("Bad LDS response pushed to fakeServer...") val, err := fakeServer.XDSRequestChan.Receive(ctx) if err == context.DeadlineExceeded { t.Fatalf("Timeout expired when expecting LDS update") } gotRequest := val.(*fakeserver.Request) if !proto.Equal(gotRequest.Req, goodLDSRequest) { t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest) } } // TestV2ClientWatchWithoutStream verifies the case where a watch is started // when the xds stream is not created. The watcher should not receive any update // (because there won't be any xds response, and timeout is done at a upper // level). And when the stream is re-created, the watcher should get future // updates. func (s) TestV2ClientWatchWithoutStream(t *testing.T) { fakeServer, sCleanup, err := fakeserver.StartServer() if err != nil { t.Fatalf("Failed to start fake xDS server: %v", err) } defer sCleanup() const scheme = "xds_client_test_whatever" rb := manual.NewBuilderWithScheme(scheme) rb.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: "no.such.server"}}}) cc, err := grpc.Dial(scheme+":///whatever", grpc.WithInsecure(), grpc.WithResolvers(rb)) if err != nil { t.Fatalf("Failed to dial ClientConn: %v", err) } defer cc.Close() callbackCh := testutils.NewChannel() v2c, err := newV2Client(&testUpdateReceiver{ f: func(rType xdsclient.ResourceType, d map[string]interface{}) { if rType == xdsclient.ListenerResource { if u, ok := d[goodLDSTarget1]; ok { t.Logf("Received LDS callback with ldsUpdate {%+v}", u) callbackCh.Send(u) } } }, }, cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) if err != nil { t.Fatal(err) } defer v2c.Close() t.Log("Started xds v2Client...") // This watch is started when the xds-ClientConn is in Transient Failure, // and no xds stream is created. v2c.AddWatch(xdsclient.ListenerResource, goodLDSTarget1) // The watcher should receive an update, with a timeout error in it. ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() if v, err := callbackCh.Receive(ctx); err == nil { t.Fatalf("Expect an timeout error from watcher, got %v", v) } // Send the real server address to the ClientConn, the stream should be // created, and the previous watch should be sent. rb.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: fakeServer.Address}}, }) ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := fakeServer.XDSRequestChan.Receive(ctx); err != nil { t.Fatalf("Timeout expired when expecting an LDS request") } t.Log("FakeServer received request...") fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} t.Log("Good LDS response pushed to fakeServer...") if v, err := callbackCh.Receive(ctx); err != nil { t.Fatal("Timeout when expecting LDS update") } else if _, ok := v.(xdsclient.ListenerUpdate); !ok { t.Fatalf("Expect an LDS update from watcher, got %v", v) } } func newStringP(s string) *string { return &s }