package grpc_logrus import ( "bytes" "context" "fmt" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" "google.golang.org/grpc" ) var ( // JsonPbMarshaller is the marshaller used for serializing protobuf messages. // If needed, this variable can be reassigned with a different marshaller with the same Marshal() signature. JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{} ) // PayloadUnaryServerInterceptor returns a new unary server interceptors that logs the payloads of requests. // // This *only* works when placed *after* the `grpc_logrus.UnaryServerInterceptor`. However, the logging can be done to a // separate instance of the logger. func PayloadUnaryServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if !decider(ctx, info.FullMethod, info.Server) { return handler(ctx, req) } // Use the provided logrus.Entry for logging but use the fields from context. logEntry := entry.WithFields(ctxlogrus.Extract(ctx).Data) logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field") resp, err := handler(ctx, req) if err == nil { logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.request.content field") } return resp, err } } // PayloadStreamServerInterceptor returns a new server server interceptors that logs the payloads of requests. // // This *only* works when placed *after* the `grpc_logrus.StreamServerInterceptor`. However, the logging can be done to a // separate instance of the logger. func PayloadStreamServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { if !decider(stream.Context(), info.FullMethod, srv) { return handler(srv, stream) } // Use the provided logrus.Entry for logging but use the fields from context. logEntry := entry.WithFields(ctxlogrus.Extract(stream.Context()).Data) newStream := &loggingServerStream{ServerStream: stream, entry: logEntry} return handler(srv, newStream) } } // PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the payloads of requests and responses. func PayloadUnaryClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { if !decider(ctx, method) { return invoker(ctx, method, req, reply, cc, opts...) } logEntry := entry.WithFields(newClientLoggerFields(ctx, method)) logProtoMessageAsJson(logEntry, req, "grpc.request.content", "client request payload logged as grpc.request.content") err := invoker(ctx, method, req, reply, cc, opts...) if err == nil { logProtoMessageAsJson(logEntry, reply, "grpc.response.content", "client response payload logged as grpc.response.content") } return err } } // PayloadStreamClientInterceptor returns a new streaming client interceptor that logs the payloads of requests and responses. func PayloadStreamClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor { return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { if !decider(ctx, method) { return streamer(ctx, desc, cc, method, opts...) } logEntry := entry.WithFields(newClientLoggerFields(ctx, method)) clientStream, err := streamer(ctx, desc, cc, method, opts...) newStream := &loggingClientStream{ClientStream: clientStream, entry: logEntry} return newStream, err } } type loggingClientStream struct { grpc.ClientStream entry *logrus.Entry } func (l *loggingClientStream) SendMsg(m interface{}) error { err := l.ClientStream.SendMsg(m) if err == nil { logProtoMessageAsJson(l.entry, m, "grpc.request.content", "server request payload logged as grpc.request.content field") } return err } func (l *loggingClientStream) RecvMsg(m interface{}) error { err := l.ClientStream.RecvMsg(m) if err == nil { logProtoMessageAsJson(l.entry, m, "grpc.response.content", "server response payload logged as grpc.response.content field") } return err } type loggingServerStream struct { grpc.ServerStream entry *logrus.Entry } func (l *loggingServerStream) SendMsg(m interface{}) error { err := l.ServerStream.SendMsg(m) if err == nil { logProtoMessageAsJson(l.entry, m, "grpc.response.content", "server response payload logged as grpc.response.content field") } return err } func (l *loggingServerStream) RecvMsg(m interface{}) error { err := l.ServerStream.RecvMsg(m) if err == nil { logProtoMessageAsJson(l.entry, m, "grpc.request.content", "server request payload logged as grpc.request.content field") } return err } func logProtoMessageAsJson(entry *logrus.Entry, pbMsg interface{}, key string, msg string) { if p, ok := pbMsg.(proto.Message); ok { entry.WithField(key, &jsonpbMarshalleble{p}).Info(msg) } } type jsonpbMarshalleble struct { proto.Message } func (j *jsonpbMarshalleble) MarshalJSON() ([]byte, error) { b := &bytes.Buffer{} if err := JsonPbMarshaller.Marshal(b, j.Message); err != nil { return nil, fmt.Errorf("jsonpb serializer failed: %v", err) } return b.Bytes(), nil }