/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package clusterresolver import ( "context" "errors" "fmt" "testing" "time" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/weightedtarget" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancergroup" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" "google.golang.org/grpc/xds/internal/balancer/priority" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" ) var ( testClusterNames = []string{"test-cluster-1", "test-cluster-2"} testSubZones = []string{"I", "II", "III", "IV"} testEndpointAddrs []string ) const testBackendAddrsCount = 12 func init() { for i := 0; i < testBackendAddrsCount; i++ { testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)) } balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond clusterimpl.NewRandomWRR = testutils.NewTestWRR weightedtarget.NewRandomWRR = testutils.NewTestWRR balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond * 100 } func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) { xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar) cc := testutils.NewTestClientConn(t) builder := balancer.Get(Name) edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{URL: *testutils.MustParseURL("dns:///" + testEDSService)}}) if edsb == nil { t.Fatalf("builder.Build(%s) failed and returned nil", Name) } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := edsb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &LBConfig{ DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, Type: DiscoveryMechanismTypeEDS, }}, }, }); err != nil { edsb.Close() t.Fatal(err) } if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { edsb.Close() t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) } return edsb, cc, xdsC, func() { edsb.Close() } } // When a high priority is ready, adding/removing lower locality doesn't cause // changes. // // Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0. func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with priorities [0, 1], each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh // p0 is ready. edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { t.Fatal(err) } // Add p2, it shouldn't cause any updates. clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) select { case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") case <-cc.RemoveSubConnCh: t.Fatalf("got unexpected remove SubConn") case <-time.After(defaultTestShortTimeout): } select { case p := <-cc.NewPickerCh: // If we do get a new picker, ensure it is still a p1 picker. if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, testutils.SubConnFromPicker(p)); err != nil { t.Fatal(err) } default: // No new picker; we were previously using p1 and should still be using // p1, so this is okay. No need to wait for defaultTestShortTimeout // since we just waited immediately above. } // Remove p2, no updates. clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) select { case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") case <-cc.RemoveSubConnCh: t.Fatalf("got unexpected remove SubConn") case <-time.After(defaultTestShortTimeout): } select { case p := <-cc.NewPickerCh: // If we do get a new picker, ensure it is still a p1 picker. if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, testutils.SubConnFromPicker(p)); err != nil { t.Fatal(err) } default: // No new picker; we were previously using p1 and should still be using // p1, so this is okay. No need to wait for defaultTestShortTimeout // since we just waited immediately above. } } // Lower priority is used when higher priority is not ready. // // Init 0 and 1; 0 is up, use 0; 0 is down, 1 is up, use 1; add 2, use 1; 1 is // down, use 2; remove 2, use 1. func (s) TestEDSPriority_SwitchPriority(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with priorities [0, 1], each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc0 := <-cc.NewSubConnCh // p0 is ready. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { t.Fatal(err) } // Turn down 0, 1 is used. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { t.Fatal(err) } // Add p2, it shouldn't cause any updates. clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) select { case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") case <-cc.RemoveSubConnCh: t.Fatalf("got unexpected remove SubConn") case <-time.After(defaultTestShortTimeout): } select { case p := <-cc.NewPickerCh: // If we do get a new picker, ensure it is still a p1 picker. if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, testutils.SubConnFromPicker(p)); err != nil { t.Fatal(err) } default: // No new picker; we were previously using p1 and should still be using // p1, so this is okay. No need to wait for defaultTestShortTimeout // since we just waited immediately above. } // Turn down 1, use 2 scConnErr := errors.New("subConn connection error") edsb.UpdateSubConnState(sc1, balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: scConnErr, }) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc2 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { t.Fatal(err) } // Remove 2, use 1. clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) // p2 SubConns are removed. scToRemove := <-cc.RemoveSubConnCh if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove) } // Should get an update with 1's old picker, to override 2's old picker. want := errors.New("last connection error: subConn connection error") if err := cc.WaitForPickerWithErr(ctx, want); err != nil { t.Fatal(err) } } // Add a lower priority while the higher priority is down. // // Init 0 and 1; 0 and 1 both down; add 2, use 2. func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with different priorities, each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc0 := <-cc.NewSubConnCh // Turn down 0, 1 is used. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh // Turn down 1, pick should error. scConnErr := errors.New("subConn connection error") edsb.UpdateSubConnState(sc1, balancer.SubConnState{ ConnectivityState: connectivity.TransientFailure, ConnectionError: scConnErr, }) // Test pick failure. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() want := errors.New("last connection error: subConn connection error") if err := cc.WaitForPickerWithErr(ctx, want); err != nil { t.Fatal(err) } // Add p2, it should create a new SubConn. clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc2 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { t.Fatal(err) } } // When a higher priority becomes available, all lower priorities are closed. // // Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2. func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with priorities [0,1,2], each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab1.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc0 := <-cc.NewSubConnCh // Turn down 0, 1 is used. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh // Turn down 1, 2 is used. edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc2 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { t.Fatal(err) } // When 0 becomes ready, 0 should be used, 1 and 2 should all be closed. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) var ( scToRemove []balancer.SubConn scToRemoveMap = make(map[balancer.SubConn]struct{}) ) // Each subconn is removed. // The sub-balancer to be closed is priority's child, clusterimpl, who has // weightedtarget as children. // - When clusterimpl is closed, it closes weightedtarget, and this // weightedtarget's balancer removes all the subconns. for i := 0; i < 2; i++ { // We expect 2 subconns, so we recv from channel 2 times. scToRemoveMap[<-cc.RemoveSubConnCh] = struct{}{} } for sc := range scToRemoveMap { scToRemove = append(scToRemove, sc) } // sc1 and sc2 should be removed. // // With localities caching, the lower priorities are closed after a timeout, // in goroutines. The order is no longer guaranteed. if !(cmp.Equal(scToRemove[0], sc1, cmp.AllowUnexported(testutils.TestSubConn{})) && cmp.Equal(scToRemove[1], sc2, cmp.AllowUnexported(testutils.TestSubConn{}))) && !(cmp.Equal(scToRemove[0], sc2, cmp.AllowUnexported(testutils.TestSubConn{})) && cmp.Equal(scToRemove[1], sc1, cmp.AllowUnexported(testutils.TestSubConn{}))) { t.Errorf("RemoveSubConn, want [%v, %v], got %v", sc1, sc2, scToRemove) } // Test pick with 0. if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { t.Fatal(err) } } // At init, start the next lower priority after timeout if the higher priority // doesn't get ready. // // Init 0,1; 0 is not ready (in connecting), after timeout, use 1. func (s) TestEDSPriority_InitTimeout(t *testing.T) { const testPriorityInitTimeout = time.Second defer func() func() { old := priority.DefaultPriorityInitTimeout priority.DefaultPriorityInitTimeout = testPriorityInitTimeout return func() { priority.DefaultPriorityInitTimeout = old } }()() edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with different priorities, each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc0 := <-cc.NewSubConnCh // Keep 0 in connecting, 1 will be used after init timeout. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // Make sure new SubConn is created before timeout. select { case <-time.After(testPriorityInitTimeout * 3 / 4): case <-cc.NewSubConnAddrsCh: t.Fatalf("Got a new SubConn too early (Within timeout). Expect a new SubConn only after timeout") } addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { t.Fatal(err) } } // Add localities to existing priorities. // // - start with 2 locality with p0 and p1 // - add localities to existing p0 and p1 func (s) TestEDSPriority_MultipleLocalities(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with different priorities, each with one backend. clab0 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab0.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc0 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { t.Fatal(err) } // Turn down p0 subconns, p1 subconns will be created. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { t.Fatal(err) } // Reconnect p0 subconns, p1 subconn will be closed. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) scToRemove := <-cc.RemoveSubConnCh if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) } // Test roundrobin with only p0 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { t.Fatal(err) } // Add two localities, with two priorities, with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) clab1.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) clab1.AddLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc2 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only two p0 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc0, sc2); err != nil { t.Fatal(err) } // Turn down p0 subconns, p1 subconns will be created. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) sc3 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) sc4 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc3, sc4); err != nil { t.Fatal(err) } } // EDS removes all localities, and re-adds them. func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) { const testPriorityInitTimeout = time.Second defer func() func() { old := priority.DefaultPriorityInitTimeout priority.DefaultPriorityInitTimeout = testPriorityInitTimeout return func() { priority.DefaultPriorityInitTimeout = old } }()() edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with different priorities, each with one backend. clab0 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab0.Build()), nil) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc0 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { t.Fatal(err) } // Remove all priorities. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) // p0 subconn should be removed. scToRemove := <-cc.RemoveSubConnCh if !cmp.Equal(scToRemove, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) } // time.Sleep(time.Second) // Test pick return TransientFailure. if err := cc.WaitForPickerWithErr(ctx, priority.ErrAllPrioritiesRemoved); err != nil { t.Fatal(err) } // Re-add two localities, with previous priorities, but different backends. clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) addrs01 := <-cc.NewSubConnAddrsCh if got, want := addrs01[0].Addr, testEndpointAddrs[2]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc01 := <-cc.NewSubConnCh // Don't send any update to p0, so to not override the old state of p0. // Later, connect to p1 and then remove p1. This will fallback to p0, and // will send p0's old picker if they are not correctly removed. // p1 will be used after priority init timeout. addrs11 := <-cc.NewSubConnAddrsCh if got, want := addrs11[0].Addr, testEndpointAddrs[3]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc11 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc11, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc11, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc11); err != nil { t.Fatal(err) } // Remove p1 from EDS, to fallback to p0. clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) // p1 subconn should be removed. scToRemove1 := <-cc.RemoveSubConnCh if !cmp.Equal(scToRemove1, sc11, cmp.AllowUnexported(testutils.TestSubConn{})) { t.Fatalf("RemoveSubConn, want %v, got %v", sc11, scToRemove1) } // Test pick return TransientFailure. if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { t.Fatal(err) } // Send an ready update for the p0 sc that was received when re-adding // localities to EDS. edsb.UpdateSubConnState(sc01, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc01, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc01); err != nil { t.Fatal(err) } select { case <-cc.NewPickerCh: t.Fatalf("got unexpected new picker") case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") case <-cc.RemoveSubConnCh: t.Fatalf("got unexpected remove SubConn") case <-time.After(defaultTestShortTimeout): } } // Test the case where the high priority contains no backends. The low priority // will be used. func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with priorities [0, 1], each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh // p0 is ready. edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { t.Fatal(err) } // Remove addresses from priority 0, should use p1. clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, nil, nil) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) // p0 will remove the subconn, and ClientConn will send a sc update to // shutdown. scToRemove := <-cc.RemoveSubConnCh edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc2 := <-cc.NewSubConnCh // p1 is ready. edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { t.Fatal(err) } } // Test the case where the high priority contains no healthy backends. The low // priority will be used. func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) { edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // Two localities, with priorities [0, 1], each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh // p0 is ready. edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { t.Fatal(err) } // Set priority 0 endpoints to all unhealthy, should use p1. clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], &xdstestutils.AddLocalityOptions{ Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY}, }) clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) // p0 will remove the subconn, and ClientConn will send a sc update to // transient failure. scToRemove := <-cc.RemoveSubConnCh edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) addrs2 := <-cc.NewSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc2 := <-cc.NewSubConnCh // p1 is ready. edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { t.Fatal(err) } } // Test the case where the first and only priority is removed. func (s) TestEDSPriority_FirstPriorityRemoved(t *testing.T) { const testPriorityInitTimeout = time.Second defer func() func() { old := priority.DefaultPriorityInitTimeout priority.DefaultPriorityInitTimeout = testPriorityInitTimeout return func() { priority.DefaultPriorityInitTimeout = old } }()() _, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() // One localities, with priorities [0], each with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) // Remove the only localities. clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := cc.WaitForErrPicker(ctx); err != nil { t.Fatal(err) } } // Watch resources from EDS and DNS, with EDS as the higher priority. Lower // priority is used when higher priority is not ready. func (s) TestFallbackToDNS(t *testing.T) { const testDNSEndpointAddr = "3.1.4.1:5" // dnsTargetCh, dnsCloseCh, resolveNowCh, dnsR, cleanup := setupDNS() dnsTargetCh, _, resolveNowCh, dnsR, cleanupDNS := setupDNS() defer cleanupDNS() edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ DiscoveryMechanisms: []DiscoveryMechanism{ { Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, { Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, }, }, }); err != nil { t.Fatal(err) } ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer ctxCancel() select { case target := <-dnsTargetCh: if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", URL: *testutils.MustParseURL("dns:///" + testDNSTarget)}); diff != "" { t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff) } case <-ctx.Done(): t.Fatal("Timed out waiting for building DNS resolver") } // One locality with one backend. clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) // Also send a DNS update, because the balancer needs both updates from all // resources to move on. dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: testDNSEndpointAddr}}}) addrs0 := <-cc.NewSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc0 := <-cc.NewSubConnCh // p0 is ready. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { t.Fatal(err) } // Turn down 0, p1 (DNS) will be used. edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // The transient failure above should not trigger a re-resolve to the DNS // resolver. Need to read to clear the channel, to avoid potential deadlock // writing to the channel later. shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer shortCancel() select { case <-resolveNowCh: t.Fatal("unexpected re-resolve trigger by transient failure from EDS endpoint") case <-shortCtx.Done(): } // The addresses used to create new SubConn should be the DNS endpoint. addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testDNSEndpointAddr; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { t.Fatal(err) } // Turn down the DNS endpoint, this should trigger an re-resolve in the DNS // resolver. edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // The transient failure above should trigger a re-resolve to the DNS // resolver. Need to read to clear the channel, to avoid potential deadlock // writing to the channel later. select { case <-resolveNowCh: case <-ctx.Done(): t.Fatal("Timed out waiting for re-resolve") } }