/* * * Copyright 2022 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ // Package fakegrpclb provides a fake implementation of the grpclb server. package fakegrpclb import ( "errors" "fmt" "io" "net" "strconv" "sync" "time" "google.golang.org/grpc" lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/status" ) var logger = grpclog.Component("fake_grpclb") // ServerParams wraps options passed while creating a Server. type ServerParams struct { ListenPort int // Listening port for the balancer server. ServerOptions []grpc.ServerOption // gRPC options for the balancer server. LoadBalancedServiceName string // Service name being load balanced for. LoadBalancedServicePort int // Service port being load balanced for. BackendAddresses []string // Service backends to balance load across. ShortStream bool // End balancer stream after sending server list. } // Server is a fake implementation of the grpclb LoadBalancer service. It does // not support stats reporting from clients, and always sends back a static list // of backends to the client to balance load across. // // It is safe for concurrent access. type Server struct { lbgrpc.UnimplementedLoadBalancerServer // Options copied over from ServerParams passed to NewServer. sOpts []grpc.ServerOption // gRPC server options. serviceName string // Service name being load balanced for. servicePort int // Service port being load balanced for. shortStream bool // End balancer stream after sending server list. // Values initialized using ServerParams passed to NewServer. backends []*lbpb.Server // Service backends to balance load across. lis net.Listener // Listener for grpc connections to the LoadBalancer service. // mu guards access to below fields. mu sync.Mutex grpcServer *grpc.Server // Underlying grpc server. address string // Actual listening address. stopped chan struct{} // Closed when Stop() is called. } // NewServer creates a new Server with passed in params. Returns a non-nil error // if the params are invalid. func NewServer(params ServerParams) (*Server, error) { var servers []*lbpb.Server for _, addr := range params.BackendAddresses { ipStr, portStr, err := net.SplitHostPort(addr) if err != nil { return nil, fmt.Errorf("failed to parse list of backend address %q: %v", addr, err) } ip := net.ParseIP(ipStr) if ip == nil { return nil, fmt.Errorf("failed to parse ip: %q", ipStr) } port, err := strconv.Atoi(portStr) if err != nil { return nil, fmt.Errorf("failed to convert port %q to int", portStr) } logger.Infof("Adding backend ip: %q, port: %d to server list", ip.String(), port) servers = append(servers, &lbpb.Server{ IpAddress: ip, Port: int32(port), }) } lis, err := net.Listen("tcp", "localhost:"+strconv.Itoa(params.ListenPort)) if err != nil { return nil, fmt.Errorf("failed to listen on port %q: %v", params.ListenPort, err) } return &Server{ sOpts: params.ServerOptions, serviceName: params.LoadBalancedServiceName, servicePort: params.LoadBalancedServicePort, shortStream: params.ShortStream, backends: servers, lis: lis, address: lis.Addr().String(), stopped: make(chan struct{}), }, nil } // Serve starts serving the LoadBalancer service on a gRPC server. // // It returns early with a non-nil error if it is unable to start serving. // Otherwise, it blocks until Stop() is called, at which point it returns the // error returned by the underlying grpc.Server's Serve() method. func (s *Server) Serve() error { s.mu.Lock() if s.grpcServer != nil { s.mu.Unlock() return errors.New("Serve() called multiple times") } server := grpc.NewServer(s.sOpts...) s.grpcServer = server s.mu.Unlock() logger.Infof("Begin listening on %s", s.lis.Addr().String()) lbgrpc.RegisterLoadBalancerServer(server, s) return server.Serve(s.lis) // This call will block. } // Stop stops serving the LoadBalancer service and unblocks the preceding call // to Serve(). func (s *Server) Stop() { defer close(s.stopped) s.mu.Lock() if s.grpcServer != nil { s.grpcServer.Stop() s.grpcServer = nil } s.mu.Unlock() } // Address returns the host:port on which the LoadBalancer service is serving. func (s *Server) Address() string { s.mu.Lock() defer s.mu.Unlock() return s.address } // BalanceLoad provides a fake implementation of the LoadBalancer service. func (s *Server) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error { logger.Info("New BalancerLoad stream started") req, err := stream.Recv() if err == io.EOF { logger.Warning("Received EOF when reading from the stream") return nil } if err != nil { logger.Warning("Failed to read LoadBalanceRequest from stream: %v", err) return err } logger.Infof("Received LoadBalancerRequest:\n%s", pretty.ToJSON(req)) // Initial request contains the service being load balanced for. initialReq := req.GetInitialRequest() if initialReq == nil { logger.Info("First message on the stream does not contain an InitialLoadBalanceRequest") return status.Error(codes.Unknown, "First request not an InitialLoadBalanceRequest") } // Basic validation of the service name and port from the incoming request. // // Clients targeting service:port can sometimes include the ":port" suffix in // their requested names; handle this case. serviceName, port, err := net.SplitHostPort(initialReq.Name) if err != nil { // Requested name did not contain a port. So, use the name as is. serviceName = initialReq.Name } else { p, err := strconv.Atoi(port) if err != nil { logger.Info("Failed to parse requested service port %q to integer", port) return status.Error(codes.Unknown, "Bad requested service port number") } if p != s.servicePort { logger.Info("Requested service port number %q does not match expected", port, s.servicePort) return status.Error(codes.Unknown, "Bad requested service port number") } } if serviceName != s.serviceName { logger.Info("Requested service name %q does not match expected %q", serviceName, s.serviceName) return status.Error(codes.NotFound, "Bad requested service name") } // Empty initial response disables stats reporting from the client. Stats // reporting from the client is used to determine backend load and is not // required for the purposes of this fake. initResp := &lbpb.LoadBalanceResponse{ LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{ InitialResponse: &lbpb.InitialLoadBalanceResponse{}, }, } if err := stream.Send(initResp); err != nil { logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err) return err } resp := &lbpb.LoadBalanceResponse{ LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ ServerList: &lbpb.ServerList{Servers: s.backends}, }, } logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp)) if err := stream.Send(resp); err != nil { logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err) return err } if s.shortStream { logger.Info("Ending stream early as the short stream option was set") return nil } for { select { case <-stream.Context().Done(): return nil case <-s.stopped: return nil case <-time.After(10 * time.Second): logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp)) if err := stream.Send(resp); err != nil { logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err) return err } } } }