/* * * Copyright 2021 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package resolver import ( "context" "testing" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/internal" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) func init() { balancer.Register(cspB{}) } type cspB struct{} func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { return nil } func (cspB) Name() string { return "csp_experimental" } type cspConfig struct { ArbitraryField string `json:"arbitrary_field"` } // TestXDSResolverClusterSpecifierPlugin tests that cluster specifier plugins // produce the correct service config, and that the config selector routes to a // cluster specifier plugin supported by this service config (i.e. prefixed with // a cluster specifier plugin prefix). func (s) TestXDSResolverClusterSpecifierPlugin(t *testing.T) { xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) defer xdsR.Close() defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, }, }, // Top level csp config here - the value of cspA should get directly // placed as a child policy of xds cluster manager. ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, }, nil) gotState, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ "cluster_specifier_plugin:cspA":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] } } }}]}` wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Errorf("ClientConn.UpdateState received different service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("received nil config selector") } res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) if err != nil { t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) } cluster := clustermanager.GetPickedClusterForTesting(res.Context) clusterWant := clusterSpecifierPluginPrefix + "cspA" if cluster != clusterWant { t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) } } // TestXDSResolverClusterSpecifierPluginConfigUpdate tests that cluster // specifier plugins produce the correct service config, and that on an update // to the CSP Configuration, the new config is accounted for in the output // service config. func (s) TestXDSResolverClusterSpecifierPluginConfigUpdate(t *testing.T) { xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) defer xdsR.Close() defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, }, }, // Top level csp config here - the value of cspA should get directly // placed as a child policy of xds cluster manager. ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, }, nil) gotState, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ "cluster_specifier_plugin:cspA":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] } } }}]}` wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Errorf("ClientConn.UpdateState received different service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, }, }, // Top level csp config here - the value of cspA should get directly // placed as a child policy of xds cluster manager. ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "changed"}}}}, }, nil) gotState, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState = gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } wantJSON = `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ "cluster_specifier_plugin:cspA":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"changed"}}] } } }}]}` wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Errorf("ClientConn.UpdateState received different service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } } // TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and // their corresponding configurations remain in service config if RPCs are in // flight. func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) defer xdsR.Close() defer cancel() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() waitForWatchListener(ctx, t, xdsC, targetStr) xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) waitForWatchRouteConfig(ctx, t, xdsC, routeStr) xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, }, }, // Top level csp config here - the value of cspA should get directly // placed as a child policy of xds cluster manager. ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingA"}}}}, }, nil) gotState, err := tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState := gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ "cluster_specifier_plugin:cspA":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] } } }}]}` wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { t.Errorf("ClientConn.UpdateState received different service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) } cs := iresolver.GetConfigSelector(rState) if cs == nil { t.Fatal("received nil config selector") } res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) if err != nil { t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) } cluster := clustermanager.GetPickedClusterForTesting(res.Context) clusterWant := clusterSpecifierPluginPrefix + "cspA" if cluster != clusterWant { t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) } // delay res.OnCommitted() // Perform TWO updates to ensure the old config selector does not hold a reference to cspA xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, }, }, // Top level csp config here - the value of cspB should get directly // placed as a child policy of xds cluster manager. ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, }, nil) tcc.stateCh.Receive(ctx) // Ignore the first update. xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, }, }, // Top level csp config here - the value of cspB should get directly // placed as a child policy of xds cluster manager. ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, }, nil) gotState, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState = gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } wantJSON2 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ "cluster_specifier_plugin:cspA":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] }, "cluster_specifier_plugin:cspB":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] } } }}]}` wantSCParsed2 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON2) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) { t.Errorf("ClientConn.UpdateState received different service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config)) } // Invoke OnCommitted; should lead to a service config update that deletes // cspA. res.OnCommitted() xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ VirtualHosts: []*xdsresource.VirtualHost{ { Domains: []string{targetStr}, Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, }, }, // Top level csp config here - the value of cspB should get directly // placed as a child policy of xds cluster manager. ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, }, nil) gotState, err = tcc.stateCh.Receive(ctx) if err != nil { t.Fatalf("Error waiting for UpdateState to be called: %v", err) } rState = gotState.(resolver.State) if err := rState.ServiceConfig.Err; err != nil { t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) } wantJSON3 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ "cluster_specifier_plugin:cspB":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] } } }}]}` wantSCParsed3 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON3) if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) { t.Errorf("ClientConn.UpdateState received different service config") t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config)) } }