/* * * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package weightedtarget import ( "context" "encoding/json" "errors" "fmt" "strings" "testing" "time" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/balancergroup" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/hierarchy" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) const ( defaultTestTimeout = 5 * time.Second ) type s struct { grpctest.Tester } func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } type testConfigBalancerBuilder struct { balancer.Builder } func newTestConfigBalancerBuilder() *testConfigBalancerBuilder { return &testConfigBalancerBuilder{ Builder: balancer.Get(roundrobin.Name), } } // pickAndCheckError returns a function which takes a picker, invokes the Pick() method // multiple times and ensures that the error returned by the picker matches the provided error. func pickAndCheckError(want error) func(balancer.Picker) error { const rpcCount = 5 return func(p balancer.Picker) error { for i := 0; i < rpcCount; i++ { if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), want.Error()) { return fmt.Errorf("picker.Pick() returned error: %v, want: %v", err, want) } } return nil } } func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { rr := t.Builder.Build(cc, opts) return &testConfigBalancer{ Balancer: rr, } } const testConfigBalancerName = "test_config_balancer" func (t *testConfigBalancerBuilder) Name() string { return testConfigBalancerName } type stringBalancerConfig struct { serviceconfig.LoadBalancingConfig configStr string } func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { var cfg string if err := json.Unmarshal(c, &cfg); err != nil { return nil, fmt.Errorf("failed to unmarshal config in %q: %v", testConfigBalancerName, err) } return stringBalancerConfig{configStr: cfg}, nil } // testConfigBalancer is a roundrobin balancer, but it takes the balancer config // string and adds it as an address attribute to the backend addresses. type testConfigBalancer struct { balancer.Balancer } // configKey is the type used as the key to store balancer config in the // Attributes field of resolver.Address. type configKey struct{} func setConfigKey(addr resolver.Address, config string) resolver.Address { addr.Attributes = addr.Attributes.WithValue(configKey{}, config) return addr } func getConfigKey(attr *attributes.Attributes) (string, bool) { v := attr.Value(configKey{}) name, ok := v.(string) return name, ok } func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error { c, ok := s.BalancerConfig.(stringBalancerConfig) if !ok { return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig) } addrsWithAttr := make([]resolver.Address, len(s.ResolverState.Addresses)) for i, addr := range s.ResolverState.Addresses { addrsWithAttr[i] = setConfigKey(addr, c.configStr) } s.BalancerConfig = nil s.ResolverState.Addresses = addrsWithAttr return b.Balancer.UpdateClientConnState(s) } func (b *testConfigBalancer) Close() { b.Balancer.Close() } var ( wtbBuilder balancer.Builder wtbParser balancer.ConfigParser testBackendAddrStrs []string ) const testBackendAddrsCount = 12 func init() { balancer.Register(newTestConfigBalancerBuilder()) for i := 0; i < testBackendAddrsCount; i++ { testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)) } wtbBuilder = balancer.Get(Name) wtbParser = wtbBuilder.(balancer.ConfigParser) balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond NewRandomWRR = testutils.NewTestWRR } // TestWeightedTarget covers the cases that a sub-balancer is added and a // sub-balancer is removed. It verifies that the addresses and balancer configs // are forwarded to the right sub-balancer. This test is intended to test the // glue code in weighted_target. func (s) TestWeightedTarget(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with "cluster_1: round_robin". config1, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"round_robin": ""}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config, and an address with hierarchy path ["cluster_1"]. addr1 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}}, BalancerConfig: config1, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } verifyAddressInNewSubConn(t, cc, addr1) // Send subconn state change. sc1 := <-cc.NewSubConnCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } // Remove cluster_1, and add "cluster_2: test_config_balancer". The // test_config_balancer adds an address attribute whose value is set to the // config that is passed to it. config2, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_2": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config, and one address with hierarchy path "cluster_2". addr2 := resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_2"})}}, BalancerConfig: config2, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } // Expect a new subConn from the test_config_balancer which has an address // attribute set to the config that was passed to it. verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2")) // The subconn for cluster_1 should be removed. scRemoved := <-cc.RemoveSubConnCh if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) sc2 := <-cc.NewSubConnCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p = <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) } } // Replace child policy of "cluster_1" to "round_robin". config3, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_2": { "weight":1, "childPolicy": [{"round_robin": ""}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config, and an address with hierarchy path ["cluster_2"]. addr3 := resolver.Address{Addr: testBackendAddrStrs[3], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr3, []string{"cluster_2"})}}, BalancerConfig: config3, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } verifyAddressInNewSubConn(t, cc, addr3) // The subconn from the test_config_balancer should be removed. scRemoved = <-cc.RemoveSubConnCh if !cmp.Equal(scRemoved, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) // Send subconn state change. sc3 := <-cc.NewSubConnCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p = <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) if !cmp.Equal(gotSCSt.SubConn, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3) } } } // TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we // have a weighted target balancer will one sub-balancer, and we add and remove // backends from the subBalancer. func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with "cluster_1: round_robin". config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"round_robin": ""}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config, and an address with hierarchy path ["cluster_1"]. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } verifyAddressInNewSubConn(t, cc, addr1) // Expect one SubConn, and move it to READY. sc1 := <-cc.NewSubConnCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test pick with one backend. for i := 0; i < 5; i++ { gotSCSt, _ := p.Pick(balancer.PickInfo{}) if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } // Send two addresses. addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_1"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } verifyAddressInNewSubConn(t, cc, addr2) // Expect one new SubConn, and move it to READY. sc2 := <-cc.NewSubConnCh // Update the SubConn to become READY. wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p = <-cc.NewPickerCh // Test round robin pick. want := []balancer.SubConn{sc1, sc2} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Remove the first address. if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_1"})}}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } // Expect one SubConn to be removed. scRemoved := <-cc.RemoveSubConnCh if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) p = <-cc.NewPickerCh // Test pick with only the second SubConn. for i := 0; i < 5; i++ { gotSC, _ := p.Pick(balancer.PickInfo{}) if !cmp.Equal(gotSC.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2) } } } // TestWeightedTarget_TwoSubBalancers_OneBackend tests the case where we have a // weighted target balancer with two sub-balancers, each with one backend. func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer". config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config with one address for each cluster. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scs := waitForNewSubConns(t, cc, 2) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, }) // We expect a single subConn on each subBalancer. sc1 := scs["cluster_1"][0].sc sc2 := scs["cluster_2"][0].sc // Send state changes for both SubConns, and wait for the picker. wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. want := []balancer.SubConn{sc1, sc2} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } } // TestWeightedTarget_TwoSubBalancers_MoreBackends tests the case where we have // a weighted target balancer with two sub-balancers, each with more than one // backend. func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with "cluster_1: round_robin, cluster_2: round_robin". config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config with two backends for each cluster. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} addr4 := resolver.Address{Addr: testBackendAddrStrs[4]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_1"}), hierarchy.Set(addr3, []string{"cluster_2"}), hierarchy.Set(addr4, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scs := waitForNewSubConns(t, cc, 4) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1, addr2}, "cluster_2": {addr3, addr4}, }) // We expect two subConns on each subBalancer. sc1 := scs["cluster_1"][0].sc sc2 := scs["cluster_1"][1].sc sc3 := scs["cluster_2"][0].sc sc4 := scs["cluster_2"][1].sc // Send state changes for all SubConns, and wait for the picker. wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. RPCs should be sent equally to all // backends. want := []balancer.SubConn{sc1, sc2, sc3, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Turn sc2's connection down, should be RR between balancers. wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) p = <-cc.NewPickerCh want = []balancer.SubConn{sc1, sc1, sc3, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Remove subConn corresponding to addr3. if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_1"}), hierarchy.Set(addr4, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scRemoved := <-cc.RemoveSubConnCh if !cmp.Equal(scRemoved, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scRemoved) } wtb.UpdateSubConnState(scRemoved, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) p = <-cc.NewPickerCh want = []balancer.SubConn{sc1, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Turn sc1's connection down. wantSubConnErr := errors.New("subConn connection error") wtb.UpdateSubConnState(sc1, balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) p = <-cc.NewPickerCh want = []balancer.SubConn{sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Turn last connection to connecting. wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) p = <-cc.NewPickerCh for i := 0; i < 5; i++ { if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) } } // Turn all connections down. wtb.UpdateSubConnState(sc4, balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil { t.Fatal(err) } } // TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends tests the // case where we have a weighted target balancer with two sub-balancers of // differing weights. func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with two subBalancers, one with twice the weight of the other. config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight": 2, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config with two backends for each cluster. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} addr4 := resolver.Address{Addr: testBackendAddrStrs[4]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_1"}), hierarchy.Set(addr3, []string{"cluster_2"}), hierarchy.Set(addr4, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scs := waitForNewSubConns(t, cc, 4) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1, addr2}, "cluster_2": {addr3, addr4}, }) // We expect two subConns on each subBalancer. sc1 := scs["cluster_1"][0].sc sc2 := scs["cluster_1"][1].sc sc3 := scs["cluster_2"][0].sc sc4 := scs["cluster_2"][1].sc // Send state changes for all SubConns, and wait for the picker. wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. Twice the number of RPCs should be // sent to cluster_1 when compared to cluster_2. want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } } // TestWeightedTarget_ThreeSubBalancers_RemoveBalancer tests the case where we // have a weighted target balancer with three sub-balancers and we remove one of // the subBalancers. func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with two subBalancers, one with twice the weight of the other. config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_2"}] }, "cluster_3": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_3"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config with one backend for each cluster. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_2"}), hierarchy.Set(addr3, []string{"cluster_3"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scs := waitForNewSubConns(t, cc, 3) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, "cluster_3": {addr3}, }) // We expect one subConn on each subBalancer. sc1 := scs["cluster_1"][0].sc sc2 := scs["cluster_2"][0].sc sc3 := scs["cluster_3"][0].sc // Send state changes for all SubConns, and wait for the picker. wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh want := []balancer.SubConn{sc1, sc2, sc3} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Remove the second balancer, while the others two are ready. config, err = wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_3": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_3"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr3, []string{"cluster_3"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } // Removing a subBalancer causes the weighted target LB policy to push a new // picker which ensures that the removed subBalancer is not picked for RPCs. p = <-cc.NewPickerCh scRemoved := <-cc.RemoveSubConnCh if !cmp.Equal(scRemoved, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scRemoved) } want = []balancer.SubConn{sc1, sc3} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Move balancer 3 into transient failure. wantSubConnErr := errors.New("subConn connection error") wtb.UpdateSubConnState(sc3, balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) <-cc.NewPickerCh // Remove the first balancer, while the third is transient failure. config, err = wtbParser.ParseConfig([]byte(` { "targets": { "cluster_3": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_3"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr3, []string{"cluster_3"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } // Removing a subBalancer causes the weighted target LB policy to push a new // picker which ensures that the removed subBalancer is not picked for RPCs. scRemoved = <-cc.RemoveSubConnCh if !cmp.Equal(scRemoved, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil { t.Fatal(err) } } // TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends tests the case // where we have a weighted target balancer with two sub-balancers, and we // change the weight of these subBalancers. func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with two subBalancers, one with twice the weight of the other. config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight": 2, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config with two backends for each cluster. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} addr3 := resolver.Address{Addr: testBackendAddrStrs[3]} addr4 := resolver.Address{Addr: testBackendAddrStrs[4]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_1"}), hierarchy.Set(addr3, []string{"cluster_2"}), hierarchy.Set(addr4, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scs := waitForNewSubConns(t, cc, 4) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1, addr2}, "cluster_2": {addr3, addr4}, }) // We expect two subConns on each subBalancer. sc1 := scs["cluster_1"][0].sc sc2 := scs["cluster_1"][1].sc sc3 := scs["cluster_2"][0].sc sc4 := scs["cluster_2"][1].sc // Send state changes for all SubConns, and wait for the picker. wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) <-cc.NewPickerCh wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) p := <-cc.NewPickerCh // Test roundrobin on the last picker. Twice the number of RPCs should be // sent to cluster_1 when compared to cluster_2. want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } // Change the weight of cluster_1. config, err = wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight": 3, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight": 1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_1"}), hierarchy.Set(addr3, []string{"cluster_2"}), hierarchy.Set(addr4, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } // Weight change causes a new picker to be pushed to the channel. p = <-cc.NewPickerCh want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4} if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil { t.Fatalf("want %v, got %v", want, err) } } // TestWeightedTarget_InitOneSubBalancerTransientFailure tests that at init // time, with two sub-balancers, if one sub-balancer reports transient_failure, // the picks won't fail with transient_failure, and should instead wait for the // other sub-balancer. func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer". config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config with one address for each cluster. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scs := waitForNewSubConns(t, cc, 2) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, }) // We expect a single subConn on each subBalancer. sc1 := scs["cluster_1"][0].sc _ = scs["cluster_2"][0].sc // Set one subconn to TransientFailure, this will trigger one sub-balancer // to report transient failure. wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) p := <-cc.NewPickerCh for i := 0; i < 5; i++ { r, err := p.Pick(balancer.PickInfo{}) if err != balancer.ErrNoSubConnAvailable { t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrNoSubConnAvailable, r, err) } } } // Test that with two sub-balancers, both in transient_failure, if one turns // connecting, the overall state stays in transient_failure, and all picks // return transient failure error. func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() // Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer". config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_1"}] }, "cluster_2": { "weight":1, "childPolicy": [{"test_config_balancer": "cluster_2"}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config with one address for each cluster. addr1 := resolver.Address{Addr: testBackendAddrStrs[1]} addr2 := resolver.Address{Addr: testBackendAddrStrs[2]} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{ hierarchy.Set(addr1, []string{"cluster_1"}), hierarchy.Set(addr2, []string{"cluster_2"}), }}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } scs := waitForNewSubConns(t, cc, 2) verifySubConnAddrs(t, scs, map[string][]resolver.Address{ "cluster_1": {addr1}, "cluster_2": {addr2}, }) // We expect a single subConn on each subBalancer. sc1 := scs["cluster_1"][0].sc sc2 := scs["cluster_2"][0].sc // Set both subconn to TransientFailure, this will put both sub-balancers in // transient failure. wantSubConnErr := errors.New("subConn connection error") wtb.UpdateSubConnState(sc1, balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) <-cc.NewPickerCh wtb.UpdateSubConnState(sc2, balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: wantSubConnErr, }) p := <-cc.NewPickerCh for i := 0; i < 5; i++ { if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) { t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr) } } // Set one subconn to Connecting, it shouldn't change the overall state. wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) select { case <-time.After(100 * time.Millisecond): case <-cc.NewPickerCh: t.Fatal("received new picker from the LB policy when expecting none") } for i := 0; i < 5; i++ { if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) { t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr) } } } // Verify that a SubConn is created with the expected address and hierarchy // path cleared. func verifyAddressInNewSubConn(t *testing.T, cc *testutils.TestClientConn, addr resolver.Address) { t.Helper() gotAddr := <-cc.NewSubConnAddrsCh wantAddr := []resolver.Address{hierarchy.Set(addr, []string{})} if diff := cmp.Diff(gotAddr, wantAddr, cmp.AllowUnexported(attributes.Attributes{})); diff != "" { t.Fatalf("got unexpected new subconn addrs: %v", diff) } } // subConnWithAddr wraps a subConn and the address for which it was created. type subConnWithAddr struct { sc balancer.SubConn addr resolver.Address } // waitForNewSubConns waits for `num` number of subConns to be created. This is // expected to be used from tests using the "test_config_balancer" LB policy, // which adds an address attribute with value set to the balancer config. // // Returned value is a map from subBalancer (identified by its config) to // subConns created by it. func waitForNewSubConns(t *testing.T, cc *testutils.TestClientConn, num int) map[string][]subConnWithAddr { t.Helper() scs := make(map[string][]subConnWithAddr) for i := 0; i < num; i++ { addrs := <-cc.NewSubConnAddrsCh if len(addrs) != 1 { t.Fatalf("received subConns with %d addresses, want 1", len(addrs)) } cfg, ok := getConfigKey(addrs[0].Attributes) if !ok { t.Fatalf("received subConn address %v contains no attribute for balancer config", addrs[0]) } sc := <-cc.NewSubConnCh scWithAddr := subConnWithAddr{sc: sc, addr: addrs[0]} scs[cfg] = append(scs[cfg], scWithAddr) } return scs } func verifySubConnAddrs(t *testing.T, scs map[string][]subConnWithAddr, wantSubConnAddrs map[string][]resolver.Address) { t.Helper() if len(scs) != len(wantSubConnAddrs) { t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs) } for cfg, scsWithAddr := range scs { if len(scsWithAddr) != len(wantSubConnAddrs[cfg]) { t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs) } wantAddrs := wantSubConnAddrs[cfg] for i, scWithAddr := range scsWithAddr { if diff := cmp.Diff(wantAddrs[i].Addr, scWithAddr.addr.Addr); diff != "" { t.Fatalf("got unexpected new subconn addrs: %v", diff) } } } } const initIdleBalancerName = "test-init-Idle-balancer" var errTestInitIdle = fmt.Errorf("init Idle balancer error 0") func init() { stub.Register(initIdleBalancerName, stub.BalancerFuncs{ UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error { bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{}) return nil }, UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { err := fmt.Errorf("wrong picker error") if state.ConnectivityState == connectivity.Idle { err = errTestInitIdle } bd.ClientConn.UpdateState(balancer.State{ ConnectivityState: state.ConnectivityState, Picker: &testutils.TestConstPicker{Err: err}, }) }, }) } // TestInitialIdle covers the case that if the child reports Idle, the overall // state will be Idle. func (s) TestInitialIdle(t *testing.T) { cc := testutils.NewTestClientConn(t) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"test-init-Idle-balancer": ""}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config, and an address with hierarchy path ["cluster_1"]. addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } // Verify that a subconn is created with the address, and the hierarchy path // in the address is cleared. for range addrs { sc := <-cc.NewSubConnCh wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle}) } if state := <-cc.NewStateCh; state != connectivity.Idle { t.Fatalf("Received aggregated state: %v, want Idle", state) } } // TestIgnoreSubBalancerStateTransitions covers the case that if the child reports a // transition from TF to Connecting, the overall state will still be TF. func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) { cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)} wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"round_robin": ""}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config, and an address with hierarchy path ["cluster_1"]. addr := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr, []string{"cluster_1"})}}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } sc := <-cc.NewSubConnCh wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // Verify that the SubConnState update from TF to Connecting is ignored. if len(cc.states) != 2 || cc.states[0].ConnectivityState != connectivity.Connecting || cc.states[1].ConnectivityState != connectivity.TransientFailure { t.Fatalf("cc.states = %v; want [Connecting, TransientFailure]", cc.states) } } // tcc wraps a testutils.TestClientConn but stores all state transitions in a // slice. type tcc struct { *testutils.TestClientConn states []balancer.State } func (t *tcc) UpdateState(bs balancer.State) { t.states = append(t.states, bs) t.TestClientConn.UpdateState(bs) } func (s) TestUpdateStatePauses(t *testing.T) { cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)} balFuncs := stub.BalancerFuncs{ UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error { bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil}) bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil}) return nil }, } stub.Register("update_state_balancer", balFuncs) wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) defer wtb.Close() config, err := wtbParser.ParseConfig([]byte(` { "targets": { "cluster_1": { "weight":1, "childPolicy": [{"update_state_balancer": ""}] } } }`)) if err != nil { t.Fatalf("failed to parse balancer config: %v", err) } // Send the config, and an address with hierarchy path ["cluster_1"]. addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}} if err := wtb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}}, BalancerConfig: config, }); err != nil { t.Fatalf("failed to update ClientConn state: %v", err) } // Verify that the only state update is the second one called by the child. if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready { t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states) } }