diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index c64a53443bc..aa062980672 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -23,7 +23,10 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" ) @@ -33,84 +36,186 @@ type gRPCContextKey struct{} type gRPCContext struct { messagesReceived int64 messagesSent int64 + metricAttrs []attribute.KeyValue } // NewServerHandler creates a stats.Handler for gRPC server. func NewServerHandler(opts ...Option) stats.Handler { h := &serverHandler{ config: newConfig(opts), + r: &handler{}, } - h.tracer = h.config.TracerProvider.Tracer( + h.r.tracer = h.config.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(SemVersion()), ) + h.r.meter = h.config.MeterProvider.Meter( + instrumentationName, + metric.WithInstrumentationVersion(Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + var err error + h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.server.duration", + metric.WithDescription("Measures the duration of inbound RPC."), + metric.WithUnit("ms")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size", + metric.WithDescription("Measures size of RPC request messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size", + metric.WithDescription("Measures size of RPC response messages (uncompressed)."), + 
metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc", + metric.WithDescription("Measures the number of messages sent per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + return h } +type handler struct { + tracer trace.Tracer + meter metric.Meter + rpcDuration metric.Int64Histogram + rpcRequestSize metric.Int64Histogram + rpcResponseSize metric.Int64Histogram + rpcRequestsPerRPC metric.Int64Histogram + rpcResponsesPerRPC metric.Int64Histogram +} + type serverHandler struct { *config - tracer trace.Tracer + r *handler } // TagRPC can attach some information to the given context. func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + // fmt.Printf("server TagRPC, ctx.Err %v\n", ctx.Err()) ctx = extract(ctx, h.config.Propagators) name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.tracer.Start( + ctx, _ = h.r.tracer.Start( trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), name, trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attrs...), ) - gctx := gRPCContext{} + gctx := gRPCContext{ + metricAttrs: attrs, + } return context.WithValue(ctx, gRPCContextKey{}, &gctx) } // HandleRPC processes the RPC stats. func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + // fmt.Printf("server HandleRPC, ctx.Err %v\n", ctx.Err()) + h.r.handleRPC(ctx, rs) } // TagConn can attach some information to the given context. 
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + // fmt.Printf("server TagConn, ctx.Err %v\n", ctx.Err()) span := trace.SpanFromContext(ctx) attrs := peerAttr(peerFromCtx(ctx)) span.SetAttributes(attrs...) + return ctx } // HandleConn processes the Conn stats. func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { + // fmt.Printf("server HandleConn, ctx.Err %v\n", ctx.Err()) } // NewClientHandler creates a stats.Handler for gRPC client. func NewClientHandler(opts ...Option) stats.Handler { h := &clientHandler{ config: newConfig(opts), + r: &handler{}, } - h.tracer = h.config.TracerProvider.Tracer( + h.r.tracer = h.config.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(SemVersion()), ) + h.r.meter = h.config.MeterProvider.Meter( + instrumentationName, + metric.WithInstrumentationVersion(Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + var err error + h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.client.duration", + metric.WithDescription("Measures the duration of outbound RPC."), + metric.WithUnit("ms")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size", + metric.WithDescription("Measures size of RPC request messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size", + metric.WithDescription("Measures size of RPC response messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. 
Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc", + metric.WithDescription("Measures the number of messages sent per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + return h } type clientHandler struct { *config - tracer trace.Tracer + r *handler } // TagRPC can attach some information to the given context. func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.tracer.Start( + ctx, _ = h.r.tracer.Start( ctx, name, trace.WithSpanKind(trace.SpanKindClient), @@ -124,7 +229,7 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont // HandleRPC processes the RPC stats. func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - handleRPC(ctx, rs) + h.r.handleRPC(ctx, rs) } // TagConn can attach some information to the given context. @@ -140,16 +245,20 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { // no-op } -func handleRPC(ctx context.Context, rs stats.RPCStats) { +func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { span := trace.SpanFromContext(ctx) gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) var messageId int64 + metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1) + metricAttrs = append(metricAttrs, gctx.metricAttrs...) 
+ switch rs := rs.(type) { case *stats.Begin: case *stats.InPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + r.rpcRequestSize.Record(context.TODO(), int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", trace.WithAttributes( @@ -162,6 +271,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.OutPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesSent, 1) + r.rpcResponseSize.Record(context.TODO(), int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -173,14 +283,40 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) { ), ) case *stats.End: + var rpcStatusAttr attribute.KeyValue if rs.Error != nil { s, _ := status.FromError(rs.Error) - span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(statusCodeAttr(s.Code())) + rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code())) } else { - span.SetAttributes(statusCodeAttr(grpc_codes.OK)) + rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK)) + } + metricAttrs = append(metricAttrs, rpcStatusAttr) + + span.SetAttributes(rpcStatusAttr) + if rs.Error != nil { + s, _ := status.FromError(rs.Error) + span.SetStatus(codes.Error, s.Message()) } span.End() + + r.rpcDuration.Record( + context.TODO(), + rs.EndTime.Sub(rs.BeginTime).Milliseconds(), + metric.WithAttributes(metricAttrs...), + ) + + r.rpcRequestsPerRPC.Record( + context.TODO(), + int64(gctx.messagesReceived), + metric.WithAttributes(metricAttrs...), + ) + + r.rpcResponsesPerRPC.Record( + context.TODO(), + int64(gctx.messagesSent), + metric.WithAttributes(metricAttrs...), + ) + default: return } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index b6afd1a118d..591949192e2 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ 
b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -15,6 +15,8 @@ package test import ( + "context" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -24,24 +26,30 @@ import ( "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" ) -func TestStatsHandler(t *testing.T) { +func TestStatsHandlerMix(t *testing.T) { clientSR := tracetest.NewSpanRecorder() clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR)) + clientMetricReader := metric.NewManualReader() + clientMP := metric.NewMeterProvider(metric.WithReader(clientMetricReader)) serverSR := tracetest.NewSpanRecorder() serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR)) + serverMetricReader := metric.NewManualReader() + serverMP := metric.NewMeterProvider(metric.WithReader(serverMetricReader)) assert.NoError(t, doCalls( []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))), + grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP), otelgrpc.WithMeterProvider(clientMP))), }, []grpc.ServerOption{ - grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))), + grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP), otelgrpc.WithMeterProvider(serverMP))), }, )) @@ -51,6 +59,7 @@ func TestStatsHandler(t *testing.T) { t.Run("ServerSpans", func(t *testing.T) { checkServerSpans(t, serverSR.Ended()) + checkServerRecords(t, serverMetricReader) }) } @@ -579,3 +588,40 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), }, pingPong.Attributes()) } + +func 
checkServerRecords(t *testing.T, reader metric.Reader) { + rm := metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), &rm) + assert.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 5) + for _, metric := range rm.ScopeMetrics[0].Metrics { + require.IsType(t, metric.Data, metricdata.Histogram[int64]{}) + data := metric.Data.(metricdata.Histogram[int64]) + expectedKeys := make([]string, 0) + switch { + case strings.HasSuffix(metric.Name, "duration"), strings.HasSuffix(metric.Name, "requests_per_rpc"), strings.HasSuffix(metric.Name, "responses_per_rpc"): + expectedKeys = []string{ + string(semconv.RPCMethodKey), + string(semconv.RPCServiceKey), + string(semconv.RPCSystemKey), + string(otelgrpc.GRPCStatusCodeKey), + } + case strings.HasSuffix(metric.Name, "request.size"), strings.HasSuffix(metric.Name, "response.size"): + expectedKeys = []string{ + string(semconv.RPCMethodKey), + string(semconv.RPCServiceKey), + string(semconv.RPCSystemKey), + } + } + + for _, dpt := range data.DataPoints { + attrs := dpt.Attributes.ToSlice() + keys := make([]string, 0, len(attrs)) + for _, attr := range attrs { + keys = append(keys, string(attr.Key)) + } + assert.ElementsMatch(t, expectedKeys, keys) + } + } +}