/* * * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package rls import ( "context" "net" "testing" "time" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/rls/internal/testutils/fakeserver" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/testdata" ) const defaultTestTimeout = 1 * time.Second type s struct { grpctest.Tester } func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } type listenerWrapper struct { net.Listener connCh *testutils.Channel } // Accept waits for and returns the next connection to the listener. func (l *listenerWrapper) Accept() (net.Conn, error) { c, err := l.Listener.Accept() if err != nil { return nil, err } l.connCh.Send(c) return c, nil } func setupwithListener(t *testing.T, opts ...grpc.ServerOption) (*fakeserver.Server, *listenerWrapper, func()) { t.Helper() l, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("net.Listen(tcp, localhost:0): %v", err) } lw := &listenerWrapper{ Listener: l, connCh: testutils.NewChannel(), } server, cleanup, err := fakeserver.Start(lw, opts...) if err != nil { t.Fatalf("fakeserver.Start(): %v", err) } t.Logf("Fake RLS server started at %s ...", server.Address) return server, lw, cleanup } type testBalancerCC struct { balancer.ClientConn } // TestUpdateControlChannelFirstConfig tests the scenario where the LB policy // receives its first service config and verifies that a control channel to the // RLS server specified in the serviceConfig is established. func (s) TestUpdateControlChannelFirstConfig(t *testing.T) { server, lis, cleanup := setupwithListener(t) defer cleanup() bb := balancer.Get(rlsBalancerName) if bb == nil { t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) } rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{}) defer rlsB.Close() t.Log("Built RLS LB policy ...") lbCfg := &lbConfig{lookupService: server.Address} t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := lis.connCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } // TODO: Verify channel connectivity state once control channel connectivity // state monitoring is in place. // TODO: Verify RLS RPC can be made once we integrate with the picker. } // TestUpdateControlChannelSwitch tests the scenario where a control channel // exists and the LB policy receives a new serviceConfig with a different RLS // server name. Verifies that the new control channel is created and the old one // is closed (the leakchecker takes care of this). func (s) TestUpdateControlChannelSwitch(t *testing.T) { server1, lis1, cleanup1 := setupwithListener(t) defer cleanup1() server2, lis2, cleanup2 := setupwithListener(t) defer cleanup2() bb := balancer.Get(rlsBalancerName) if bb == nil { t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) } rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{}) defer rlsB.Close() t.Log("Built RLS LB policy ...") lbCfg := &lbConfig{lookupService: server1.Address} t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := lis1.connCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } lbCfg = &lbConfig{lookupService: server2.Address} t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) if _, err := lis2.connCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } // TODO: Verify channel connectivity state once control channel connectivity // state monitoring is in place. // TODO: Verify RLS RPC can be made once we integrate with the picker. } // TestUpdateControlChannelTimeout tests the scenario where the LB policy // receives a service config update with a different lookupServiceTimeout, but // the lookupService itself remains unchanged. It verifies that the LB policy // does not create a new control channel in this case. func (s) TestUpdateControlChannelTimeout(t *testing.T) { server, lis, cleanup := setupwithListener(t) defer cleanup() bb := balancer.Get(rlsBalancerName) if bb == nil { t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) } rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{}) defer rlsB.Close() t.Log("Built RLS LB policy ...") lbCfg := &lbConfig{lookupService: server.Address, lookupServiceTimeout: 1 * time.Second} t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := lis.connCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } lbCfg = &lbConfig{lookupService: server.Address, lookupServiceTimeout: 2 * time.Second} t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) if _, err := lis.connCh.Receive(ctx); err != context.DeadlineExceeded { t.Fatal("LB policy created new control channel when only lookupServiceTimeout changed") } // TODO: Verify channel connectivity state once control channel connectivity // state monitoring is in place. // TODO: Verify RLS RPC can be made once we integrate with the picker. } // TestUpdateControlChannelWithCreds tests the scenario where the control // channel is to established with credentials from the parent channel. func (s) TestUpdateControlChannelWithCreds(t *testing.T) { sCreds, err := credentials.NewServerTLSFromFile(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem")) if err != nil { t.Fatalf("credentials.NewServerTLSFromFile(server1.pem, server1.key) = %v", err) } cCreds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "") if err != nil { t.Fatalf("credentials.NewClientTLSFromFile(ca.pem) = %v", err) } server, lis, cleanup := setupwithListener(t, grpc.Creds(sCreds)) defer cleanup() bb := balancer.Get(rlsBalancerName) if bb == nil { t.Fatalf("balancer.Get(%s) = nil", rlsBalancerName) } rlsB := bb.Build(&testBalancerCC{}, balancer.BuildOptions{ DialCreds: cCreds, }) defer rlsB.Close() t.Log("Built RLS LB policy ...") lbCfg := &lbConfig{lookupService: server.Address} t.Logf("Sending service config %+v to RLS LB policy ...", lbCfg) rlsB.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: lbCfg}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := lis.connCh.Receive(ctx); err != nil { t.Fatal("Timeout expired when waiting for LB policy to create control channel") } // TODO: Verify channel connectivity state once control channel connectivity // state monitoring is in place. // TODO: Verify RLS RPC can be made once we integrate with the picker. }