/* * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ // Package resolver implements the xds resolver, that does LDS and RDS to find // the cluster to use. package resolver import ( "errors" "fmt" "strings" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) const xdsScheme = "xds" // newBuilderForTesting creates a new xds resolver builder using a specific xds // bootstrap config, so tests can use multiple xds clients in different // ClientConns at the same time. func newBuilderForTesting(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ newXDSClient: func() (xdsclient.XDSClient, func(), error) { return xdsclient.NewWithBootstrapContentsForTesting(config) }, }, nil } // For overriding in unittests. var newXDSClient = func() (xdsclient.XDSClient, func(), error) { return xdsclient.New() } func init() { resolver.Register(&xdsResolverBuilder{}) internal.NewXDSResolverWithConfigForTesting = newBuilderForTesting } type xdsResolverBuilder struct { newXDSClient func() (xdsclient.XDSClient, func(), error) } // Build helps implement the resolver.Builder interface. // // The xds bootstrap process is performed (and a new xds client is built) every // time an xds resolver is built. func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) { r := &xdsResolver{ cc: cc, closed: grpcsync.NewEvent(), updateCh: make(chan suWithError, 1), activeClusters: make(map[string]*clusterInfo), channelID: grpcrand.Uint64(), } defer func() { if retErr != nil { r.Close() } }() r.logger = prefixLogger(r) r.logger.Infof("Creating resolver for target: %+v", target) newXDSClient := newXDSClient if b.newXDSClient != nil { newXDSClient = b.newXDSClient } client, close, err := newXDSClient() if err != nil { return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) } r.xdsClient = client r.xdsClientClose = close bootstrapConfig := client.BootstrapConfig() if bootstrapConfig == nil { return nil, errors.New("bootstrap configuration is empty") } // If xds credentials were specified by the user, but bootstrap configs do // not contain any certificate provider configuration, it is better to fail // right now rather than failing when attempting to create certificate // providers after receiving an CDS response with security configuration. var creds credentials.TransportCredentials switch { case opts.DialCreds != nil: creds = opts.DialCreds case opts.CredsBundle != nil: creds = opts.CredsBundle.TransportCredentials() } if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() { if len(bootstrapConfig.CertProviderConfigs) == 0 { return nil, errors.New("xds: xdsCreds specified but certificate_providers config missing in bootstrap file") } } // Find the client listener template to use from the bootstrap config: // - If authority is not set in the target, use the top level template // - If authority is set, use the template from the authority map. template := bootstrapConfig.ClientDefaultListenerResourceNameTemplate if authority := target.URL.Host; authority != "" { a := bootstrapConfig.Authorities[authority] if a == nil { return nil, fmt.Errorf("xds: authority %q is not found in the bootstrap file", authority) } if a.ClientListenerResourceNameTemplate != "" { // This check will never be false, because // ClientListenerResourceNameTemplate is required to start with // xdstp://, and has a default value (not an empty string) if unset. template = a.ClientListenerResourceNameTemplate } } endpoint := target.URL.Path if endpoint == "" { endpoint = target.URL.Opaque } endpoint = strings.TrimPrefix(endpoint, "/") r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, endpoint) // Register a watch on the xdsClient for the resource name determined above. cancelWatch := watchService(r.xdsClient, r.ldsResourceName, r.handleServiceUpdate, r.logger) r.logger.Infof("Watch started on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient) r.cancelWatch = func() { cancelWatch() r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient) } go r.run() return r, nil } // Name helps implement the resolver.Builder interface. func (*xdsResolverBuilder) Scheme() string { return xdsScheme } // suWithError wraps the ServiceUpdate and error received through a watch API // callback, so that it can pushed onto the update channel as a single entity. type suWithError struct { su serviceUpdate emptyUpdate bool err error } // xdsResolver implements the resolver.Resolver interface. // // It registers a watcher for ServiceConfig updates with the xdsClient object // (which performs LDS/RDS queries for the same), and passes the received // updates to the ClientConn. type xdsResolver struct { cc resolver.ClientConn closed *grpcsync.Event logger *grpclog.PrefixLogger ldsResourceName string // The underlying xdsClient which performs all xDS requests and responses. xdsClient xdsclient.XDSClient xdsClientClose func() // A channel for the watch API callback to write service updates on to. The // updates are read by the run goroutine and passed on to the ClientConn. updateCh chan suWithError // cancelWatch is the function to cancel the watcher. cancelWatch func() // activeClusters is a map from cluster name to a ref count. Only read or // written during a service update (synchronous). activeClusters map[string]*clusterInfo curConfigSelector *configSelector // A random number which uniquely identifies the channel which owns this // resolver. channelID uint64 } // sendNewServiceConfig prunes active clusters, generates a new service config // based on the current set of active clusters, and sends an update to the // channel with that service config and the provided config selector. Returns // false if an error occurs while generating the service config and the update // cannot be sent. func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { // Delete entries from r.activeClusters with zero references; // otherwise serviceConfigJSON will generate a config including // them. r.pruneActiveClusters() if cs == nil && len(r.activeClusters) == 0 { // There are no clusters and we are sending a failing configSelector. // Send an empty config, which picks pick-first, with no address, and // puts the ClientConn into transient failure. r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")}) return true } sc, err := serviceConfigJSON(r.activeClusters) if err != nil { // JSON marshal error; should never happen. r.logger.Errorf("%v", err) r.cc.ReportError(err) return false } r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.ldsResourceName, r.xdsClient, pretty.FormatJSON(sc)) // Send the update to the ClientConn. state := iresolver.SetConfigSelector(resolver.State{ ServiceConfig: r.cc.ParseServiceConfig(string(sc)), }, cs) r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient)) return true } // run is a long running goroutine which blocks on receiving service updates // and passes it on the ClientConn. func (r *xdsResolver) run() { for { select { case <-r.closed.Done(): return case update := <-r.updateCh: if update.err != nil { r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.ldsResourceName, r.xdsClient, update.err) if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound { // If error is resource-not-found, it means the LDS // resource was removed. Ultimately send an empty service // config, which picks pick-first, with no address, and // puts the ClientConn into transient failure. Before we // can do that, we may need to send a normal service config // along with an erroring (nil) config selector. r.sendNewServiceConfig(nil) // Stop and dereference the active config selector, if one exists. r.curConfigSelector.stop() r.curConfigSelector = nil continue } // Send error to ClientConn, and balancers, if error is not // resource not found. No need to update resolver state if we // can keep using the old config. r.cc.ReportError(update.err) continue } if update.emptyUpdate { r.sendNewServiceConfig(r.curConfigSelector) continue } // Create the config selector for this update. cs, err := r.newConfigSelector(update.su) if err != nil { r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.ldsResourceName, r.xdsClient, err) r.cc.ReportError(err) continue } if !r.sendNewServiceConfig(cs) { // JSON error creating the service config (unexpected); erase // this config selector and ignore this update, continuing with // the previous config selector. cs.stop() continue } // Decrement references to the old config selector and assign the // new one as the current one. r.curConfigSelector.stop() r.curConfigSelector = cs } } } // handleServiceUpdate is the callback which handles service updates. It writes // the received update to the update channel, which is picked by the run // goroutine. func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) { if r.closed.HasFired() { // Do not pass updates to the ClientConn once the resolver is closed. return } // Remove any existing entry in updateCh and replace with the new one. select { case <-r.updateCh: default: } r.updateCh <- suWithError{su: su, err: err} } // ResolveNow is a no-op at this point. func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {} // Close closes the resolver, and also closes the underlying xdsClient. func (r *xdsResolver) Close() { // Note that Close needs to check for nils even if some of them are always // set in the constructor. This is because the constructor defers Close() in // error cases, and the fields might not be set when the error happens. if r.cancelWatch != nil { r.cancelWatch() } if r.xdsClientClose != nil { r.xdsClientClose() } r.closed.Fire() r.logger.Infof("Shutdown") }