/* * * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package client import ( "fmt" "sync" "time" ) type watchInfoState int const ( watchInfoStateStarted watchInfoState = iota watchInfoStateRespReceived watchInfoStateTimeout watchInfoStateCanceled ) // watchInfo holds all the information from a watch() call. type watchInfo struct { c *clientImpl rType ResourceType target string ldsCallback func(ListenerUpdate, error) rdsCallback func(RouteConfigUpdate, error) cdsCallback func(ClusterUpdate, error) edsCallback func(EndpointsUpdate, error) expiryTimer *time.Timer // mu protects state, and c.scheduleCallback(). // - No callback should be scheduled after watchInfo is canceled. // - No timeout error should be scheduled after watchInfo is resp received. mu sync.Mutex state watchInfoState } func (wi *watchInfo) newUpdate(update interface{}) { wi.mu.Lock() defer wi.mu.Unlock() if wi.state == watchInfoStateCanceled { return } wi.state = watchInfoStateRespReceived wi.expiryTimer.Stop() wi.c.scheduleCallback(wi, update, nil) } func (wi *watchInfo) resourceNotFound() { wi.mu.Lock() defer wi.mu.Unlock() if wi.state == watchInfoStateCanceled { return } wi.state = watchInfoStateRespReceived wi.expiryTimer.Stop() wi.sendErrorLocked(NewErrorf(ErrorTypeResourceNotFound, "xds: %v target %s not found in received response", wi.rType, wi.target)) } func (wi *watchInfo) timeout() { wi.mu.Lock() defer wi.mu.Unlock() if wi.state == watchInfoStateCanceled || wi.state == watchInfoStateRespReceived { return } wi.state = watchInfoStateTimeout wi.sendErrorLocked(fmt.Errorf("xds: %v target %s not found, watcher timeout", wi.rType, wi.target)) } // Caller must hold wi.mu. func (wi *watchInfo) sendErrorLocked(err error) { var ( u interface{} ) switch wi.rType { case ListenerResource: u = ListenerUpdate{} case RouteConfigResource: u = RouteConfigUpdate{} case ClusterResource: u = ClusterUpdate{} case EndpointsResource: u = EndpointsUpdate{} } wi.c.scheduleCallback(wi, u, err) } func (wi *watchInfo) cancel() { wi.mu.Lock() defer wi.mu.Unlock() if wi.state == watchInfoStateCanceled { return } wi.expiryTimer.Stop() wi.state = watchInfoStateCanceled } func (c *clientImpl) watch(wi *watchInfo) (cancel func()) { c.mu.Lock() defer c.mu.Unlock() c.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target) var watchers map[string]map[*watchInfo]bool switch wi.rType { case ListenerResource: watchers = c.ldsWatchers case RouteConfigResource: watchers = c.rdsWatchers case ClusterResource: watchers = c.cdsWatchers case EndpointsResource: watchers = c.edsWatchers } resourceName := wi.target s, ok := watchers[wi.target] if !ok { // If this is a new watcher, will ask lower level to send a new request // with the resource name. // // If this (type+name) is already being watched, will not notify the // underlying versioned apiClient. c.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target) s = make(map[*watchInfo]bool) watchers[resourceName] = s c.apiClient.AddWatch(wi.rType, resourceName) } // No matter what, add the new watcher to the set, so it's callback will be // call for new responses. s[wi] = true // If the resource is in cache, call the callback with the value. switch wi.rType { case ListenerResource: if v, ok := c.ldsCache[resourceName]; ok { c.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) } case RouteConfigResource: if v, ok := c.rdsCache[resourceName]; ok { c.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) } case ClusterResource: if v, ok := c.cdsCache[resourceName]; ok { c.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) } case EndpointsResource: if v, ok := c.edsCache[resourceName]; ok { c.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, v) wi.newUpdate(v) } } return func() { c.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target) wi.cancel() c.mu.Lock() defer c.mu.Unlock() if s := watchers[resourceName]; s != nil { // Remove this watcher, so it's callback will not be called in the // future. delete(s, wi) if len(s) == 0 { c.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target) // If this was the last watcher, also tell xdsv2Client to stop // watching this resource. delete(watchers, resourceName) c.apiClient.RemoveWatch(wi.rType, resourceName) // Remove the resource from cache. When a watch for this // resource is added later, it will trigger a xDS request with // resource names, and client will receive new xDS responses. switch wi.rType { case ListenerResource: delete(c.ldsCache, resourceName) case RouteConfigResource: delete(c.rdsCache, resourceName) case ClusterResource: delete(c.cdsCache, resourceName) case EndpointsResource: delete(c.edsCache, resourceName) } } } } } // WatchListener uses LDS to discover information about the provided listener. // // Note that during race (e.g. an xDS response is received while the user is // calling cancel()), there's a small window where the callback can be called // after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchListener(serviceName string, cb func(ListenerUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, rType: ListenerResource, target: serviceName, ldsCallback: cb, } wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() { wi.timeout() }) return c.watch(wi) } // WatchRouteConfig starts a listener watcher for the service.. // // Note that during race (e.g. an xDS response is received while the user is // calling cancel()), there's a small window where the callback can be called // after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchRouteConfig(routeName string, cb func(RouteConfigUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, rType: RouteConfigResource, target: routeName, rdsCallback: cb, } wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() { wi.timeout() }) return c.watch(wi) } // WatchCluster uses CDS to discover information about the provided // clusterName. // // WatchCluster can be called multiple times, with same or different // clusterNames. Each call will start an independent watcher for the resource. // // Note that during race (e.g. an xDS response is received while the user is // calling cancel()), there's a small window where the callback can be called // after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchCluster(clusterName string, cb func(ClusterUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, rType: ClusterResource, target: clusterName, cdsCallback: cb, } wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() { wi.timeout() }) return c.watch(wi) } // WatchEndpoints uses EDS to discover endpoints in the provided clusterName. // // WatchEndpoints can be called multiple times, with same or different // clusterNames. Each call will start an independent watcher for the resource. // // Note that during race (e.g. an xDS response is received while the user is // calling cancel()), there's a small window where the callback can be called // after the watcher is canceled. The caller needs to handle this case. func (c *clientImpl) WatchEndpoints(clusterName string, cb func(EndpointsUpdate, error)) (cancel func()) { wi := &watchInfo{ c: c, rType: EndpointsResource, target: clusterName, edsCallback: cb, } wi.expiryTimer = time.AfterFunc(c.watchExpiryTimeout, func() { wi.timeout() }) return c.watch(wi) }