package addtransport import ( "context" "encoding/json" "fmt" "net/url" "strings" "time" "golang.org/x/time/rate" "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" "github.com/go-kit/kit/examples/addsvc/pkg/addservice" "github.com/go-kit/kit/log" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" "github.com/go-kit/kit/transport/http/jsonrpc" stdopentracing "github.com/opentracing/opentracing-go" "github.com/sony/gobreaker" ) // NewJSONRPCHandler returns a JSON RPC Server/Handler that can be passed to http.Handle() func NewJSONRPCHandler(endpoints addendpoint.Set, logger log.Logger) *jsonrpc.Server { handler := jsonrpc.NewServer( makeEndpointCodecMap(endpoints), jsonrpc.ServerErrorLogger(logger), ) return handler } // NewJSONRPCClient returns an addservice backed by a JSON RPC over HTTP server // living at the remote instance. We expect instance to come from a service // discovery system, so likely of the form "host:port". We bake-in certain // middlewares, implementing the client library pattern. func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { // Quickly sanitize the instance string. if !strings.HasPrefix(instance, "http") { instance = "http://" + instance } u, err := url.Parse(instance) if err != nil { return nil, err } // We construct a single ratelimiter middleware, to limit the total outgoing // QPS from this client to all methods on the remote instance. We also // construct per-endpoint circuitbreaker middlewares to demonstrate how // that's done, although they could easily be combined into a single breaker // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) var sumEndpoint endpoint.Endpoint { sumEndpoint = jsonrpc.NewClient( u, "sum", jsonrpc.ClientRequestEncoder(encodeSumRequest), jsonrpc.ClientResponseDecoder(decodeSumResponse), ).Endpoint() sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", Timeout: 30 * time.Second, }))(sumEndpoint) } var concatEndpoint endpoint.Endpoint { concatEndpoint = jsonrpc.NewClient( u, "concat", jsonrpc.ClientRequestEncoder(encodeConcatRequest), jsonrpc.ClientResponseDecoder(decodeConcatResponse), ).Endpoint() concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", Timeout: 30 * time.Second, }))(concatEndpoint) } // Returning the endpoint.Set as a service.Service relies on the // endpoint.Set implementing the Service methods. That's just a simple bit // of glue code. return addendpoint.Set{ SumEndpoint: sumEndpoint, ConcatEndpoint: concatEndpoint, }, nil } // makeEndpointCodecMap returns a codec map configured for the addsvc. func makeEndpointCodecMap(endpoints addendpoint.Set) jsonrpc.EndpointCodecMap { return jsonrpc.EndpointCodecMap{ "sum": jsonrpc.EndpointCodec{ Endpoint: endpoints.SumEndpoint, Decode: decodeSumRequest, Encode: encodeSumResponse, }, "concat": jsonrpc.EndpointCodec{ Endpoint: endpoints.ConcatEndpoint, Decode: decodeConcatRequest, Encode: encodeConcatResponse, }, } } func decodeSumRequest(_ context.Context, msg json.RawMessage) (interface{}, error) { var req addendpoint.SumRequest err := json.Unmarshal(msg, &req) if err != nil { return nil, &jsonrpc.Error{ Code: -32000, Message: fmt.Sprintf("couldn't unmarshal body to sum request: %s", err), } } return req, nil } func encodeSumResponse(_ context.Context, obj interface{}) (json.RawMessage, error) { res, ok := obj.(addendpoint.SumResponse) if !ok { return nil, &jsonrpc.Error{ Code: -32000, Message: fmt.Sprintf("Asserting result to *SumResponse failed. Got %T, %+v", obj, obj), } } b, err := json.Marshal(res) if err != nil { return nil, fmt.Errorf("couldn't marshal response: %s", err) } return b, nil } func decodeSumResponse(_ context.Context, res jsonrpc.Response) (interface{}, error) { if res.Error != nil { return nil, *res.Error } var sumres addendpoint.SumResponse err := json.Unmarshal(res.Result, &sumres) if err != nil { return nil, fmt.Errorf("couldn't unmarshal body to SumResponse: %s", err) } return sumres, nil } func encodeSumRequest(_ context.Context, obj interface{}) (json.RawMessage, error) { req, ok := obj.(addendpoint.SumRequest) if !ok { return nil, fmt.Errorf("couldn't assert request as SumRequest, got %T", obj) } b, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("couldn't marshal request: %s", err) } return b, nil } func decodeConcatRequest(_ context.Context, msg json.RawMessage) (interface{}, error) { var req addendpoint.ConcatRequest err := json.Unmarshal(msg, &req) if err != nil { return nil, &jsonrpc.Error{ Code: -32000, Message: fmt.Sprintf("couldn't unmarshal body to concat request: %s", err), } } return req, nil } func encodeConcatResponse(_ context.Context, obj interface{}) (json.RawMessage, error) { res, ok := obj.(addendpoint.ConcatResponse) if !ok { return nil, &jsonrpc.Error{ Code: -32000, Message: fmt.Sprintf("Asserting result to *ConcatResponse failed. Got %T, %+v", obj, obj), } } b, err := json.Marshal(res) if err != nil { return nil, fmt.Errorf("couldn't marshal response: %s", err) } return b, nil } func decodeConcatResponse(_ context.Context, res jsonrpc.Response) (interface{}, error) { if res.Error != nil { return nil, *res.Error } var concatres addendpoint.ConcatResponse err := json.Unmarshal(res.Result, &concatres) if err != nil { return nil, fmt.Errorf("couldn't unmarshal body to ConcatResponse: %s", err) } return concatres, nil } func encodeConcatRequest(_ context.Context, obj interface{}) (json.RawMessage, error) { req, ok := obj.(addendpoint.ConcatRequest) if !ok { return nil, fmt.Errorf("couldn't assert request as ConcatRequest, got %T", obj) } b, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("couldn't marshal request: %s", err) } return b, nil }