/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package resolver import ( "context" "errors" "net/url" "reflect" "strings" "testing" "time" xxhash "github.com/cespare/xxhash/v2" "github.com/envoyproxy/go-control-plane/pkg/wellknown" "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/testutils" xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/httpfilter/router" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/wrapperspb" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config ) const ( targetStr = "target" routeStr = "route" cluster = "cluster" defaultTestTimeout = 10 * time.Second defaultTestShortTimeout = 100 * time.Microsecond ) var target = resolver.Target{URL: *testutils.MustParseURL("xds:///" + targetStr)} var routerFilter = xdsresource.HTTPFilter{Name: "rtr", Filter: httpfilter.Get(router.TypeURL)} var routerFilterList = []xdsresource.HTTPFilter{routerFilter} type s struct { grpctest.Tester } func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } func (s) TestRegister(t *testing.T) { if resolver.Get(xdsScheme) == nil { t.Errorf("scheme %v is not registered", xdsScheme) } } // testClientConn is a fake implemetation of resolver.ClientConn that pushes // state updates and errors returned by the resolver on to channels for // consumption by tests. type testClientConn struct { resolver.ClientConn stateCh *testutils.Channel errorCh *testutils.Channel } func (t *testClientConn) UpdateState(s resolver.State) error { t.stateCh.Replace(s) return nil } func (t *testClientConn) ReportError(err error) { t.errorCh.Replace(err) } func (t *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult { return internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) } func newTestClientConn() *testClientConn { return &testClientConn{ stateCh: testutils.NewChannel(), errorCh: testutils.NewChannel(), } } // TestResolverBuilder_ClientCreationFails tests the case where xDS client // creation fails, and verifies that xDS resolver build fails as well. func (s) TestResolverBuilder_ClientCreationFails(t *testing.T) { // Override xDS client creation function and return an error. origNewClient := newXDSClient newXDSClient = func() (xdsclient.XDSClient, func(), error) { return nil, nil, errors.New("failed to create xDS client") } defer func() { newXDSClient = origNewClient }() // Build an xDS resolver and expect it to fail. builder := resolver.Get(xdsScheme) if builder == nil { t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) } if _, err := builder.Build(target, newTestClientConn(), resolver.BuildOptions{}); err == nil { t.Fatalf("builder.Build(%v) succeeded when expected to fail", target) } } // TestResolverBuilder_DifferentBootstrapConfigs tests the resolver builder's // Build() method with different xDS bootstrap configurations. func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) { tests := []struct { name string bootstrapCfg *bootstrap.Config // Empty top-level xDS server config, will be set by test logic. target resolver.Target buildOpts resolver.BuildOptions wantErr string }{ { name: "good", bootstrapCfg: &bootstrap.Config{}, target: target, }, { name: "authority not defined in bootstrap", bootstrapCfg: &bootstrap.Config{ ClientDefaultListenerResourceNameTemplate: "%s", Authorities: map[string]*bootstrap.Authority{ "test-authority": { ClientListenerResourceNameTemplate: "xdstp://test-authority/%s", }, }, }, target: resolver.Target{ URL: url.URL{ Host: "non-existing-authority", Path: "/" + targetStr, }, }, wantErr: `authority "non-existing-authority" is not found in the bootstrap file`, }, { name: "xDS creds specified without certificate providers in bootstrap", bootstrapCfg: &bootstrap.Config{}, target: target, buildOpts: resolver.BuildOptions{ DialCreds: func() credentials.TransportCredentials { creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{FallbackCreds: insecure.NewCredentials()}) if err != nil { t.Fatalf("xds.NewClientCredentials() failed: %v", err) } return creds }(), }, wantErr: `xdsCreds specified but certificate_providers config missing in bootstrap file`, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatalf("Starting xDS management server: %v", err) } defer mgmtServer.Stop() // Add top-level xDS server config corresponding to the above // management server. test.bootstrapCfg.XDSServer = &bootstrap.ServerConfig{ ServerURI: mgmtServer.Address, Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), TransportAPI: version.TransportV3, } // Override xDS client creation to use bootstrap configuration // specified by the test. origNewClient := newXDSClient newXDSClient = func() (xdsclient.XDSClient, func(), error) { // The watch timeout and idle authority timeout values passed to // NewWithConfigForTesing() are immaterial for this test, as we // are only testing the resolver build functionality. return xdsclient.NewWithConfigForTesting(test.bootstrapCfg, defaultTestTimeout, defaultTestTimeout) } defer func() { newXDSClient = origNewClient }() builder := resolver.Get(xdsScheme) if builder == nil { t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) } r, err := builder.Build(test.target, newTestClientConn(), test.buildOpts) if gotErr, wantErr := err != nil, test.wantErr != ""; gotErr != wantErr { t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", target, err, test.wantErr) } if test.wantErr != "" && !strings.Contains(err.Error(), test.wantErr) { t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", target, err, test.wantErr) } if err != nil { // This is the case where we expect an error and got it. return } r.Close() }) } } type setupOpts struct { bootstrapC *bootstrap.Config target resolver.Target } func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *fakeclient.Client, *testClientConn, func()) { t.Helper() fc := fakeclient.NewClient() if opts.bootstrapC != nil { fc.SetBootstrapConfig(opts.bootstrapC) } oldClientMaker := newXDSClient closeCh := make(chan struct{}) newXDSClient = func() (xdsclient.XDSClient, func(), error) { return fc, grpcsync.OnceFunc(func() { close(closeCh) }), nil } cancel := func() { // Make sure the xDS client is closed, in all (successful or failed) // cases. select { case <-time.After(defaultTestTimeout): t.Fatalf("timeout waiting for close") case <-closeCh: } newXDSClient = oldClientMaker } builder := resolver.Get(xdsScheme) if builder == nil { t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) } tcc := newTestClientConn() r, err := builder.Build(opts.target, tcc, resolver.BuildOptions{}) if err != nil { t.Fatalf("builder.Build(%v) returned err: %v", target, err) } return r.(*xdsResolver), fc, tcc, func() { r.Close() cancel() } } // waitForWatchListener waits for the WatchListener method to be called on the // xdsClient within a reasonable amount of time, and also verifies that the // watch is called with the expected target. func waitForWatchListener(ctx context.Context, t *testing.T, xdsC *fakeclient.Client, wantTarget string) { t.Helper() gotTarget, err := xdsC.WaitForWatchListener(ctx) if err != nil { t.Fatalf("xdsClient.WatchService failed with error: %v", err) } if gotTarget != wantTarget { t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", gotTarget, wantTarget) } } // waitForWatchRouteConfig waits for the WatchRoute method to be called on the // xdsClient within a reasonable amount of time, and also verifies that the // watch is called with the expected target. func waitForWatchRouteConfig(ctx context.Context, t *testing.T, xdsC *fakeclient.Client, wantTarget string) { t.Helper() gotTarget, err := xdsC.WaitForWatchRouteConfig(ctx) if err != nil { t.Fatalf("xdsClient.WatchService failed with error: %v", err) } if gotTarget != wantTarget { t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", gotTarget, wantTarget) } } // buildResolverForTarget builds an xDS resolver for the given target. It // returns a testClientConn which allows inspection of resolver updates, and a // function to close the resolver once the test is complete. func buildResolverForTarget(t *testing.T, target resolver.Target) (*testClientConn, func()) { builder := resolver.Get(xdsScheme) if builder == nil { t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) } tcc := newTestClientConn() r, err := builder.Build(target, tcc, resolver.BuildOptions{}) if err != nil { t.Fatalf("builder.Build(%v) returned err: %v", target, err) } return tcc, r.Close } // TestResolverResourceName builds an xDS resolver and verifies that the // resource name specified in the discovery request matches expectations. func (s) TestResolverResourceName(t *testing.T) { // Federation support is required when new style names are used. oldXDSFederation := envconfig.XDSFederation envconfig.XDSFederation = true defer func() { envconfig.XDSFederation = oldXDSFederation }() tests := []struct { name string listenerResourceNameTemplate string extraAuthority string dialTarget string wantResourceName string }{ { name: "default %s old style", listenerResourceNameTemplate: "%s", dialTarget: "xds:///target", wantResourceName: "target", }, { name: "old style no percent encoding", listenerResourceNameTemplate: "/path/to/%s", dialTarget: "xds:///target", wantResourceName: "/path/to/target", }, { name: "new style with %s", listenerResourceNameTemplate: "xdstp://authority.com/%s", dialTarget: "xds:///", wantResourceName: "xdstp://authority.com/", }, { name: "new style percent encoding", listenerResourceNameTemplate: "xdstp://authority.com/%s", dialTarget: "xds:///[::1]:8080", wantResourceName: "xdstp://authority.com/%5B::1%5D:8080", }, { name: "new style different authority", listenerResourceNameTemplate: "xdstp://authority.com/%s", extraAuthority: "test-authority", dialTarget: "xds://test-authority/target", wantResourceName: "xdstp://test-authority/envoy.config.listener.v3.Listener/target", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Setup the management server to push the requested resource name // on to a channel. No resources are configured on the management // server as part of this test, as we are only interested in the // resource name being requested. resourceNameCh := make(chan string, 1) mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{ OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { // When the resolver is being closed, the watch associated // with the listener resource will be cancelled, and it // might result in a discovery request with no resource // names. Hence, we only consider requests which contain a // resource name. var name string if len(req.GetResourceNames()) == 1 { name = req.GetResourceNames()[0] } select { case resourceNameCh <- name: default: } return nil }, }) if err != nil { t.Fatalf("Failed to start xDS management server: %v", err) } defer mgmtServer.Stop() // Create a bootstrap configuration with test options. opts := xdsbootstrap.Options{ ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, ClientDefaultListenerResourceNameTemplate: tt.listenerResourceNameTemplate, } if tt.extraAuthority != "" { // In this test, we really don't care about having multiple // management servers. All we need to verify is whether the // resource name matches expectation. opts.Authorities = map[string]string{ tt.extraAuthority: mgmtServer.Address, } } cleanup, err := xdsbootstrap.CreateFile(opts) if err != nil { t.Fatal(err) } defer cleanup() _, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)}) defer rClose() // Verify the resource name in the discovery request being sent out. select { case gotResourceName := <-resourceNameCh: if gotResourceName != tt.wantResourceName { t.Fatalf("Received discovery request with resource name: %v, want %v", gotResourceName, tt.wantResourceName) } case <-time.After(defaultTestTimeout): t.Fatalf("Timeout when waiting for discovery request") } }) } } // TestResolverWatchCallbackAfterClose tests the case where a service update // from the underlying xDS client is received after the resolver is closed, and // verifies that the update is not propagated to the ClientConn. func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { // Setup the management server that synchronizes with the test goroutine // using two channels. The management server signals the test goroutine when // it receives a discovery request for a route configuration resource. And // the test goroutine signals the management server when the resolver is // closed. waitForRouteConfigDiscoveryReqCh := make(chan struct{}) waitForResolverCloseCh := make(chan struct{}) mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{ OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { if req.GetTypeUrl() == version.V3RouteConfigURL { close(waitForRouteConfigDiscoveryReqCh) <-waitForResolverCloseCh } return nil }, }) if err != nil { t.Fatalf("Failed to start xDS management server: %v", err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() // Configure listener and route configuration resources on the management // server. const serviceName = "my-service-client-side-xds" rdsName := "route-" + serviceName cdsName := "cluster-" + serviceName resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() // Wait for a discovery request for a route configuration resource. select { case <-waitForRouteConfigDiscoveryReqCh: case <-ctx.Done(): t.Fatal("Timeout when waiting for a discovery request for a route configuration resource") } // Close the resolver and unblock the management server. rClose() close(waitForResolverCloseCh) // Verify that the update from the management server is not propagated to // the ClientConn. The xDS resolver, once closed, is expected to drop // updates from the xDS client. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() if _, err := tcc.stateCh.Receive(sCtx); err != context.DeadlineExceeded { t.Fatalf("ClientConn received an update from the resolver that was closed: %v", err) } } // TestResolverCloseClosesXDSClient tests that the xDS resolver's Close method // closes the xDS client. func (s) TestResolverCloseClosesXDSClient(t *testing.T) { bootstrapCfg := &bootstrap.Config{ XDSServer: &bootstrap.ServerConfig{ ServerURI: "dummy-management-server-address", Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), TransportAPI: version.TransportV3, }, } // Override xDS client creation to use bootstrap configuration pointing to a // dummy management server. Also close a channel when the returned xDS // client is closed. closeCh := make(chan struct{}) origNewClient := newXDSClient newXDSClient = func() (xdsclient.XDSClient, func(), error) { c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout) return c, func() { close(closeCh) cancel() }, err } defer func() { newXDSClient = origNewClient }() _, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")}) rClose() select { case <-closeCh: case <-time.After(defaultTestTimeout): t.Fatal("Timeout when waiting for xDS client to be closed") } } // TestResolverBadServiceUpdate tests the case where a resource returned by the // management server is NACKed by the xDS client, which then returns an update // containing an error to the resolver. Verifies that the update is propagated // to the ClientConn by the resolver. It also tests the cases where the resolver // gets a good update subsequently, and another error after the good update. func (s) TestResolverBadServiceUpdate(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() // Configure a listener resource that is expected to be NACKed because it // does not contain the `RouteSpecifier` field in the HTTPConnectionManager. hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{ HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, }) lis := &v3listenerpb.Listener{ Name: serviceName, ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, FilterChains: []*v3listenerpb.FilterChain{{ Name: "filter-chain-name", Filters: []*v3listenerpb.Filter{{ Name: wellknown.HTTPConnectionManager, ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, }}, }}, } resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{lis}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } wantErr := "no RouteSpecifier" val, err := tcc.errorCh.Receive(ctx) if err != nil { t.Fatal("Timeout when waiting for error to be propagated to the ClientConn") } gotErr := val.(error) if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr) } // Configure good listener and route configuration resources on the // management server. rdsName := "route-" + serviceName cdsName := "cluster-" + serviceName resources = e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Expect a good update from the resolver. val, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } // Configure another bad resource on the management server. resources = e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{lis}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Expect an error update from the resolver. val, err = tcc.errorCh.Receive(ctx) if err != nil { t.Fatal("Timeout when waiting for error to be propagated to the ClientConn") } gotErr = val.(error) if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr) } } // TestResolverGoodServiceUpdate tests the case where the resource returned by // the management server is ACKed by the xDS client, which then returns a good // service update to the resolver. The test verifies that the service config // returned by the resolver matches expectations, and that the config selector // returned by the resolver picks clusters based on the route configuration // received from the management server. func (s) TestResolverGoodServiceUpdate(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() ldsName := serviceName rdsName := "route-" + serviceName for _, tt := range []struct { routeConfig *v3routepb.RouteConfiguration wantServiceConfig string wantClusters map[string]bool }{ { // A route configuration with a single cluster. routeConfig: &v3routepb.RouteConfiguration{ Name: rdsName, VirtualHosts: []*v3routepb.VirtualHost{{ Domains: []string{ldsName}, Routes: []*v3routepb.Route{{ Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "test-cluster-1", Weight: &wrapperspb.UInt32Value{Value: 100}, }, }, }}, }}, }}, }}, }, wantServiceConfig: ` { "loadBalancingConfig": [{ "xds_cluster_manager_experimental": { "children": { "cluster:test-cluster-1": { "childPolicy": [{ "cds_experimental": { "cluster": "test-cluster-1" } }] } } } }] }`, wantClusters: map[string]bool{"cluster:test-cluster-1": true}, }, { // A route configuration with a two new clusters. routeConfig: &v3routepb.RouteConfiguration{ Name: rdsName, VirtualHosts: []*v3routepb.VirtualHost{{ Domains: []string{ldsName}, Routes: []*v3routepb.Route{{ Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "cluster_1", Weight: &wrapperspb.UInt32Value{Value: 75}, }, { Name: "cluster_2", Weight: &wrapperspb.UInt32Value{Value: 25}, }, }, }}, }}, }}, }}, }, // This update contains the cluster from the previous update as well // as this update, as the previous config selector still references // the old cluster when the new one is pushed. wantServiceConfig: ` { "loadBalancingConfig": [{ "xds_cluster_manager_experimental": { "children": { "cluster:test-cluster-1": { "childPolicy": [{ "cds_experimental": { "cluster": "test-cluster-1" } }] }, "cluster:cluster_1": { "childPolicy": [{ "cds_experimental": { "cluster": "cluster_1" } }] }, "cluster:cluster_2": { "childPolicy": [{ "cds_experimental": { "cluster": "cluster_2" } }] } } } }] }`, wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, { // A redundant route configuration update. // TODO(easwars): Do we need this, or can we do something else? Because the xds client might swallow this update. routeConfig: &v3routepb.RouteConfiguration{ Name: rdsName, VirtualHosts: []*v3routepb.VirtualHost{{ Domains: []string{ldsName}, Routes: []*v3routepb.Route{{ Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "cluster_1", Weight: &wrapperspb.UInt32Value{Value: 75}, }, { Name: "cluster_2", Weight: &wrapperspb.UInt32Value{Value: 25}, }, }, }}, }}, }}, }}, }, // With this redundant update, the old config selector has been // stopped, so there are no more references to the first cluster. // Only the second update's clusters should remain. wantServiceConfig: ` { "loadBalancingConfig": [{ "xds_cluster_manager_experimental": { "children": { "cluster:cluster_1": { "childPolicy": [{ "cds_experimental": { "cluster": "cluster_1" } }] }, "cluster:cluster_2": { "childPolicy": [{ "cds_experimental": { "cluster": "cluster_2" } }] } } } }] }`, wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, } { // Configure the management server with a good listener resource and a // route configuration resource, as specified by the test case. resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{tt.routeConfig}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn. val, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(tt.wantServiceConfig) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Errorf("Received unexpected service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } pickedClusters := make(map[string]bool) // Odds of picking 75% cluster 100 times in a row: 1 in 3E-13. And // with the random number generator stubbed out, we can rely on this // to be 100% reproducible. for i := 0; i < 100; i++ { res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } cluster := clustermanager.GetPickedClusterForTesting(res.Context) pickedClusters[cluster] = true res.OnCommitted() } if !cmp.Equal(pickedClusters, tt.wantClusters) { t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters) } } } // TestResolverRequestHash tests a case where a resolver receives a RouteConfig update // with a HashPolicy specifying to generate a hash. The configSelector generated should // successfully generate a Hash. func (s) TestResolverRequestHash(t *testing.T) { oldRH := envconfig.XDSRingHash envconfig.XDSRingHash = true defer func() { envconfig.XDSRingHash = oldRH }() mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() ldsName := serviceName rdsName := "route-" + serviceName // Configure the management server with a good listener resource and a // route configuration resource that specifies a hash policy. resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{{ Name: rdsName, VirtualHosts: []*v3routepb.VirtualHost{{ Domains: []string{ldsName}, Routes: []*v3routepb.Route{{ Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "test-cluster-1", Weight: &wrapperspb.UInt32Value{Value: 100}, }, }, }}, HashPolicy: []*v3routepb.RouteAction_HashPolicy{{ PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ Header: &v3routepb.RouteAction_HashPolicy_Header{ HeaderName: ":path", }, }, Terminal: true, }}, }}, }}, }}, }}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn. val, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } // Selecting a config when there was a hash policy specified in the route // that will be selected should put a request hash in the config's context. res, err := cs.SelectConfig(iresolver.RPCInfo{ Context: metadata.NewOutgoingContext(ctx, metadata.Pairs(":path", "/products")), Method: "/service/method", }) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } gotHash := ringhash.GetRequestHashForTesting(res.Context) wantHash := xxhash.Sum64String("/products") if gotHash != wantHash { t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash) } } // TestResolverRemovedWithRPCs tests the case where resources are removed from // the management server, causing it to send an empty update to the xDS client, // which returns a resource-not-found error to the xDS resolver. The test // verifies that an ongoing RPC is handled properly when this happens. func (s) TestResolverRemovedWithRPCs(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() ldsName := serviceName rdsName := "route-" + serviceName // Configure the management server with a good listener and route // configuration resource. resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn. val, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` { "loadBalancingConfig": [ { "xds_cluster_manager_experimental": { "children": { "cluster:test-cluster-1": { "childPolicy": [ { "cds_experimental": { "cluster": "test-cluster-1" } } ] } } } } ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } // Delete the resources on the management server. This should result in a // resource-not-found error from the xDS client. if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil { t.Fatal(err) } // The RPC started earlier is still in progress. So, the xDS resolver will // not produce an empty service config at this point. Instead it will retain // the cluster to which the RPC is ongoing in the service config, but will // return an erroring config selector which will fail new RPCs. val, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } cs = iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } _, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err == nil || status.Code(err) != codes.Unavailable { t.Fatalf("cs.SelectConfig() returned: %v, want: %v", err, codes.Unavailable) } // "Finish the RPC"; this could cause a panic if the resolver doesn't // handle it correctly. res.OnCommitted() // Now that the RPC is committed, the xDS resolver is expected to send an // update with an empty service config. val, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`{}`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } } // TestResolverRemovedResource tests the case where resources returned by the // management server are removed. The test verifies that the resolver pushes the // expected config selector and service config in this case. func (s) TestResolverRemovedResource(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() // Configure the management server with a good listener and route // configuration resource. ldsName := serviceName rdsName := "route-" + serviceName resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn. val, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` { "loadBalancingConfig": [ { "xds_cluster_manager_experimental": { "children": { "cluster:test-cluster-1": { "childPolicy": [ { "cds_experimental": { "cluster": "test-cluster-1" } } ] } } } } ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } // "Make an RPC" by invoking the config selector. cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } // "Finish the RPC"; this could cause a panic if the resolver doesn't // handle it correctly. res.OnCommitted() // Delete the resources on the management server, resulting in a // resource-not-found error from the xDS client. if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil { t.Fatal(err) } // The channel should receive the existing service config with the original // cluster but with an erroring config selector. val, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } // "Make another RPC" by invoking the config selector. cs = iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err == nil || status.Code(err) != codes.Unavailable { t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err) } // In the meantime, an empty ServiceConfig update should have been sent. val, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}") if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } } // TestResolverWRR tests the case where the route configuration returned by the // management server contains a set of weighted clusters. The test performs a // bunch of RPCs using the cluster specifier returned by the resolver, and // verifies the cluster distribution. func (s) TestResolverWRR(t *testing.T) { defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) newWRR = testutils.NewTestWRR mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() ldsName := serviceName rdsName := "route-" + serviceName // Configure the management server with a good listener resource and a // route configuration resource. resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{{ Name: rdsName, VirtualHosts: []*v3routepb.VirtualHost{{ Domains: []string{ldsName}, Routes: []*v3routepb.Route{{ Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "A", Weight: &wrapperspb.UInt32Value{Value: 75}, }, { Name: "B", Weight: &wrapperspb.UInt32Value{Value: 25}, }, }, }}, }}, }}, }}, }}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn. gotState, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } // Make RPCs are verify WRR behavior in the cluster specifier. picks := map[string]int{} for i := 0; i < 100; i++ { res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } picks[clustermanager.GetPickedClusterForTesting(res.Context)]++ res.OnCommitted() } want := map[string]int{"cluster:A": 75, "cluster:B": 25} if !cmp.Equal(picks, want) { t.Errorf("Picked clusters: %v; want: %v", picks, want) } } // TestResolverMaxStreamDuration tests the case where the resolver receives max // stream duration as part of the listener and route configuration resources. // The test verifies that the RPC timeout returned by the config selector // matches expectations. A non-nil max stream duration (this includes an // explicit zero value) in a matching route overrides the value specified in the // listener resource. func (s) TestResolverMaxStreamDuration(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() // Configure the management server with a listener resource that specifies a // max stream duration as part of its HTTP connection manager. Also // configure a route configuration resource, which has multiple routes with // different values of max stream duration. ldsName := serviceName rdsName := "route-" + serviceName hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{ RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{ ConfigSource: &v3corepb.ConfigSource{ ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, }, RouteConfigName: rdsName, }}, HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ MaxStreamDuration: durationpb.New(1 * time.Second), }, }) resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{{ Name: ldsName, ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, FilterChains: []*v3listenerpb.FilterChain{{ Name: "filter-chain-name", Filters: []*v3listenerpb.Filter{{ Name: wellknown.HTTPConnectionManager, ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, }}, }}, }}, Routes: []*v3routepb.RouteConfiguration{{ Name: rdsName, VirtualHosts: []*v3routepb.VirtualHost{{ Domains: []string{ldsName}, Routes: []*v3routepb.Route{ { Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "A", Weight: &wrapperspb.UInt32Value{Value: 100}, }, }}, }, MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ MaxStreamDuration: durationpb.New(5 * time.Second), }, }}, }, { Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/bar"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "B", Weight: &wrapperspb.UInt32Value{Value: 100}, }, }}, }, MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ MaxStreamDuration: durationpb.New(0 * time.Second), }, }}, }, { Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ { Name: "C", Weight: &wrapperspb.UInt32Value{Value: 100}, }, }}, }, }}, }, }, }}, }}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn. gotState, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } testCases := []struct { name string method string want *time.Duration }{{ name: "RDS setting", method: "/foo/method", want: newDurationP(5 * time.Second), }, { name: "explicit zero in RDS; ignore LDS", method: "/bar/method", want: nil, }, { name: "no config in RDS; fallback to LDS", method: "/baz/method", want: newDurationP(time.Second), }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { req := iresolver.RPCInfo{ Method: tc.method, Context: ctx, } res, err := cs.SelectConfig(req) if err != nil { t.Errorf("cs.SelectConfig(%v): %v", req, err) return } res.OnCommitted() got := res.MethodConfig.Timeout if !cmp.Equal(got, tc.want) { t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want) } }) } } // TestResolverDelayedOnCommitted tests that clusters remain in service // config if RPCs are in flight. func (s) TestResolverDelayedOnCommitted(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() // Configure the management server with a good listener and route // configuration resource. ldsName := serviceName rdsName := "route-" + serviceName resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "old-cluster")}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn. val, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState := val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` { "loadBalancingConfig": [ { "xds_cluster_manager_experimental": { "children": { "cluster:old-cluster": { "childPolicy": [ { "cds_experimental": { "cluster": "old-cluster" } } ] } } } } ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } // Make an RPC, but do not commit it yet. cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != "cluster:old-cluster" { t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:old-cluster") } // Delay resOld.OnCommitted(). As long as there are pending RPCs to removed // clusters, they still appear in the service config. // Update the route configuration resource on the management server to // return a new cluster. resources = e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "new-cluster")}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Read the update pushed by the resolver to the ClientConn and ensure the // old cluster is present in the service config. Also ensure that the newly // returned config selector does not hold a reference to the old cluster. val, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` { "loadBalancingConfig": [ { "xds_cluster_manager_experimental": { "children": { "cluster:old-cluster": { "childPolicy": [ { "cds_experimental": { "cluster": "old-cluster" } } ] }, "cluster:new-cluster": { "childPolicy": [ { "cds_experimental": { "cluster": "new-cluster" } } ] } } } } ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s\nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } cs = iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("Received nil config selector in update from resolver") } resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != "cluster:new-cluster" { t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:new-cluster") } // Invoke OnCommitted on the old RPC; should lead to a service config update // that deletes the old cluster, as the old cluster no longer has any // pending RPCs. resOld.OnCommitted() val, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout waiting for an update from the resolver: %v", err) } rState = val.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) } wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` { "loadBalancingConfig": [ { "xds_cluster_manager_experimental": { "children": { "cluster:new-cluster": { "childPolicy": [ { "cds_experimental": { "cluster": "new-cluster" } } ] } } } } ] }`) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) } } // TestResolverMultipleLDSUpdates tests the case where two LDS updates with the // same RDS name to watch are received without an RDS in between. Those LDS // updates shouldn't trigger a service config update. func (s) TestResolverMultipleLDSUpdates(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, Version: xdsbootstrap.TransportV3, }) if err != nil { t.Fatal(err) } defer cleanup() // Build an xDS resolver that uses the above bootstrap configuration // Creating the xDS resolver should result in creation of the xDS client. const serviceName = "my-service-client-side-xds" tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) defer rClose() // Configure the management server with a listener resource, but no route // configuration resource. ldsName := serviceName rdsName := "route-" + serviceName resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Ensure there is no update from the resolver. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() gotState, err := tcc.stateCh.Receive(sCtx) if err == nil { t.Fatalf("Received update from resolver %v when none expected", gotState) } // Configure the management server with a listener resource that points to // the same route configuration resource but has different values for some // other fields. There is still no route configuration resource on the // management server. hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{ RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{ ConfigSource: &v3corepb.ConfigSource{ ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, }, RouteConfigName: rdsName, }}, HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ MaxStreamDuration: durationpb.New(1 * time.Second), }, }) resources = e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{{ Name: ldsName, ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, FilterChains: []*v3listenerpb.FilterChain{{ Name: "filter-chain-name", Filters: []*v3listenerpb.Filter{{ Name: wellknown.HTTPConnectionManager, ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, }}, }}, }}, SkipValidation: true, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Ensure that there is no update from the resolver. sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) defer sCancel() gotState, err = tcc.stateCh.Receive(sCtx) if err == nil { t.Fatalf("Received update from resolver %v when none expected", gotState) } } type filterBuilder struct { httpfilter.Filter // embedded as we do not need to implement registry / parsing in this test. path *[]string } var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{} func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { if config == nil { panic("unexpected missing config") } *fb.path = append(*fb.path, "build:"+config.(filterCfg).s) err := config.(filterCfg).newStreamErr if override != nil { *fb.path = append(*fb.path, "override:"+override.(filterCfg).s) err = override.(filterCfg).newStreamErr } return &filterInterceptor{path: fb.path, s: config.(filterCfg).s, err: err}, nil } type filterInterceptor struct { path *[]string s string err error } func (fi *filterInterceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { *fi.path = append(*fi.path, "newstream:"+fi.s) if fi.err != nil { return nil, fi.err } d := func() { *fi.path = append(*fi.path, "done:"+fi.s) done() } cs, err := newStream(ctx, d) if err != nil { return nil, err } return &clientStream{ClientStream: cs, path: fi.path, s: fi.s}, nil } type clientStream struct { iresolver.ClientStream path *[]string s string } type filterCfg struct { httpfilter.FilterConfig s string newStreamErr error } func (s) TestXDSResolverHTTPFilters(t *testing.T) { var path []string testCases := []struct { name string ldsFilters []xdsresource.HTTPFilter vhOverrides map[string]httpfilter.FilterConfig rtOverrides map[string]httpfilter.FilterConfig clOverrides map[string]httpfilter.FilterConfig rpcRes map[string][][]string selectErr string newStreamErr string }{ { name: "no router filter", ldsFilters: []xdsresource.HTTPFilter{ {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, }, rpcRes: map[string][][]string{ "1": { {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, }, }, selectErr: "no router filter present", }, { name: "ignored after router filter", ldsFilters: []xdsresource.HTTPFilter{ {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, routerFilter, {Name: "foo2", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo2"}}, }, rpcRes: map[string][][]string{ "1": { {"build:foo1", "newstream:foo1", "done:foo1"}, }, "2": { {"build:foo1", "newstream:foo1", "done:foo1"}, {"build:foo1", "newstream:foo1", "done:foo1"}, {"build:foo1", "newstream:foo1", "done:foo1"}, }, }, }, { name: "NewStream error; ensure earlier interceptor Done is still called", ldsFilters: []xdsresource.HTTPFilter{ {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1", newStreamErr: errors.New("bar newstream err")}}, routerFilter, }, rpcRes: map[string][][]string{ "1": { {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* */, "done:foo1"}, }, "2": { {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* */, "done:foo1"}, }, }, newStreamErr: "bar newstream err", }, { name: "all overrides", ldsFilters: []xdsresource.HTTPFilter{ {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1", newStreamErr: errors.New("this is overridden to nil")}}, {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1"}}, routerFilter, }, vhOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo2"}, "bar": filterCfg{s: "bar2"}}, rtOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo3"}, "bar": filterCfg{s: "bar3"}}, clOverrides: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo4"}, "bar": filterCfg{s: "bar4"}}, rpcRes: map[string][][]string{ "1": { {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, }, "2": { {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, }, }, }, } for i, tc := range testCases { t.Run(tc.name, func(t *testing.T) { xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) defer xdsR.Close() defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{ RouteConfigName: routeStr, HTTPFilters: tc.ldsFilters, }, nil) if i == 0 { waitForWatchRouteConfig(ctx, t, xdsC, routeStr) } defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) newWRR = testutils.NewTestWRR // Invoke the watchAPI callback with a good service update and wait for the // UpdateState method to be called on the ClientConn. xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{ Prefix: newStringP("1"), WeightedClusters: map[string]xdsresource.WeightedCluster{ "A": {Weight: 1}, "B": {Weight: 1}, }, }, { Prefix: newStringP("2"), WeightedClusters: map[string]xdsresource.WeightedCluster{ "A": {Weight: 1}, "B": {Weight: 1, HTTPFilterConfigOverride: tc.clOverrides}, }, HTTPFilterConfigOverride: tc.rtOverrides, }}, HTTPFilterConfigOverride: tc.vhOverrides, }, }, }, nil) gotState, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("received nil config selector") } for method, wants := range tc.rpcRes { // Order of wants is non-deterministic. remainingWant := make([][]string, len(wants)) copy(remainingWant, wants) for n := range wants { path = nil res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: context.Background()}) if tc.selectErr != "" { if err == nil || !strings.Contains(err.Error(), tc.selectErr) { t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, tc.selectErr) } if err == nil { res.OnCommitted() } continue } if err != nil { t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) } var doneFunc func() _, err = res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, func() {}, func(ctx context.Context, done func()) (iresolver.ClientStream, error) { doneFunc = done return nil, nil }) if tc.newStreamErr != "" { if err == nil || !strings.Contains(err.Error(), tc.newStreamErr) { t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.newStreamErr) } if err == nil { res.OnCommitted() doneFunc() } continue } if err != nil { t.Fatalf("unexpected error from Interceptor.NewStream: %v", err) } res.OnCommitted() doneFunc() // Confirm the desired path is found in remainingWant, and remove it. pass := false for i := range remainingWant { if reflect.DeepEqual(path, remainingWant[i]) { remainingWant[i] = remainingWant[len(remainingWant)-1] remainingWant = remainingWant[:len(remainingWant)-1] pass = true break } } if !pass { t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, path, remainingWant) } } } }) } } func newDurationP(d time.Duration) *time.Duration { return &d }