package addtransport import ( "context" "errors" "time" "google.golang.org/grpc" stdopentracing "github.com/opentracing/opentracing-go" stdzipkin "github.com/openzipkin/zipkin-go" "github.com/sony/gobreaker" "golang.org/x/time/rate" "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" "github.com/go-kit/kit/tracing/zipkin" "github.com/go-kit/kit/transport" grpctransport "github.com/go-kit/kit/transport/grpc" "github.com/go-kit/kit/examples/addsvc/pb" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" "github.com/go-kit/kit/examples/addsvc/pkg/addservice" ) type grpcServer struct { sum grpctransport.Handler concat grpctransport.Handler } // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer { // Zipkin GRPC Server Trace can either be instantiated per gRPC method with a // provided operation name or a global tracing service can be instantiated // without an operation name and fed to each Go kit gRPC server as a // ServerOption. // In the latter case, the operation name will be the endpoint's grpc method // path if used in combination with the Go kit gRPC Interceptor. // // In this example, we demonstrate a global Zipkin tracing service with // Go kit gRPC Interceptor. zipkinServer := zipkin.GRPCServerTrace(zipkinTracer) options := []grpctransport.ServerOption{ grpctransport.ServerErrorHandler(transport.NewLogErrorHandler(logger)), zipkinServer, } return &grpcServer{ sum: grpctransport.NewServer( endpoints.SumEndpoint, decodeGRPCSumRequest, encodeGRPCSumResponse, append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Sum", logger)))..., ), concat: grpctransport.NewServer( endpoints.ConcatEndpoint, decodeGRPCConcatRequest, encodeGRPCConcatResponse, append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Concat", logger)))..., ), } } func (s *grpcServer) Sum(ctx context.Context, req *pb.SumRequest) (*pb.SumReply, error) { _, rep, err := s.sum.ServeGRPC(ctx, req) if err != nil { return nil, err } return rep.(*pb.SumReply), nil } func (s *grpcServer) Concat(ctx context.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) { _, rep, err := s.concat.ServeGRPC(ctx, req) if err != nil { return nil, err } return rep.(*pb.ConcatReply), nil } // NewGRPCClient returns an AddService backed by a gRPC server at the other end // of the conn. The caller is responsible for constructing the conn, and // eventually closing the underlying transport. We bake-in certain middlewares, // implementing the client library pattern. func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) addservice.Service { // We construct a single ratelimiter middleware, to limit the total outgoing // QPS from this client to all methods on the remote instance. We also // construct per-endpoint circuitbreaker middlewares to demonstrate how // that's done, although they could easily be combined into a single breaker // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) // Zipkin GRPC Client Trace can either be instantiated per gRPC method with a // provided operation name or a global tracing client can be instantiated // without an operation name and fed to each Go kit client as ClientOption. // In the latter case, the operation name will be the endpoint's grpc method // path. // // In this example, we demonstrace a global tracing client. zipkinClient := zipkin.GRPCClientTrace(zipkinTracer) // global client middlewares options := []grpctransport.ClientOption{ zipkinClient, } // Each individual endpoint is an grpc/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you // made your own client library, you'd do this work there, so your server // could rely on a consistent set of client behavior. var sumEndpoint endpoint.Endpoint { sumEndpoint = grpctransport.NewClient( conn, "pb.Add", "Sum", encodeGRPCSumRequest, decodeGRPCSumResponse, pb.SumReply{}, append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", Timeout: 30 * time.Second, }))(sumEndpoint) } // The Concat endpoint is the same thing, with slightly different // middlewares to demonstrate how to specialize per-endpoint. var concatEndpoint endpoint.Endpoint { concatEndpoint = grpctransport.NewClient( conn, "pb.Add", "Concat", encodeGRPCConcatRequest, decodeGRPCConcatResponse, pb.ConcatReply{}, append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", Timeout: 10 * time.Second, }))(concatEndpoint) } // Returning the endpoint.Set as a service.Service relies on the // endpoint.Set implementing the Service methods. That's just a simple bit // of glue code. return addendpoint.Set{ SumEndpoint: sumEndpoint, ConcatEndpoint: concatEndpoint, } } // decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a // gRPC sum request to a user-domain sum request. Primarily useful in a server. func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { req := grpcReq.(*pb.SumRequest) return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil } // decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a // gRPC concat request to a user-domain concat request. Primarily useful in a // server. func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { req := grpcReq.(*pb.ConcatRequest) return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil } // decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a // gRPC sum reply to a user-domain sum response. Primarily useful in a client. func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { reply := grpcReply.(*pb.SumReply) return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil } // decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts // a gRPC concat reply to a user-domain concat response. Primarily useful in a // client. func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { reply := grpcReply.(*pb.ConcatReply) return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil } // encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a // user-domain sum response to a gRPC sum reply. Primarily useful in a server. func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) { resp := response.(addendpoint.SumResponse) return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil } // encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts // a user-domain concat response to a gRPC concat reply. Primarily useful in a // server. func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) { resp := response.(addendpoint.ConcatResponse) return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil } // encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a // user-domain sum request to a gRPC sum request. Primarily useful in a client. func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) { req := request.(addendpoint.SumRequest) return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil } // encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a // user-domain concat request to a gRPC concat request. Primarily useful in a // client. func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) { req := request.(addendpoint.ConcatRequest) return &pb.ConcatRequest{A: req.A, B: req.B}, nil } // These annoying helper functions are required to translate Go error types to // and from strings, which is the type we use in our IDLs to represent errors. // There is special casing to treat empty strings as nil errors. func str2err(s string) error { if s == "" { return nil } return errors.New(s) } func err2str(err error) string { if err == nil { return "" } return err.Error() }