/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package weightedaggregator implements state aggregator for weighted_target
// balancer.
//
// This is a separate package so it can be shared by weighted_target and eds.
// The eds balancer will be refactored to use weighted_target directly. After
// that, all functions and structs in this package can be moved to package
// weightedtarget and unexported.
package weightedaggregator

import (
	"fmt"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/wrr"
)

// weightedPickerState is the record the Aggregator keeps for one sub-balancer:
// its weight, the last balancer.State stored for it, and the connectivity
// state that is fed into state aggregation.
type weightedPickerState struct {
	// weight is the weight given to this sub-balancer's picker when the
	// weighted picker group is built.
	weight uint32
	// state is the most recent state stored for this sub-balancer.
	state balancer.State
	// stateToAggregate is the connectivity state used only for state
	// aggregation. It could be different from state.ConnectivityState. For
	// example when a sub-balancer transitions from TransientFailure to
	// connecting, state.ConnectivityState is Connecting, but stateToAggregate
	// is still TransientFailure.
	stateToAggregate connectivity.State
}

// String formats the weight, the picker's pointer value, the reported
// connectivity state and the aggregation state on a single line.
func (s *weightedPickerState) String() string {
	return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}

// Aggregator is the weighted balancer state aggregator.
type Aggregator struct { cc balancer.ClientConn logger *grpclog.PrefixLogger newWRR func() wrr.WRR csEvltr *balancer.ConnectivityStateEvaluator mu sync.Mutex // If started is false, no updates should be sent to the parent cc. A closed // sub-balancer could still send pickers to this aggregator. This makes sure // that no updates will be forwarded to parent when the whole balancer group // and states aggregator is closed. started bool // All balancer IDs exist as keys in this map, even if balancer group is not // started. // // If an ID is not in map, it's either removed or never added. idToPickerState map[string]*weightedPickerState // Set when UpdateState call propagation is paused. pauseUpdateState bool // Set when UpdateState call propagation is paused and an UpdateState call // is suppressed. needUpdateStateOnResume bool } // New creates a new weighted balancer state aggregator. func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr.WRR) *Aggregator { return &Aggregator{ cc: cc, logger: logger, newWRR: newWRR, csEvltr: &balancer.ConnectivityStateEvaluator{}, idToPickerState: make(map[string]*weightedPickerState), } } // Start starts the aggregator. It can be called after Stop to restart the // aggretator. func (wbsa *Aggregator) Start() { wbsa.mu.Lock() defer wbsa.mu.Unlock() wbsa.started = true } // Stop stops the aggregator. When the aggregator is stopped, it won't call // parent ClientConn to update balancer state. func (wbsa *Aggregator) Stop() { wbsa.mu.Lock() defer wbsa.mu.Unlock() wbsa.started = false wbsa.clearStates() } // Add adds a sub-balancer state with weight. It adds a place holder, and waits for // the real sub-balancer to update state. 
func (wbsa *Aggregator) Add(id string, weight uint32) { wbsa.mu.Lock() defer wbsa.mu.Unlock() wbsa.idToPickerState[id] = &weightedPickerState{ weight: weight, // Start everything in CONNECTING, so if one of the sub-balancers // reports TransientFailure, the RPCs will still wait for the other // sub-balancers. state: balancer.State{ ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), }, stateToAggregate: connectivity.Connecting, } wbsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting) wbsa.buildAndUpdateLocked() } // Remove removes the sub-balancer state. Future updates from this sub-balancer, // if any, will be ignored. func (wbsa *Aggregator) Remove(id string) { wbsa.mu.Lock() defer wbsa.mu.Unlock() if _, ok := wbsa.idToPickerState[id]; !ok { return } // Setting the state of the deleted sub-balancer to Shutdown will get csEvltr // to remove the previous state for any aggregated state evaluations. // transitions to and from connectivity.Shutdown are ignored by csEvltr. wbsa.csEvltr.RecordTransition(wbsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown) // Remove id and picker from picker map. This also results in future updates // for this ID to be ignored. delete(wbsa.idToPickerState, id) wbsa.buildAndUpdateLocked() } // UpdateWeight updates the weight for the given id. Note that this doesn't // trigger an update to the parent ClientConn. The caller should decide when // it's necessary, and call BuildAndUpdate. func (wbsa *Aggregator) UpdateWeight(id string, newWeight uint32) { wbsa.mu.Lock() defer wbsa.mu.Unlock() pState, ok := wbsa.idToPickerState[id] if !ok { return } pState.weight = newWeight } // PauseStateUpdates causes UpdateState calls to not propagate to the parent // ClientConn. The last state will be remembered and propagated when // ResumeStateUpdates is called. 
func (wbsa *Aggregator) PauseStateUpdates() { wbsa.mu.Lock() defer wbsa.mu.Unlock() wbsa.pauseUpdateState = true wbsa.needUpdateStateOnResume = false } // ResumeStateUpdates will resume propagating UpdateState calls to the parent, // and call UpdateState on the parent if any UpdateState call was suppressed. func (wbsa *Aggregator) ResumeStateUpdates() { wbsa.mu.Lock() defer wbsa.mu.Unlock() wbsa.pauseUpdateState = false if wbsa.needUpdateStateOnResume { wbsa.cc.UpdateState(wbsa.build()) } } // UpdateState is called to report a balancer state change from sub-balancer. // It's usually called by the balancer group. // // It calls parent ClientConn's UpdateState with the new aggregated state. func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) { wbsa.mu.Lock() defer wbsa.mu.Unlock() state, ok := wbsa.idToPickerState[id] if !ok { // All state starts with an entry in pickStateMap. If ID is not in map, // it's either removed, or never existed. return } if !(state.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) { // If old state is TransientFailure, and new state is Connecting, don't // update the state, to prevent the aggregated state from being always // CONNECTING. Otherwise, stateToAggregate is the same as // state.ConnectivityState. wbsa.csEvltr.RecordTransition(state.stateToAggregate, newState.ConnectivityState) state.stateToAggregate = newState.ConnectivityState } state.state = newState wbsa.buildAndUpdateLocked() } // clearState Reset everything to init state (Connecting) but keep the entry in // map (to keep the weight). // // Caller must hold wbsa.mu. 
func (wbsa *Aggregator) clearStates() { for _, pState := range wbsa.idToPickerState { pState.state = balancer.State{ ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), } pState.stateToAggregate = connectivity.Connecting } } // buildAndUpdateLocked aggregates the connectivity states of the sub-balancers, // builds a new picker and sends an update to the parent ClientConn. // // Caller must hold wbsa.mu. func (wbsa *Aggregator) buildAndUpdateLocked() { if !wbsa.started { return } if wbsa.pauseUpdateState { // If updates are paused, do not call UpdateState, but remember that we // need to call it when they are resumed. wbsa.needUpdateStateOnResume = true return } wbsa.cc.UpdateState(wbsa.build()) } // build combines sub-states into one. // // Caller must hold wbsa.mu. func (wbsa *Aggregator) build() balancer.State { wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState) // Make sure picker's return error is consistent with the aggregatedState. pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState)) switch aggState := wbsa.csEvltr.CurrentState(); aggState { case connectivity.Connecting: return balancer.State{ ConnectivityState: aggState, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)} case connectivity.TransientFailure: // this means that all sub-balancers are now in TransientFailure. for _, ps := range wbsa.idToPickerState { pickers = append(pickers, *ps) } return balancer.State{ ConnectivityState: aggState, Picker: newWeightedPickerGroup(pickers, wbsa.newWRR)} default: for _, ps := range wbsa.idToPickerState { if ps.stateToAggregate == connectivity.Ready { pickers = append(pickers, *ps) } } return balancer.State{ ConnectivityState: aggState, Picker: newWeightedPickerGroup(pickers, wbsa.newWRR)} } } type weightedPickerGroup struct { w wrr.WRR } // newWeightedPickerGroup takes pickers with weights, and groups them into one // picker. // // Note it only takes ready pickers. 
The map shouldn't contain non-ready // pickers. func newWeightedPickerGroup(readyWeightedPickers []weightedPickerState, newWRR func() wrr.WRR) *weightedPickerGroup { w := newWRR() for _, ps := range readyWeightedPickers { w.Add(ps.state.Picker, int64(ps.weight)) } return &weightedPickerGroup{ w: w, } } func (pg *weightedPickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) { p, ok := pg.w.Next().(balancer.Picker) if !ok { return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } return p.Pick(info) }