/* * Copyright 2022 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Package orca implements Open Request Cost Aggregation, which is an open // standard for request cost aggregation and reporting by backends and the // corresponding aggregation of such reports by L7 load balancers (such as // Envoy) on the data plane. In a proxyless world with gRPC enabled // applications, aggregation of such reports will be done by the gRPC client. // // # Experimental // // Notice: All APIs is this package are EXPERIMENTAL and may be changed or // removed in a later release. package orca import ( "context" "errors" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" ) var ( logger = grpclog.Component("orca-backend-metrics") joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) ) const trailerMetadataKey = "endpoint-load-metrics-bin" // CallMetricsServerOption returns a server option which enables the reporting // of per-RPC custom backend metrics for unary and streaming RPCs. // // Server applications interested in injecting custom backend metrics should // pass the server option returned from this function as the first argument to // grpc.NewServer(). // // Subsequently, server RPC handlers can retrieve a reference to the RPC // specific custom metrics recorder [CallMetricRecorder] to be used, via a call // to CallMetricRecorderFromContext(), and inject custom metrics at any time // during the RPC lifecycle. // // The injected custom metrics will be sent as part of trailer metadata, as a // binary-encoded [ORCA LoadReport] protobuf message, with the metadata key // being set be "endpoint-load-metrics-bin". // // [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15 func CallMetricsServerOption() grpc.ServerOption { return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt), grpc.ChainStreamInterceptor(streamInt)) } func unaryInt(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // We don't allocate the metric recorder here. It will be allocated the // first time the user calls CallMetricRecorderFromContext(). rw := &recorderWrapper{} ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw) resp, err := handler(ctxWithRecorder, req) // It is safe to access the underlying metric recorder inside the wrapper at // this point, as the user's RPC handler is done executing, and therefore // there will be no more calls to CallMetricRecorderFromContext(), which is // where the metric recorder is lazy allocated. if rw.r == nil { return resp, err } setTrailerMetadata(ctx, rw.r) return resp, err } func streamInt(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { // We don't allocate the metric recorder here. It will be allocated the // first time the user calls CallMetricRecorderFromContext(). rw := &recorderWrapper{} ws := &wrappedStream{ ServerStream: ss, ctx: newContextWithRecorderWrapper(ss.Context(), rw), } err := handler(srv, ws) // It is safe to access the underlying metric recorder inside the wrapper at // this point, as the user's RPC handler is done executing, and therefore // there will be no more calls to CallMetricRecorderFromContext(), which is // where the metric recorder is lazy allocated. if rw.r == nil { return err } setTrailerMetadata(ss.Context(), rw.r) return err } // setTrailerMetadata adds a trailer metadata entry with key being set to // `trailerMetadataKey` and value being set to the binary-encoded // orca.OrcaLoadReport protobuf message. // // This function is called from the unary and streaming interceptors defined // above. Any errors encountered here are not propagated to the caller because // they are ignored there. Hence we simply log any errors encountered here at // warning level, and return nothing. func setTrailerMetadata(ctx context.Context, r *CallMetricRecorder) { b, err := proto.Marshal(r.toLoadReportProto()) if err != nil { logger.Warningf("failed to marshal load report: %v", err) return } if err := grpc.SetTrailer(ctx, metadata.Pairs(trailerMetadataKey, string(b))); err != nil { logger.Warningf("failed to set trailer metadata: %v", err) } } // wrappedStream wraps the grpc.ServerStream received by the streaming // interceptor. Overrides only the Context() method to return a context which // contains a reference to the CallMetricRecorder corresponding to this stream. type wrappedStream struct { grpc.ServerStream ctx context.Context } func (w *wrappedStream) Context() context.Context { return w.ctx } // ErrLoadReportMissing indicates no ORCA load report was found in trailers. var ErrLoadReportMissing = errors.New("orca load report missing in provided metadata") // ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message // from md and returns the corresponding struct. The load report is expected to // be stored as the value for key "endpoint-load-metrics-bin". // // If no load report was found in the provided metadata, ErrLoadReportMissing is // returned. // // [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15) func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) { vs := md.Get(trailerMetadataKey) if len(vs) == 0 { return nil, ErrLoadReportMissing } ret := new(v3orcapb.OrcaLoadReport) if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil { return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err) } return ret, nil } // loadParser implements the Parser interface defined in `internal/balancerload` // package. This interface is used by the client stream to parse load reports // sent by the server in trailer metadata. The parsed loads are then sent to // balancers via balancer.DoneInfo. // // The grpc package cannot directly call orca.ToLoadReport() as that would cause // an import cycle. Hence this roundabout method is used. type loadParser struct{} func (loadParser) Parse(md metadata.MD) interface{} { lr, err := ToLoadReport(md) if err != nil { logger.Errorf("Parse(%v) failed: %v", err) } return lr } func init() { balancerload.SetParser(loadParser{}) }