diff --git a/CHANGELOG.md b/CHANGELOG.md
index 99726b7e422..3bc6cde8629 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
 - Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
 - Add `"go.opentelemetry.io/contrib/samplers/jaegerremote".WithSamplingStrategyFetcher` which sets custom fetcher implementation. (#4045)
+- Add metric support for `grpc.StatsHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356)
 
 ### Changed
 
diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
index c64a53443bc..51d96093361 100644
--- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
+++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
@@ -23,7 +23,10 @@ import (
 	"google.golang.org/grpc/status"
 
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/metric"
 	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
 	"go.opentelemetry.io/otel/trace"
 )
@@ -33,98 +36,204 @@ type gRPCContextKey struct{}
 type gRPCContext struct {
 	messagesReceived int64
 	messagesSent     int64
+	requestSize      int64                // Length of the latest stats.InPayload seen by handleRPC.
+	responseSize     int64                // Length of the latest stats.OutPayload seen by handleRPC.
+	metricAttrs      []attribute.KeyValue // Attributes captured in TagRPC and reused when metrics are recorded.
 }
 
 // NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
 	h := &serverHandler{
 		config: newConfig(opts),
+		r:      &handler{},
 	}
 
-	h.tracer = h.config.TracerProvider.Tracer(
+	h.r.tracer = h.config.TracerProvider.Tracer(
 		instrumentationName,
 		trace.WithInstrumentationVersion(SemVersion()),
 	)
+	h.r.meter = h.config.MeterProvider.Meter(
+		instrumentationName,
+		metric.WithInstrumentationVersion(Version()),
+		metric.WithSchemaURL(semconv.SchemaURL),
+	)
+	var err error
+	h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.server.duration",
+		metric.WithDescription("Measures the duration of inbound RPC."),
+		metric.WithUnit("ms"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size",
+		metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
+		metric.WithUnit("By"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size",
+		metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
+		metric.WithUnit("By"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc",
+		metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
+		metric.WithUnit("{count}"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc",
+		metric.WithDescription("Measures the number of messages sent per RPC. Should be 1 for all non-streaming RPCs."),
+		metric.WithUnit("{count}"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
 	return h
 }
 
+type handler struct {
+	tracer             trace.Tracer          // starts the per-RPC span in TagRPC
+	meter              metric.Meter          // creates the histograms below
+	rpcDuration        metric.Int64Histogram // rpc.{server,client}.duration, unit "ms"
+	rpcRequestSize     metric.Int64Histogram // rpc.{server,client}.request.size, unit "By"
+	rpcResponseSize    metric.Int64Histogram // rpc.{server,client}.response.size, unit "By"
+	rpcRequestsPerRPC  metric.Int64Histogram // rpc.{server,client}.requests_per_rpc
+	rpcResponsesPerRPC metric.Int64Histogram // rpc.{server,client}.responses_per_rpc
+}
+
 type serverHandler struct {
 	*config
-	tracer trace.Tracer
+	r *handler
 }
 
 // TagRPC can attach some information to the given context.
 func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
+	// Apply the configured propagators to ctx before the server span is started.
 	ctx = extract(ctx, h.config.Propagators)
 
 	name, attrs := internal.ParseFullMethod(info.FullMethodName)
 	attrs = append(attrs, RPCSystemGRPC)
-	ctx, _ = h.tracer.Start(
+	ctx, _ = h.r.tracer.Start(
 		trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
 		name,
 		trace.WithSpanKind(trace.SpanKindServer),
 		trace.WithAttributes(attrs...),
 	)
 
-	gctx := gRPCContext{}
+	gctx := gRPCContext{
+		metricAttrs: attrs,
+	}
 	return context.WithValue(ctx, gRPCContextKey{}, &gctx)
 }
 
 // HandleRPC processes the RPC stats.
 func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
+	// Delegate to the shared handler, which updates the span and records the metrics.
+	h.r.handleRPC(ctx, rs)
-	handleRPC(ctx, rs)
 }
 
 // TagConn can attach some information to the given context.
 func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
+	// Copy the peer attributes derived from ctx onto the span carried by ctx.
 	span := trace.SpanFromContext(ctx)
 	attrs := peerAttr(peerFromCtx(ctx))
 	span.SetAttributes(attrs...)
+
 	return ctx
 }
 
 // HandleConn processes the Conn stats.
 func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
+	// no-op
 }
 
 // NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
 	h := &clientHandler{
 		config: newConfig(opts),
+		r:      &handler{},
 	}
 
-	h.tracer = h.config.TracerProvider.Tracer(
+	h.r.tracer = h.config.TracerProvider.Tracer(
 		instrumentationName,
 		trace.WithInstrumentationVersion(SemVersion()),
 	)
 
+	h.r.meter = h.config.MeterProvider.Meter(
+		instrumentationName,
+		metric.WithInstrumentationVersion(Version()),
+		metric.WithSchemaURL(semconv.SchemaURL),
+	)
+	var err error
+	h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.client.duration",
+		metric.WithDescription("Measures the duration of outbound RPC."),
+		metric.WithUnit("ms"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size",
+		metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
+		metric.WithUnit("By"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size",
+		metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
+		metric.WithUnit("By"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc",
+		metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
+		metric.WithUnit("{count}"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
+	h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc",
+		metric.WithDescription("Measures the number of messages sent per RPC. Should be 1 for all non-streaming RPCs."),
+		metric.WithUnit("{count}"))
+	if err != nil {
+		otel.Handle(err)
+	}
+
 	return h
 }
 
 type clientHandler struct {
 	*config
-	tracer trace.Tracer
+	r *handler
 }
 
 // TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
 	name, attrs := internal.ParseFullMethod(info.FullMethodName)
 	attrs = append(attrs, RPCSystemGRPC)
-	ctx, _ = h.tracer.Start(
+	ctx, _ = h.r.tracer.Start(
 		ctx,
 		name,
 		trace.WithSpanKind(trace.SpanKindClient),
 		trace.WithAttributes(attrs...),
 	)
 
-	gctx := gRPCContext{}
+	gctx := gRPCContext{
+		metricAttrs: attrs,
+	}
 
 	return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
 }
 
 // HandleRPC processes the RPC stats.
 func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
-	handleRPC(ctx, rs)
+	h.r.handleRPC(ctx, rs)
 }
 
 // TagConn can attach some information to the given context.
@@ -140,7 +249,9 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
 	// no-op
 }
 
-func handleRPC(ctx context.Context, rs stats.RPCStats) {
+// handleRPC adds a span event for each payload and, when the RPC ends, sets
+// the span status, ends the span, and records the RPC metrics.
+func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
 	span := trace.SpanFromContext(ctx)
 	gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
 	var messageId int64
@@ -150,6 +261,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
 	case *stats.InPayload:
 		if gctx != nil {
 			messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
+			atomic.StoreInt64(&gctx.requestSize, int64(rs.Length)) // overwrites: only the latest payload size is kept
 		}
 		span.AddEvent("message",
 			trace.WithAttributes(
@@ -162,6 +274,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
 	case *stats.OutPayload:
 		if gctx != nil {
 			messageId = atomic.AddInt64(&gctx.messagesSent, 1)
+			atomic.StoreInt64(&gctx.responseSize, int64(rs.Length)) // overwrites: only the latest payload size is kept
 		}
 
 		span.AddEvent("message",
@@ -173,14 +286,36 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
 				),
 			)
 	case *stats.End:
+		var rpcStatusAttr attribute.KeyValue
+
 		if rs.Error != nil {
 			s, _ := status.FromError(rs.Error)
 			span.SetStatus(codes.Error, s.Message())
-			span.SetAttributes(statusCodeAttr(s.Code()))
+			rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
 		} else {
-			span.SetAttributes(statusCodeAttr(grpc_codes.OK))
+			rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
 		}
+		span.SetAttributes(rpcStatusAttr)
 		span.End()
+
+		// The payload cases tolerate a missing gRPCContext, so do the same here
+		// instead of dereferencing a nil pointer; without it there are no
+		// attributes or counters to report.
+		if gctx != nil {
+			// Copy the attributes so the slice stored by TagRPC is never modified.
+			metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
+			metricAttrs = append(metricAttrs, gctx.metricAttrs...)
+			metricAttrs = append(metricAttrs, rpcStatusAttr)
+			recordOpt := metric.WithAttributes(metricAttrs...)
+
+			// The duration histogram is declared in "ms", so convert from nanoseconds.
+			r.rpcDuration.Record(ctx, rs.EndTime.Sub(rs.BeginTime).Milliseconds(), recordOpt)
+			r.rpcRequestSize.Record(ctx, atomic.LoadInt64(&gctx.requestSize), recordOpt)
+			r.rpcResponseSize.Record(ctx, atomic.LoadInt64(&gctx.responseSize), recordOpt)
+			r.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpt)
+			r.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpt)
+		}
+
 	default:
 		return
 	}
diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go
index b6afd1a118d..31bad49cab5 100644
--- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go
+++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go
@@ -15,6 +15,7 @@
 package test
 
 import (
+	"context"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -24,6 +25,8 @@ import (
 
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/trace"
 	"go.opentelemetry.io/otel/sdk/trace/tracetest"
 	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
@@ -32,25 +35,31 @@ import (
 func TestStatsHandler(t *testing.T) {
 	clientSR := tracetest.NewSpanRecorder()
 	clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR))
+	clientMetricReader := metric.NewManualReader()
+	clientMP := metric.NewMeterProvider(metric.WithReader(clientMetricReader))
 
 	serverSR := tracetest.NewSpanRecorder()
 	serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR))
+	serverMetricReader := metric.NewManualReader()
+	serverMP := metric.NewMeterProvider(metric.WithReader(serverMetricReader))
 
 	assert.NoError(t, doCalls(
 		[]grpc.DialOption{
-			grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))),
+			grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP), otelgrpc.WithMeterProvider(clientMP))),
 		},
 		[]grpc.ServerOption{
-			grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))),
+			grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP), otelgrpc.WithMeterProvider(serverMP))),
 		},
 	))
 
-	t.Run("ClientSpans", func(t *testing.T) {
+	t.Run("Client", func(t *testing.T) {
 		checkClientSpans(t, clientSR.Ended())
+		checkClientRecords(t, clientMetricReader)
 	})
 
-	t.Run("ServerSpans", func(t *testing.T) {
+	t.Run("Server", func(t *testing.T) {
 		checkServerSpans(t, serverSR.Ended())
+		checkServerRecords(t, serverMetricReader)
 	})
 }
 
@@ -579,3 +588,49 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
 		otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
 	}, pingPong.Attributes())
 }
+
+func checkServerRecords(t *testing.T, reader metric.Reader) {
+	rm := metricdata.ResourceMetrics{}
+	err := reader.Collect(context.Background(), &rm)
+	assert.NoError(t, err)
+	require.Len(t, rm.ScopeMetrics, 1)
+	require.Len(t, rm.ScopeMetrics[0].Metrics, 5)
+	for _, m := range rm.ScopeMetrics[0].Metrics {
+		require.IsType(t, metricdata.Histogram[int64]{}, m.Data)
+		data := m.Data.(metricdata.Histogram[int64])
+		for _, dpt := range data.DataPoints {
+			attr := dpt.Attributes.ToSlice()
+			method := getRPCMethod(attr)
+			assert.NotEmpty(t, method)
+			assert.ElementsMatch(t, []attribute.KeyValue{
+				semconv.RPCMethod(method),
+				semconv.RPCService("grpc.testing.TestService"),
+				otelgrpc.RPCSystemGRPC,
+				otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
+			}, attr)
+		}
+	}
+}
+
+func checkClientRecords(t *testing.T, reader metric.Reader) {
+	rm := metricdata.ResourceMetrics{}
+	err := reader.Collect(context.Background(), &rm)
+	assert.NoError(t, err)
+	require.Len(t, rm.ScopeMetrics, 1)
+	require.Len(t, rm.ScopeMetrics[0].Metrics, 5)
+	for _, m := range rm.ScopeMetrics[0].Metrics {
+		require.IsType(t, metricdata.Histogram[int64]{}, m.Data)
+		data := m.Data.(metricdata.Histogram[int64])
+		for _, dpt := range data.DataPoints {
+			attr := dpt.Attributes.ToSlice()
+			method := getRPCMethod(attr)
+			assert.NotEmpty(t, method)
+			assert.ElementsMatch(t, []attribute.KeyValue{
+				semconv.RPCMethod(method),
+				semconv.RPCService("grpc.testing.TestService"),
+				otelgrpc.RPCSystemGRPC,
+				otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
+			}, attr)
+		}
+	}
+}