/* * * Copyright 2021 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package csds_test import ( "context" "fmt" "io" "sort" "strings" "testing" "time" "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/csds" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/anypb" v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter ) const defaultTestTimeout = 5 * time.Second var cmpOpts = cmp.Options{ cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig { out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...) sort.Slice(out, func(i, j int) bool { a, b := out[i], out[j] if a == nil { return true } if b == nil { return false } if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 { return strings.Compare(a.Name, b.Name) < 0 } return strings.Compare(a.TypeUrl, b.TypeUrl) < 0 }) return out }), protocmp.Transform(), protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"), protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"), } type s struct { grpctest.Tester } func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } func (s) TestCSDS(t *testing.T) { // Spin up a xDS management server on a local port. nodeID := uuid.New().String() mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatal(err) } defer mgmtServer.Stop() // Create a bootstrap file in a temporary directory. bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{ Version: bootstrap.TransportV3, NodeID: nodeID, ServerURI: mgmtServer.Address, }) if err != nil { t.Fatal(err) } defer bootstrapCleanup() // Create an xDS client. This will end up using the same singleton as used // by the CSDS service. xdsC, close, err := xdsclient.New() if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } defer close() // Initialize an gRPC server and register CSDS on it. server := grpc.NewServer() csdss, err := csds.NewClientStatusDiscoveryServer() if err != nil { t.Fatal(err) } v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss) defer func() { server.Stop() csdss.Close() }() // Create a local listener and pass it to Serve(). lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } go func() { if err := server.Serve(lis); err != nil { t.Errorf("Serve() failed: %v", err) } }() // Create a client to the CSDS server. conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err) } c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("Failed to create a stream for CSDS: %v", err) } defer conn.Close() // Verify that the xDS client reports an empty config. if err := checkClientStatusResponse(stream, nil); err != nil { t.Fatal(err) } // Initialize the xDS resources to be used in this test. ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"} rdsTargets := []string{"route-config-0", "route-config-1"} cdsTargets := []string{"cluster-0", "cluster-1"} edsTargets := []string{"endpoints-0", "endpoints-1"} listeners := make([]*v3listenerpb.Listener, len(ldsTargets)) listenerAnys := make([]*anypb.Any, len(ldsTargets)) for i := range ldsTargets { listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i]) listenerAnys[i] = testutils.MarshalAny(listeners[i]) } routes := make([]*v3routepb.RouteConfiguration, len(rdsTargets)) routeAnys := make([]*anypb.Any, len(rdsTargets)) for i := range rdsTargets { routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i]) routeAnys[i] = testutils.MarshalAny(routes[i]) } clusters := make([]*v3clusterpb.Cluster, len(cdsTargets)) clusterAnys := make([]*anypb.Any, len(cdsTargets)) for i := range cdsTargets { clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone) clusterAnys[i] = testutils.MarshalAny(clusters[i]) } endpoints := make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets)) endpointAnys := make([]*anypb.Any, len(edsTargets)) ips := []string{"0.0.0.0", "1.1.1.1"} ports := []uint32{123, 456} for i := range edsTargets { endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1]) endpointAnys[i] = testutils.MarshalAny(endpoints[i]) } // Register watches on the xDS client for two resources of each type. for _, target := range ldsTargets { xdsC.WatchListener(target, func(xdsresource.ListenerUpdate, error) {}) } for _, target := range rdsTargets { xdsC.WatchRouteConfig(target, func(xdsresource.RouteConfigUpdate, error) {}) } for _, target := range cdsTargets { xdsC.WatchCluster(target, func(xdsresource.ClusterUpdate, error) {}) } for _, target := range edsTargets { xdsC.WatchEndpoints(target, func(xdsresource.EndpointsUpdate, error) {}) } // Verify that the xDS client reports the resources as being in "Requested" // state. want := []*v3statuspb.ClientConfig_GenericXdsConfig{} for i := range ldsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) } for i := range rdsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) } for i := range cdsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) } for i := range edsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil)) } for { if err := ctx.Err(); err != nil { t.Fatalf("Timeout when waiting for resources in \"Requested\" state: %v", err) } if err := checkClientStatusResponse(stream, want); err == nil { break } time.Sleep(time.Millisecond * 100) } // Configure the management server with two resources of each type, // corresponding to the watches registered above. if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, Listeners: listeners, Routes: routes, Clusters: clusters, Endpoints: endpoints, }); err != nil { t.Fatal(err) } // Verify that the xDS client reports the resources as being in "ACKed" // state, and in version "1". want = nil for i := range ldsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i])) } for i := range rdsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i])) } for i := range cdsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i])) } for i := range edsTargets { want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i])) } for { if err := ctx.Err(); err != nil { t.Fatalf("Timeout when waiting for resources in \"ACKed\" state: %v", err) } err := checkClientStatusResponse(stream, want) if err == nil { break } time.Sleep(time.Millisecond * 100) } // Update the first resource of each type in the management server to a // value which is expected to be NACK'ed by the xDS client. const nackResourceIdx = 0 listeners[nackResourceIdx].ApiListener = &v3listenerpb.ApiListener{} routes[nackResourceIdx].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}} clusters[nackResourceIdx].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC} endpoints[nackResourceIdx].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}} if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, Listeners: listeners, Routes: routes, Clusters: clusters, Endpoints: endpoints, SkipValidation: true, }); err != nil { t.Fatal(err) } // Verify that the xDS client reports the first resource of each type as // being in "NACKed" state, and the second resource of each type to be in // "ACKed" state. The version for the ACKed resource would be "2", while // that for the NACKed resource would be "1". In the NACKed resource, the // version which is NACKed is stored in the ErrorState field. want = nil for i := range ldsTargets { config := makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i]) if i == nackResourceIdx { config.VersionInfo = "1" config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} } want = append(want, config) } for i := range rdsTargets { config := makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i]) if i == nackResourceIdx { config.VersionInfo = "1" config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} } want = append(want, config) } for i := range cdsTargets { config := makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i]) if i == nackResourceIdx { config.VersionInfo = "1" config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} } want = append(want, config) } for i := range edsTargets { config := makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i]) if i == nackResourceIdx { config.VersionInfo = "1" config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"} } want = append(want, config) } for { if err := ctx.Err(); err != nil { t.Fatalf("Timeout when waiting for resources in \"NACKed\" state: %v", err) } err := checkClientStatusResponse(stream, want) if err == nil { break } time.Sleep(time.Millisecond * 100) } } func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any) *v3statuspb.ClientConfig_GenericXdsConfig { return &v3statuspb.ClientConfig_GenericXdsConfig{ TypeUrl: typeURL, Name: name, VersionInfo: version, ClientStatus: status, XdsConfig: config, } } func checkClientStatusResponse(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want []*v3statuspb.ClientConfig_GenericXdsConfig) error { if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { if err != io.EOF { return fmt.Errorf("failed to send ClientStatusRequest: %v", err) } // If the stream has closed, we call Recv() until it returns a non-nil // error to get the actual error on the stream. for { if _, err := stream.Recv(); err != nil { return fmt.Errorf("failed to recv ClientStatusResponse: %v", err) } } } resp, err := stream.Recv() if err != nil { return fmt.Errorf("failed to recv ClientStatusResponse: %v", err) } if n := len(resp.Config); n != 1 { return fmt.Errorf("got %d configs, want 1: %v", n, proto.MarshalTextString(resp)) } if diff := cmp.Diff(resp.Config[0].GenericXdsConfigs, want, cmpOpts); diff != "" { return fmt.Errorf(diff) } return nil } func (s) TestCSDSNoXDSClient(t *testing.T) { // Create a bootstrap file in a temporary directory. Since we pass empty // options, it would end up creating a bootstrap file with an empty // serverURI which will fail xDS client creation. bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{}) if err != nil { t.Fatal(err) } t.Cleanup(func() { bootstrapCleanup() }) // Initialize an gRPC server and register CSDS on it. server := grpc.NewServer() csdss, err := csds.NewClientStatusDiscoveryServer() if err != nil { t.Fatal(err) } defer csdss.Close() v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss) // Create a local listener and pass it to Serve(). lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } go func() { if err := server.Serve(lis); err != nil { t.Errorf("Serve() failed: %v", err) } }() defer server.Stop() // Create a client to the CSDS server. conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err) } defer conn.Close() c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true)) if err != nil { t.Fatalf("Failed to create a stream for CSDS: %v", err) } if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil { t.Fatalf("Failed to send ClientStatusRequest: %v", err) } r, err := stream.Recv() if err != nil { // io.EOF is not ok. t.Fatalf("Failed to recv ClientStatusResponse: %v", err) } if n := len(r.Config); n != 0 { t.Fatalf("got %d configs, want 0: %v", n, proto.MarshalTextString(r)) } }