From 17d82e68f4d7a2fa52f771c2660851b4a2edb0bb Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Sun, 1 Oct 2023 21:24:10 +0800 Subject: [PATCH 1/9] otelgrpc stats.Handler support metric Signed-off-by: Ziqi Zhao --- CHANGELOG.md | 1 + .../grpc/otelgrpc/stats_handler.go | 150 ++++++++++++++++-- .../otelgrpc/test/grpc_stats_handler_test.go | 63 +++++++- 3 files changed, 197 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cff94fe55d7..97ffc60c92d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `"go.opentelemetry.io/contrib/config"` package that includes configuration models generated via go-jsonschema. (#4376) - Add `NewSDK` function to `"go.opentelemetry.io/contrib/config"`. The initial implementation only returns noop providers. (#4414) - Add metrics support (No-op, OTLP and Prometheus) to `go.opentelemetry.io/contrib/exporters/autoexport`. (#4229, #4479) +- Add metric support for `grpc.StatsHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4356) ### Changed diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index c64a53443bc..51d96093361 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -23,7 +23,10 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "go.opentelemetry.io/otel/trace" ) @@ -33,98 +36,204 @@ type gRPCContextKey struct{} type gRPCContext struct { messagesReceived int64 messagesSent int64 + requestSize int64 + responseSize int64 + metricAttrs []attribute.KeyValue } // NewServerHandler creates a stats.Handler for gRPC server. func NewServerHandler(opts ...Option) stats.Handler { h := &serverHandler{ config: newConfig(opts), + r: &handler{}, } - h.tracer = h.config.TracerProvider.Tracer( + h.r.tracer = h.config.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(SemVersion()), ) + h.r.meter = h.config.MeterProvider.Meter( + instrumentationName, + metric.WithInstrumentationVersion(Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + var err error + h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.server.duration", + metric.WithDescription("Measures the duration of inbound RPC."), + metric.WithUnit("ms")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size", + metric.WithDescription("Measures size of RPC request messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size", + metric.WithDescription("Measures size of RPC response messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. 
Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + return h } +type handler struct { + tracer trace.Tracer + meter metric.Meter + rpcDuration metric.Int64Histogram + rpcRequestSize metric.Int64Histogram + rpcResponseSize metric.Int64Histogram + rpcRequestsPerRPC metric.Int64Histogram + rpcResponsesPerRPC metric.Int64Histogram +} + type serverHandler struct { *config - tracer trace.Tracer + r *handler } // TagRPC can attach some information to the given context. func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + // fmt.Printf("server TagRPC, ctx.Err %v\n", ctx.Err()) ctx = extract(ctx, h.config.Propagators) name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.tracer.Start( + ctx, _ = h.r.tracer.Start( trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), name, trace.WithSpanKind(trace.SpanKindServer), trace.WithAttributes(attrs...), ) - gctx := gRPCContext{} + gctx := gRPCContext{ + metricAttrs: attrs, + } return context.WithValue(ctx, gRPCContextKey{}, &gctx) } // HandleRPC processes the RPC stats. func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - handleRPC(ctx, rs) + // fmt.Printf("server HandleRPC, ctx.Err %v\n", ctx.Err()) + h.r.handleRPC(ctx, rs) } // TagConn can attach some information to the given context. func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + // fmt.Printf("server TagConn, ctx.Err %v\n", ctx.Err()) span := trace.SpanFromContext(ctx) attrs := peerAttr(peerFromCtx(ctx)) span.SetAttributes(attrs...) + return ctx } // HandleConn processes the Conn stats. func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { + // fmt.Printf("server HandleConn, ctx.Err %v\n", ctx.Err()) } // NewClientHandler creates a stats.Handler for gRPC client. func NewClientHandler(opts ...Option) stats.Handler { h := &clientHandler{ config: newConfig(opts), + r: &handler{}, } - h.tracer = h.config.TracerProvider.Tracer( + h.r.tracer = h.config.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(SemVersion()), ) + h.r.meter = h.config.MeterProvider.Meter( + instrumentationName, + metric.WithInstrumentationVersion(Version()), + metric.WithSchemaURL(semconv.SchemaURL), + ) + var err error + h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.client.duration", + metric.WithDescription("Measures the duration of inbound RPC."), + metric.WithUnit("ms")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size", + metric.WithDescription("Measures size of RPC request messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size", + metric.WithDescription("Measures size of RPC response messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. 
Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + return h } type clientHandler struct { *config - tracer trace.Tracer + r *handler } // TagRPC can attach some information to the given context. func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.tracer.Start( + ctx, _ = h.r.tracer.Start( ctx, name, trace.WithSpanKind(trace.SpanKindClient), trace.WithAttributes(attrs...), ) - gctx := gRPCContext{} + gctx := gRPCContext{ + metricAttrs: attrs, + } return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators) } // HandleRPC processes the RPC stats. func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - handleRPC(ctx, rs) + h.r.handleRPC(ctx, rs) } // TagConn can attach some information to the given context. @@ -140,7 +249,7 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { // no-op } -func handleRPC(ctx context.Context, rs stats.RPCStats) { +func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { span := trace.SpanFromContext(ctx) gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) var messageId int64 @@ -150,6 +259,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.InPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + atomic.StoreInt64(&gctx.requestSize, int64(rs.Length)) } span.AddEvent("message", trace.WithAttributes( @@ -162,6 +272,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.OutPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesSent, 1) + atomic.StoreInt64(&gctx.responseSize, int64(rs.Length)) } span.AddEvent("message", @@ -173,14 +284,27 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) { ), ) case *stats.End: + var rpcStatusAttr attribute.KeyValue + metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1) + metricAttrs = append(metricAttrs, gctx.metricAttrs...) 
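+		// NOTE(review): the make/append above dereference gctx without the
+		// nil guard that the InPayload/OutPayload cases use, so an End event
+		// for a context that never passed through TagRPC would panic; a
+		// hypothetical `if gctx == nil` early return would harden this.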
+ if rs.Error != nil { s, _ := status.FromError(rs.Error) span.SetStatus(codes.Error, s.Message()) - span.SetAttributes(statusCodeAttr(s.Code())) + rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code())) } else { - span.SetAttributes(statusCodeAttr(grpc_codes.OK)) + rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK)) } + span.SetAttributes(rpcStatusAttr) span.End() + + metricAttrs = append(metricAttrs, rpcStatusAttr) + r.rpcDuration.Record(context.TODO(), int64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) + r.rpcRequestSize.Record(context.TODO(), gctx.requestSize, metric.WithAttributes(metricAttrs...)) + r.rpcResponseSize.Record(context.TODO(), gctx.responseSize, metric.WithAttributes(metricAttrs...)) + r.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) + r.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...)) + default: return } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index b6afd1a118d..31bad49cab5 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -15,6 +15,7 @@ package test import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -24,6 +25,8 @@ import ( "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" @@ -32,25 +35,31 @@ import ( func TestStatsHandler(t *testing.T) { clientSR := tracetest.NewSpanRecorder() clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR)) + clientMetricReader := metric.NewManualReader() + clientMP := metric.NewMeterProvider(metric.WithReader(clientMetricReader)) serverSR := tracetest.NewSpanRecorder() serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR)) + serverMetricReader := metric.NewManualReader() + serverMP := metric.NewMeterProvider(metric.WithReader(serverMetricReader)) assert.NoError(t, doCalls( []grpc.DialOption{ - grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))), + grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP), otelgrpc.WithMeterProvider(clientMP))), }, []grpc.ServerOption{ - grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))), + grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP), otelgrpc.WithMeterProvider(serverMP))), }, )) - t.Run("ClientSpans", func(t *testing.T) { + t.Run("Client", func(t *testing.T) { checkClientSpans(t, clientSR.Ended()) + checkClientRecords(t, clientMetricReader) }) - t.Run("ServerSpans", func(t *testing.T) { + t.Run("Server", func(t *testing.T) { checkServerSpans(t, serverSR.Ended()) + checkServerRecords(t, serverMetricReader) }) } @@ -579,3 +588,49 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), }, pingPong.Attributes()) } + +func checkServerRecords(t *testing.T, reader metric.Reader) { + rm := metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), &rm) + 
assert.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 5) + for _, m := range rm.ScopeMetrics[0].Metrics { + require.IsType(t, m.Data, metricdata.Histogram[int64]{}) + data := m.Data.(metricdata.Histogram[int64]) + for _, dpt := range data.DataPoints { + attr := dpt.Attributes.ToSlice() + method := getRPCMethod(attr) + assert.NotEmpty(t, method) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethod(method), + semconv.RPCService("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, attr) + } + } +} + +func checkClientRecords(t *testing.T, reader metric.Reader) { + rm := metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), &rm) + assert.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 5) + for _, m := range rm.ScopeMetrics[0].Metrics { + require.IsType(t, m.Data, metricdata.Histogram[int64]{}) + data := m.Data.(metricdata.Histogram[int64]) + for _, dpt := range data.DataPoints { + attr := dpt.Attributes.ToSlice() + method := getRPCMethod(attr) + assert.NotEmpty(t, method) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethod(method), + semconv.RPCService("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, attr) + } + } +} From 80c29273fe9bccb854215ca738fd873de0246238 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 31 Oct 2023 09:14:46 +0800 Subject: [PATCH 2/9] refract the unit test & change duration to float64 Signed-off-by: Ziqi Zhao --- .../grpc/otelgrpc/stats_handler.go | 78 +- .../otelgrpc/test/grpc_stats_handler_test.go | 762 +++++++++++++++++- 2 files changed, 766 insertions(+), 74 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index 51d96093361..bcc84e965b8 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -36,11 +36,24 @@ type gRPCContextKey struct{} type gRPCContext struct { messagesReceived int64 messagesSent int64 - requestSize int64 - responseSize int64 metricAttrs []attribute.KeyValue } +type handler struct { + tracer trace.Tracer + meter metric.Meter + rpcDuration metric.Float64Histogram + rpcRequestSize metric.Int64Histogram + rpcResponseSize metric.Int64Histogram + rpcRequestsPerRPC metric.Int64Histogram + rpcResponsesPerRPC metric.Int64Histogram +} + +type serverHandler struct { + *config + r *handler +} + // NewServerHandler creates a stats.Handler for gRPC server. 
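//
// A minimal wiring sketch (an illustrative example, not part of this
// patch; it assumes the global providers are used unless the
// WithTracerProvider/WithMeterProvider options are supplied):
//
//	s := grpc.NewServer(
//		grpc.StatsHandler(otelgrpc.NewServerHandler()),
//	)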
func NewServerHandler(opts ...Option) stats.Handler { h := &serverHandler{ @@ -58,7 +71,7 @@ func NewServerHandler(opts ...Option) stats.Handler { metric.WithSchemaURL(semconv.SchemaURL), ) var err error - h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.server.duration", + h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.server.duration", metric.WithDescription("Measures the duration of inbound RPC."), metric.WithUnit("ms")) if err != nil { @@ -96,24 +109,21 @@ func NewServerHandler(opts ...Option) stats.Handler { return h } -type handler struct { - tracer trace.Tracer - meter metric.Meter - rpcDuration metric.Int64Histogram - rpcRequestSize metric.Int64Histogram - rpcResponseSize metric.Int64Histogram - rpcRequestsPerRPC metric.Int64Histogram - rpcResponsesPerRPC metric.Int64Histogram +// TagConn can attach some information to the given context. +func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + span := trace.SpanFromContext(ctx) + attrs := peerAttr(peerFromCtx(ctx)) + span.SetAttributes(attrs...) + + return ctx } -type serverHandler struct { - *config - r *handler +// HandleConn processes the Conn stats. +func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { } // TagRPC can attach some information to the given context. func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { - // fmt.Printf("server TagRPC, ctx.Err %v\n", ctx.Err()) ctx = extract(ctx, h.config.Propagators) name, attrs := internal.ParseFullMethod(info.FullMethodName) @@ -133,23 +143,12 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont // HandleRPC processes the RPC stats. func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - // fmt.Printf("server HandleRPC, ctx.Err %v\n", ctx.Err()) h.r.handleRPC(ctx, rs) } -// TagConn can attach some information to the given context. -func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { - // fmt.Printf("server TagConn, ctx.Err %v\n", ctx.Err()) - span := trace.SpanFromContext(ctx) - attrs := peerAttr(peerFromCtx(ctx)) - span.SetAttributes(attrs...) - - return ctx -} - -// HandleConn processes the Conn stats. -func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { - // fmt.Printf("server HandleConn, ctx.Err %v\n", ctx.Err()) +type clientHandler struct { + *config + r *handler } // NewClientHandler creates a stats.Handler for gRPC client. @@ -170,7 +169,7 @@ func NewClientHandler(opts ...Option) stats.Handler { metric.WithSchemaURL(semconv.SchemaURL), ) var err error - h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.client.duration", + h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.client.duration", metric.WithDescription("Measures the duration of inbound RPC."), metric.WithUnit("ms")) if err != nil { @@ -208,11 +207,6 @@ func NewClientHandler(opts ...Option) stats.Handler { return h } -type clientHandler struct { - *config - r *handler -} - // TagRPC can attach some information to the given context. 
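// For the client handler, this starts the RPC span and injects its context
// into the outgoing metadata through the configured propagators, so the
// server side can extract it as the remote parent.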
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { name, attrs := internal.ParseFullMethod(info.FullMethodName) @@ -253,14 +247,17 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { span := trace.SpanFromContext(ctx) gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) var messageId int64 + metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1) + metricAttrs = append(metricAttrs, gctx.metricAttrs...) switch rs := rs.(type) { case *stats.Begin: case *stats.InPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesReceived, 1) - atomic.StoreInt64(&gctx.requestSize, int64(rs.Length)) + r.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } + span.AddEvent("message", trace.WithAttributes( semconv.MessageTypeReceived, @@ -272,7 +269,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.OutPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesSent, 1) - atomic.StoreInt64(&gctx.responseSize, int64(rs.Length)) + r.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -283,10 +280,9 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { semconv.MessageUncompressedSizeKey.Int(rs.Length), ), ) + case *stats.OutTrailer: case *stats.End: var rpcStatusAttr attribute.KeyValue - metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1) - metricAttrs = append(metricAttrs, gctx.metricAttrs...) if rs.Error != nil { s, _ := status.FromError(rs.Error) @@ -299,9 +295,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { span.End() metricAttrs = append(metricAttrs, rpcStatusAttr) - r.rpcDuration.Record(context.TODO(), int64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) - r.rpcRequestSize.Record(context.TODO(), gctx.requestSize, metric.WithAttributes(metricAttrs...)) - r.rpcResponseSize.Record(context.TODO(), gctx.responseSize, metric.WithAttributes(metricAttrs...)) + r.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) r.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) r.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...)) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 31bad49cab5..9e289fa0632 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -27,6 +27,8 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" semconv "go.opentelemetry.io/otel/semconv/v1.17.0" @@ -52,13 +54,19 @@ func TestStatsHandler(t *testing.T) { }, )) - t.Run("Client", func(t *testing.T) { + t.Run("ClientSpans", func(t *testing.T) { checkClientSpans(t, clientSR.Ended()) + }) + + t.Run("ClientMetrics", func(t *testing.T) { checkClientRecords(t, clientMetricReader) }) - t.Run("Server", func(t *testing.T) { + t.Run("ServerSpans", func(t *testing.T) { checkServerSpans(t, serverSR.Ended()) + }) 
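+	// The metric subtests below read from the ManualReaders wired into the
+	// handlers above; a ManualReader only gathers data on an explicit
+	// Collect call, which each check performs itself.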
+ + t.Run("ServerMetrics", func(t *testing.T) { checkServerRecords(t, serverMetricReader) }) } @@ -589,48 +597,738 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { }, pingPong.Attributes()) } -func checkServerRecords(t *testing.T, reader metric.Reader) { +func checkClientRecords(t *testing.T, reader metric.Reader) { rm := metricdata.ResourceMetrics{} err := reader.Collect(context.Background(), &rm) assert.NoError(t, err) require.Len(t, rm.ScopeMetrics, 1) require.Len(t, rm.ScopeMetrics[0].Metrics, 5) - for _, m := range rm.ScopeMetrics[0].Metrics { - require.IsType(t, m.Data, metricdata.Histogram[int64]{}) - data := m.Data.(metricdata.Histogram[int64]) - for _, dpt := range data.DataPoints { - attr := dpt.Attributes.ToSlice() - method := getRPCMethod(attr) - assert.NotEmpty(t, method) - assert.ElementsMatch(t, []attribute.KeyValue{ - semconv.RPCMethod(method), - semconv.RPCService("grpc.testing.TestService"), - otelgrpc.RPCSystemGRPC, - otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), - }, attr) - } + expectedMetrics := []struct { + mc metricdata.Metrics + opts []metricdatatest.Option + }{ + { + mc: metricdata.Metrics{ + Name: "rpc.client.duration", + Description: "Measures the duration of inbound RPC.", + Unit: "ms", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.client.request.size", + Description: "Measures size of RPC request messages (uncompressed).", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(0)), + Min: metricdata.NewExtrema(int64(0)), + Count: 1, + Sum: 0, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + Max: metricdata.NewExtrema(int64(314167)), + Min: 
metricdata.NewExtrema(int64(314167)), + Count: 1, + Sum: 314167, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(58987)), + Min: metricdata.NewExtrema(int64(13)), + Count: 4, + Sum: 93082, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(58987)), + Min: metricdata.NewExtrema(int64(13)), + Count: 4, + Sum: 93082, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.client.response.size", + Description: "Measures size of RPC response messages (uncompressed).", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(0)), + Min: metricdata.NewExtrema(int64(0)), + Count: 1, + Sum: 0, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + Max: metricdata.NewExtrema(int64(271840)), + Min: metricdata.NewExtrema(int64(271840)), + Count: 1, + Sum: 271840, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(45912)), + Min: metricdata.NewExtrema(int64(12)), + Count: 4, + Sum: 74948, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(21)), + Min: metricdata.NewExtrema(int64(21)), + Count: 1, + Sum: 21, + }, + { + Attributes: attribute.NewSet( + 
semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(45918)), + Min: metricdata.NewExtrema(int64(16)), + Count: 4, + Sum: 74969, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.client.requests_per_rpc", + Description: "Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs.", + Unit: "{count}", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.client.responses_per_rpc", + Description: "Measures the number of messages received per RPC. 
Should be 1 for all non-streaming RPCs.", + Unit: "{count}", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + } + for i, mc := range rm.ScopeMetrics[0].Metrics { + t.Run(mc.Name, func(t *testing.T) { + metricdatatest.AssertEqual(t, expectedMetrics[i].mc, mc, expectedMetrics[i].opts...) 
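+			// IgnoreTimestamp masks the nondeterministic start/end times on
+			// every data point; the duration histogram additionally uses
+			// IgnoreValue because measured latencies differ run to run.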
+ }) } } -func checkClientRecords(t *testing.T, reader metric.Reader) { +func checkServerRecords(t *testing.T, reader metric.Reader) { rm := metricdata.ResourceMetrics{} err := reader.Collect(context.Background(), &rm) assert.NoError(t, err) require.Len(t, rm.ScopeMetrics, 1) require.Len(t, rm.ScopeMetrics[0].Metrics, 5) - for _, m := range rm.ScopeMetrics[0].Metrics { - require.IsType(t, m.Data, metricdata.Histogram[int64]{}) - data := m.Data.(metricdata.Histogram[int64]) - for _, dpt := range data.DataPoints { - attr := dpt.Attributes.ToSlice() - method := getRPCMethod(attr) - assert.NotEmpty(t, method) - assert.ElementsMatch(t, []attribute.KeyValue{ - semconv.RPCMethod(method), - semconv.RPCService("grpc.testing.TestService"), - otelgrpc.RPCSystemGRPC, - otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), - }, attr) - } + expectedMetrics := []struct { + mc metricdata.Metrics + opts []metricdatatest.Option + }{ + { + mc: metricdata.Metrics{ + Name: "rpc.server.duration", + Description: "Measures the duration of inbound RPC.", + Unit: "ms", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.server.request.size", + Description: "Measures size of RPC request messages (uncompressed).", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(0)), + Min: metricdata.NewExtrema(int64(0)), + Count: 1, + Sum: 0, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + Max: metricdata.NewExtrema(int64(271840)), + Min: metricdata.NewExtrema(int64(271840)), + Count: 1, + Sum: 271840, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), 
+ Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(45912)), + Min: metricdata.NewExtrema(int64(12)), + Count: 4, + Sum: 74948, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(21)), + Min: metricdata.NewExtrema(int64(21)), + Count: 1, + Sum: 21, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(45918)), + Min: metricdata.NewExtrema(int64(16)), + Count: 4, + Sum: 74969, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.server.response.size", + Description: "Measures size of RPC response messages (uncompressed).", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(0)), + Min: metricdata.NewExtrema(int64(0)), + Count: 1, + Sum: 0, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}, + Max: metricdata.NewExtrema(int64(314167)), + Min: metricdata.NewExtrema(int64(314167)), + Count: 1, + Sum: 314167, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(58987)), + Min: metricdata.NewExtrema(int64(13)), + Count: 4, + Sum: 93082, + }, + { + Attributes: attribute.NewSet( + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 
0, 0, 0, 0, 1, 0, 0, 2}, + Max: metricdata.NewExtrema(int64(58987)), + Min: metricdata.NewExtrema(int64(13)), + Count: 4, + Sum: 93082, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.server.requests_per_rpc", + Description: "Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs.", + Unit: "{count}", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + { + mc: metricdata.Metrics{ + Name: "rpc.server.responses_per_rpc", + Description: "Measures the number of messages received per RPC. 
Should be 1 for all non-streaming RPCs.", + Unit: "{count}", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("EmptyCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("UnaryCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingInputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(1)), + Min: metricdata.NewExtrema(int64(1)), + Count: 1, + Sum: 1, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("StreamingOutputCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + { + Attributes: attribute.NewSet( + semconv.RPCGRPCStatusCodeOk, + semconv.RPCMethod("FullDuplexCall"), + semconv.RPCService("grpc.testing.TestService"), + semconv.RPCSystemGRPC), + Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Max: metricdata.NewExtrema(int64(4)), + Min: metricdata.NewExtrema(int64(4)), + Count: 1, + Sum: 4, + }, + }, + }, + }, + opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, + }, + } + for i, mc := range rm.ScopeMetrics[0].Metrics { + t.Run(mc.Name, func(t *testing.T) { + metricdatatest.AssertEqual(t, expectedMetrics[i].mc, mc, expectedMetrics[i].opts...) 
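+			// As on the client side, expectedMetrics[i] is matched against the
+			// collected metrics by index, so the expected slice must stay in
+			// the order the instruments are created in stats_handler.go.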
+ }) } } From 39b0b97eaea7d4dff7326a4ce941d4d98a7dceb8 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 31 Oct 2023 09:32:26 +0800 Subject: [PATCH 3/9] rename the test function Signed-off-by: Ziqi Zhao --- .../grpc/otelgrpc/test/grpc_stats_handler_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 9e289fa0632..3bc1d8af542 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -59,7 +59,7 @@ func TestStatsHandler(t *testing.T) { }) t.Run("ClientMetrics", func(t *testing.T) { - checkClientRecords(t, clientMetricReader) + checkClientMetrics(t, clientMetricReader) }) t.Run("ServerSpans", func(t *testing.T) { @@ -67,7 +67,7 @@ func TestStatsHandler(t *testing.T) { }) t.Run("ServerMetrics", func(t *testing.T) { - checkServerRecords(t, serverMetricReader) + checkServerMetrics(t, serverMetricReader) }) } @@ -597,7 +597,7 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { }, pingPong.Attributes()) } -func checkClientRecords(t *testing.T, reader metric.Reader) { +func checkClientMetrics(t *testing.T, reader metric.Reader) { rm := metricdata.ResourceMetrics{} err := reader.Collect(context.Background(), &rm) assert.NoError(t, err) @@ -965,7 +965,7 @@ func checkClientRecords(t *testing.T, reader metric.Reader) { } } -func checkServerRecords(t *testing.T, reader metric.Reader) { +func checkServerMetrics(t *testing.T, reader metric.Reader) { rm := metricdata.ResourceMetrics{} err := reader.Collect(context.Background(), &rm) assert.NoError(t, err) From 37de1ec481f0aa09a289df5e0d02bc412c931aac Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Wed, 1 Nov 2023 09:26:58 +0800 Subject: [PATCH 4/9] modify unit test to compare scopeMetrics directly Signed-off-by: Ziqi Zhao --- .../otelgrpc/test/grpc_stats_handler_test.go | 86 +++++++------------ 1 file changed, 30 insertions(+), 56 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 3bc1d8af542..933969be81a 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -603,12 +604,14 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { assert.NoError(t, err) require.Len(t, rm.ScopeMetrics, 1) require.Len(t, rm.ScopeMetrics[0].Metrics, 5) - expectedMetrics := []struct { - mc metricdata.Metrics - opts []metricdatatest.Option - }{ - { - mc: metricdata.Metrics{ + expectedScopeMetric := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc", + Version: otelgrpc.Version(), + SchemaURL: "https://opentelemetry.io/schemas/1.17.0", + }, + Metrics: []metricdata.Metrics{ + { Name: "rpc.client.duration", Description: "Measures the duration of 
inbound RPC.", Unit: "ms", @@ -653,10 +656,7 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()}, - }, - { - mc: metricdata.Metrics{ + { Name: "rpc.client.request.size", Description: "Measures size of RPC request messages (uncompressed).", Unit: "By", @@ -726,10 +726,7 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, - }, - { - mc: metricdata.Metrics{ + { Name: "rpc.client.response.size", Description: "Measures size of RPC response messages (uncompressed).", Unit: "By", @@ -799,10 +796,7 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, - }, - { - mc: metricdata.Metrics{ + { Name: "rpc.client.requests_per_rpc", Description: "Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs.", Unit: "{count}", @@ -877,10 +871,7 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, - }, - { - mc: metricdata.Metrics{ + { Name: "rpc.client.responses_per_rpc", Description: "Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs.", Unit: "{count}", @@ -955,14 +946,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, }, } - for i, mc := range rm.ScopeMetrics[0].Metrics { - t.Run(mc.Name, func(t *testing.T) { - metricdatatest.AssertEqual(t, expectedMetrics[i].mc, mc, expectedMetrics[i].opts...) - }) - } + metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + } func checkServerMetrics(t *testing.T, reader metric.Reader) { @@ -971,12 +958,14 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { assert.NoError(t, err) require.Len(t, rm.ScopeMetrics, 1) require.Len(t, rm.ScopeMetrics[0].Metrics, 5) - expectedMetrics := []struct { - mc metricdata.Metrics - opts []metricdatatest.Option - }{ - { - mc: metricdata.Metrics{ + expectedScopeMetric := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc", + Version: otelgrpc.Version(), + SchemaURL: "https://opentelemetry.io/schemas/1.17.0", + }, + Metrics: []metricdata.Metrics{ + { Name: "rpc.server.duration", Description: "Measures the duration of inbound RPC.", Unit: "ms", @@ -1021,10 +1010,7 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()}, - }, - { - mc: metricdata.Metrics{ + { Name: "rpc.server.request.size", Description: "Measures size of RPC request messages (uncompressed).", Unit: "By", @@ -1094,10 +1080,7 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, - }, - { - mc: metricdata.Metrics{ + { Name: "rpc.server.response.size", Description: "Measures size of RPC response messages (uncompressed).", Unit: "By", @@ -1167,10 +1150,7 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, - }, - { - mc: metricdata.Metrics{ + { Name: 
"rpc.server.requests_per_rpc", Description: "Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs.", Unit: "{count}", @@ -1245,10 +1225,7 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, - }, - { - mc: metricdata.Metrics{ + { Name: "rpc.server.responses_per_rpc", Description: "Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs.", Unit: "{count}", @@ -1323,12 +1300,9 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { }, }, }, - opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()}, }, } - for i, mc := range rm.ScopeMetrics[0].Metrics { - t.Run(mc.Name, func(t *testing.T) { - metricdatatest.AssertEqual(t, expectedMetrics[i].mc, mc, expectedMetrics[i].opts...) - }) - } + + metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + } From e455fd292e3c504912a84c423ae7b34fd783667d Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Fri, 3 Nov 2023 08:46:19 +0800 Subject: [PATCH 5/9] move all metrics into config Signed-off-by: Ziqi Zhao --- .../google.golang.org/grpc/otelgrpc/config.go | 48 ++++++- .../grpc/otelgrpc/interceptor.go | 10 +- .../grpc/otelgrpc/metadata_supplier.go | 4 +- .../grpc/otelgrpc/stats_handler.go | 130 ++---------------- 4 files changed, 63 insertions(+), 129 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/config.go b/instrumentation/google.golang.org/grpc/otelgrpc/config.go index d9b91a24b17..bad05b6734f 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/config.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/config.go @@ -46,8 +46,14 @@ type config struct { ReceivedEvent bool SentEvent bool - meter metric.Meter - rpcServerDuration metric.Int64Histogram + tracer trace.Tracer + meter metric.Meter + + rpcDuration metric.Float64Histogram + rpcRequestSize metric.Int64Histogram + rpcResponseSize metric.Int64Histogram + rpcRequestsPerRPC metric.Int64Histogram + rpcResponsesPerRPC metric.Int64Histogram } // Option applies an option value for a config. @@ -56,7 +62,7 @@ type Option interface { } // newConfig returns a config configured with all the passed Options. 
-func newConfig(opts []Option) *config { +func newConfig(opts []Option, role string) *config { c := &config{ Propagators: otel.GetTextMapPropagator(), TracerProvider: otel.GetTracerProvider(), @@ -66,19 +72,53 @@ func newConfig(opts []Option) *config { o.apply(c) } + c.tracer = c.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + c.meter = c.MeterProvider.Meter( instrumentationName, metric.WithInstrumentationVersion(Version()), metric.WithSchemaURL(semconv.SchemaURL), ) + var err error - c.rpcServerDuration, err = c.meter.Int64Histogram("rpc.server.duration", + c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration", metric.WithDescription("Measures the duration of inbound RPC."), metric.WithUnit("ms")) if err != nil { otel.Handle(err) } + c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size", + metric.WithDescription("Measures size of RPC request messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size", + metric.WithDescription("Measures size of RPC response messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + return c } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go index 815eabb907e..031d1f4df68 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go @@ -61,7 +61,7 @@ var ( // UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable // for use in a grpc.Dial call. func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "client") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), @@ -255,7 +255,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { // StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable // for use in a grpc.Dial call. func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "client") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), @@ -325,7 +325,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor { // UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable // for use in a grpc.NewServer call. 
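The role string threaded through newConfig above is what lets a single constructor register both metric sets: every instrument name is built as "rpc."+role+"…", and note the duration instrument is now a Float64Histogram. A minimal sketch of that pattern follows — durationInstrument and the noop provider are illustrative stand-ins, not code from the patch:

package main

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/noop"
)

// durationInstrument registers "rpc.<role>.duration" the way the patched
// newConfig does: the role decides the name, and errors are reported to
// the global handler instead of aborting construction.
func durationInstrument(meter metric.Meter, role string) metric.Float64Histogram {
	h, err := meter.Float64Histogram("rpc."+role+".duration",
		metric.WithDescription("Measures the duration of inbound RPC."),
		metric.WithUnit("ms"))
	if err != nil {
		otel.Handle(err)
	}
	return h
}

func main() {
	meter := noop.NewMeterProvider().Meter("sketch")
	_ = durationInstrument(meter, "server") // rpc.server.duration
	_ = durationInstrument(meter, "client") // rpc.client.duration
}

Called with "server" and "client", the same helper yields rpc.server.duration and rpc.client.duration — which is how the interceptors and stats handlers below share one code path.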
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "server") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), @@ -387,7 +387,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { elapsedTime := time.Since(before).Milliseconds() attr = append(attr, grpcStatusCodeAttr) - cfg.rpcServerDuration.Record(ctx, elapsedTime, metric.WithAttributes(attr...)) + cfg.rpcDuration.Record(ctx, float64(elapsedTime), metric.WithAttributes(attr...)) return resp, err } @@ -446,7 +446,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *s // StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable // for use in a grpc.NewServer call. func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "server") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go b/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go index d91c6df2370..f585fb6ae0c 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go @@ -56,7 +56,7 @@ func (s *metadataSupplier) Keys() []string { // requests. // Deprecated: Unnecessary public func. func Inject(ctx context.Context, md *metadata.MD, opts ...Option) { - c := newConfig(opts) + c := newConfig(opts, "") c.Propagators.Inject(ctx, &metadataSupplier{ metadata: md, }) @@ -78,7 +78,7 @@ func inject(ctx context.Context, propagators propagation.TextMapPropagator) cont // This function is meant to be used on incoming requests. // Deprecated: Unnecessary public func. func Extract(ctx context.Context, md *metadata.MD, opts ...Option) (baggage.Baggage, trace.SpanContext) { - c := newConfig(opts) + c := newConfig(opts, "") ctx = c.Propagators.Extract(ctx, &metadataSupplier{ metadata: md, }) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index bcc84e965b8..e051aa34719 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -23,7 +23,6 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" @@ -39,71 +38,14 @@ type gRPCContext struct { metricAttrs []attribute.KeyValue } -type handler struct { - tracer trace.Tracer - meter metric.Meter - rpcDuration metric.Float64Histogram - rpcRequestSize metric.Int64Histogram - rpcResponseSize metric.Int64Histogram - rpcRequestsPerRPC metric.Int64Histogram - rpcResponsesPerRPC metric.Int64Histogram -} - type serverHandler struct { *config - r *handler } // NewServerHandler creates a stats.Handler for gRPC server. 
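For context, a usage sketch of the server-side stats handler that the constructor below builds; service registration and the listener are elided, and the provider options appear only as comments:

package main

import (
	"google.golang.org/grpc"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)

func main() {
	srv := grpc.NewServer(
		// One option installs tracing and, with this patch, metrics too.
		// otelgrpc.WithTracerProvider / otelgrpc.WithMeterProvider may be
		// passed to NewServerHandler; otherwise the otel globals are used.
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
	)
	defer srv.Stop()
	// register services and call srv.Serve(lis) as usual...
}

Unlike the interceptors above, a single grpc.StatsHandler option covers unary and streaming RPCs alike.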
func NewServerHandler(opts ...Option) stats.Handler { h := &serverHandler{ - config: newConfig(opts), - r: &handler{}, - } - - h.r.tracer = h.config.TracerProvider.Tracer( - instrumentationName, - trace.WithInstrumentationVersion(SemVersion()), - ) - h.r.meter = h.config.MeterProvider.Meter( - instrumentationName, - metric.WithInstrumentationVersion(Version()), - metric.WithSchemaURL(semconv.SchemaURL), - ) - var err error - h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.server.duration", - metric.WithDescription("Measures the duration of inbound RPC."), - metric.WithUnit("ms")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size", - metric.WithDescription("Measures size of RPC request messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size", - metric.WithDescription("Measures size of RPC response messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) + config: newConfig(opts, "server"), } return h @@ -128,7 +70,7 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.r.tracer.Start( + ctx, _ = h.tracer.Start( trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), name, trace.WithSpanKind(trace.SpanKindServer), @@ -143,65 +85,17 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont // HandleRPC processes the RPC stats. func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - h.r.handleRPC(ctx, rs) + h.handleRPC(ctx, rs) } type clientHandler struct { *config - r *handler } // NewClientHandler creates a stats.Handler for gRPC client. 
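And the matching client-side sketch, attached as a dial option before the constructor below; the target address and insecure credentials are placeholders:

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)

func main() {
	conn, err := grpc.Dial("localhost:50051", // placeholder target
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}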
func NewClientHandler(opts ...Option) stats.Handler { h := &clientHandler{ - config: newConfig(opts), - r: &handler{}, - } - - h.r.tracer = h.config.TracerProvider.Tracer( - instrumentationName, - trace.WithInstrumentationVersion(SemVersion()), - ) - - h.r.meter = h.config.MeterProvider.Meter( - instrumentationName, - metric.WithInstrumentationVersion(Version()), - metric.WithSchemaURL(semconv.SchemaURL), - ) - var err error - h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.client.duration", - metric.WithDescription("Measures the duration of inbound RPC."), - metric.WithUnit("ms")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size", - metric.WithDescription("Measures size of RPC request messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size", - metric.WithDescription("Measures size of RPC response messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) + config: newConfig(opts, "client"), } return h @@ -211,7 +105,7 @@ func NewClientHandler(opts ...Option) stats.Handler { func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.r.tracer.Start( + ctx, _ = h.tracer.Start( ctx, name, trace.WithSpanKind(trace.SpanKindClient), @@ -227,7 +121,7 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont // HandleRPC processes the RPC stats. func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - h.r.handleRPC(ctx, rs) + h.handleRPC(ctx, rs) } // TagConn can attach some information to the given context. 
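The handleRPC changes in the next hunk switch on the concrete stats.RPCStats event types. A minimal, illustrative stats.Handler — not part of the patch — makes that event sequence visible:

package main

import (
	"context"
	"log"

	"google.golang.org/grpc/stats"
)

type loggingHandler struct{}

func (loggingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { return ctx }

// HandleRPC sees the same sequence handleRPC consumes: *stats.Begin,
// then *stats.InPayload / *stats.OutPayload per message, then *stats.End
// once the RPC settles.
func (loggingHandler) HandleRPC(_ context.Context, rs stats.RPCStats) {
	switch rs.(type) {
	case *stats.Begin:
		log.Println("RPC started")
	case *stats.InPayload:
		log.Println("message received")
	case *stats.OutPayload:
		log.Println("message sent")
	case *stats.End:
		log.Println("RPC finished")
	}
}

func (loggingHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx }
func (loggingHandler) HandleConn(context.Context, stats.ConnStats) {}

// Compile-time interface check; attach via grpc.StatsHandler to use.
var _ stats.Handler = loggingHandler{}

func main() {}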
@@ -243,7 +137,7 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { // no-op } -func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { +func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) { span := trace.SpanFromContext(ctx) gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) var messageId int64 @@ -255,7 +149,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.InPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesReceived, 1) - r.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) + c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -269,7 +163,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.OutPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesSent, 1) - r.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) + c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -295,9 +189,9 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { span.End() metricAttrs = append(metricAttrs, rpcStatusAttr) - r.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) - r.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) - r.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...)) + c.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) + c.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) + c.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...)) default: return From 01d8b159b1cd8e1223e789bd14229c76b7bfcd71 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Fri, 3 Nov 2023 08:55:00 +0800 Subject: [PATCH 6/9] fix interceptor unit test Signed-off-by: Ziqi Zhao --- .../google.golang.org/grpc/otelgrpc/test/grpc_test.go | 4 ++-- .../google.golang.org/grpc/otelgrpc/test/interceptor_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go index 7f8ba0d41ae..620e5c9f1aa 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go @@ -614,9 +614,9 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { Name: "rpc.server.duration", Description: "Measures the duration of inbound RPC.", Unit: "ms", - Data: metricdata.Histogram[int64]{ + Data: metricdata.Histogram[float64]{ Temporality: metricdata.CumulativeTemporality, - DataPoints: []metricdata.HistogramDataPoint[int64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ { Attributes: attribute.NewSet( semconv.RPCMethod("EmptyCall"), diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go index 3c9bf04d474..03e1876ceb2 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go @@ -1095,9 +1095,9 @@ func assertServerMetrics(t 
*testing.T, reader metric.Reader, serviceName, name s Name: "rpc.server.duration", Description: "Measures the duration of inbound RPC.", Unit: "ms", - Data: metricdata.Histogram[int64]{ + Data: metricdata.Histogram[float64]{ Temporality: metricdata.CumulativeTemporality, - DataPoints: []metricdata.HistogramDataPoint[int64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ { Attributes: attribute.NewSet( semconv.RPCMethod(name), From 686a7611337a5c88fc35567713bc9e77400cd85b Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Sat, 4 Nov 2023 08:27:52 +0800 Subject: [PATCH 7/9] add withcancel context Signed-off-by: Ziqi Zhao --- .../grpc/otelgrpc/stats_handler.go | 43 ++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index e051aa34719..f3a78a8d861 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -17,6 +17,7 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.g import ( "context" "sync/atomic" + "time" grpc_codes "google.golang.org/grpc/codes" "google.golang.org/grpc/stats" @@ -143,13 +144,14 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) { var messageId int64 metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1) metricAttrs = append(metricAttrs, gctx.metricAttrs...) + wctx := withoutCancel(ctx) switch rs := rs.(type) { case *stats.Begin: case *stats.InPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesReceived, 1) - c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) + c.rpcRequestSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -163,7 +165,7 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.OutPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesSent, 1) - c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) + c.rpcResponseSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -189,11 +191,42 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) { span.End() metricAttrs = append(metricAttrs, rpcStatusAttr) - c.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) - c.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) - c.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...)) + c.rpcDuration.Record(wctx, float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) + c.rpcRequestsPerRPC.Record(wctx, gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) + c.rpcResponsesPerRPC.Record(wctx, gctx.messagesSent, metric.WithAttributes(metricAttrs...)) default: return } } + +func withoutCancel(parent context.Context) context.Context { + if parent == nil { + panic("cannot create context from nil parent") + } + return withoutCancelCtx{parent} +} + +type withoutCancelCtx struct { + c context.Context +} + +func (withoutCancelCtx) Deadline() (deadline time.Time, ok bool) { + return +} + +func (withoutCancelCtx) Done() <-chan struct{} { + return nil +} + +func (withoutCancelCtx) Err() error { + return nil +} + +func (w withoutCancelCtx) Value(key 
any) any { + return w.c.Value(key) +} + +func (w withoutCancelCtx) String() string { + return "withoutCancel" +} From 74ca71ae748f99e01dd4e9462179b8aa2e148a81 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Sat, 4 Nov 2023 08:41:08 +0800 Subject: [PATCH 8/9] fix for failed lint Signed-off-by: Ziqi Zhao --- .../grpc/otelgrpc/test/grpc_stats_handler_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index 933969be81a..eccb63b4082 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -949,7 +949,6 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) { }, } metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) - } func checkServerMetrics(t *testing.T, reader metric.Reader) { @@ -1304,5 +1303,4 @@ func checkServerMetrics(t *testing.T, reader metric.Reader) { } metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) - } From 977d89fd33ef3a68f679c6eebf203ce1f736d6bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 6 Nov 2023 09:02:54 +0100 Subject: [PATCH 9/9] Update instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go --- instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index f3a78a8d861..212e257ff72 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -57,7 +57,6 @@ func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) co span := trace.SpanFromContext(ctx) attrs := peerAttr(peerFromCtx(ctx)) span.SetAttributes(attrs...) - return ctx }
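A note on the withoutCancel helper introduced in PATCH 7/9: it detaches measurement recording from RPC cancellation while keeping context values (for example, baggage) intact, replacing the earlier context.TODO() calls so End-time recordings are not made against an already-canceled context. Go 1.21's context.WithoutCancel has the same semantics, sketched here:

package main

import (
	"context"
	"fmt"
)

type ctxKey struct{}

func main() {
	parent, cancel := context.WithCancel(
		context.WithValue(context.Background(), ctxKey{}, "kept"))
	cancel() // the RPC has finished; its context is canceled

	detached := context.WithoutCancel(parent) // Go 1.21+
	fmt.Println(parent.Err())             // context canceled
	fmt.Println(detached.Err())           // <nil>: safe to Record against
	fmt.Println(detached.Value(ctxKey{})) // "kept": values still flow
}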
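One more observation on the test refactor earlier in this series: collapsing the per-metric t.Run loop into a single scope-level AssertEqual means metricdatatest.IgnoreValue now applies to every instrument, not only the duration histogram whose recorded values were previously the only ones left unchecked. A sketch of the pattern — assertScope is a hypothetical helper, not code from the patch:

package example

import (
	"testing"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel/sdk/instrumentation"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

// assertScope mirrors the refactored tests: one assertion over the whole
// scope, with timestamps and values ignored for every data point.
func assertScope(t *testing.T, got metricdata.ScopeMetrics, want []metricdata.Metrics) {
	expected := metricdata.ScopeMetrics{
		Scope: instrumentation.Scope{
			Name:      "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc",
			Version:   otelgrpc.Version(),
			SchemaURL: "https://opentelemetry.io/schemas/1.17.0",
		},
		Metrics: want,
	}
	metricdatatest.AssertEqual(t, expected, got,
		metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}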