From ad22f064b8a64d0977a9d538fe22feb134a611f4 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Fri, 3 Nov 2023 08:46:19 +0800 Subject: [PATCH] move all metrics into config Signed-off-by: Ziqi Zhao --- .../google.golang.org/grpc/otelgrpc/config.go | 48 ++++++- .../grpc/otelgrpc/interceptor.go | 10 +- .../grpc/otelgrpc/metadata_supplier.go | 4 +- .../grpc/otelgrpc/stats_handler.go | 130 ++---------------- .../grpc/otelgrpc/test/grpc_test.go | 4 +- .../grpc/otelgrpc/test/interceptor_test.go | 6 +- 6 files changed, 68 insertions(+), 134 deletions(-) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/config.go b/instrumentation/google.golang.org/grpc/otelgrpc/config.go index d9b91a24b17..bad05b6734f 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/config.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/config.go @@ -46,8 +46,14 @@ type config struct { ReceivedEvent bool SentEvent bool - meter metric.Meter - rpcServerDuration metric.Int64Histogram + tracer trace.Tracer + meter metric.Meter + + rpcDuration metric.Float64Histogram + rpcRequestSize metric.Int64Histogram + rpcResponseSize metric.Int64Histogram + rpcRequestsPerRPC metric.Int64Histogram + rpcResponsesPerRPC metric.Int64Histogram } // Option applies an option value for a config. @@ -56,7 +62,7 @@ type Option interface { } // newConfig returns a config configured with all the passed Options. -func newConfig(opts []Option) *config { +func newConfig(opts []Option, role string) *config { c := &config{ Propagators: otel.GetTextMapPropagator(), TracerProvider: otel.GetTracerProvider(), @@ -66,19 +72,53 @@ func newConfig(opts []Option) *config { o.apply(c) } + c.tracer = c.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + c.meter = c.MeterProvider.Meter( instrumentationName, metric.WithInstrumentationVersion(Version()), metric.WithSchemaURL(semconv.SchemaURL), ) + var err error - c.rpcServerDuration, err = c.meter.Int64Histogram("rpc.server.duration", + c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration", metric.WithDescription("Measures the duration of inbound RPC."), metric.WithUnit("ms")) if err != nil { otel.Handle(err) } + c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size", + metric.WithDescription("Measures size of RPC request messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size", + metric.WithDescription("Measures size of RPC response messages (uncompressed)."), + metric.WithUnit("By")) + if err != nil { + otel.Handle(err) + } + + c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + + c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc", + metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), + metric.WithUnit("{count}")) + if err != nil { + otel.Handle(err) + } + return c } diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go index 815eabb907e..031d1f4df68 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go @@ -61,7 +61,7 @@ var ( // UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable // for use in a grpc.Dial call. func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "client") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), @@ -255,7 +255,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { // StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable // for use in a grpc.Dial call. func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "client") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), @@ -325,7 +325,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor { // UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable // for use in a grpc.NewServer call. func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "server") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), @@ -387,7 +387,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor { elapsedTime := time.Since(before).Milliseconds() attr = append(attr, grpcStatusCodeAttr) - cfg.rpcServerDuration.Record(ctx, elapsedTime, metric.WithAttributes(attr...)) + cfg.rpcDuration.Record(ctx, float64(elapsedTime), metric.WithAttributes(attr...)) return resp, err } @@ -446,7 +446,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *s // StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable // for use in a grpc.NewServer call. func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor { - cfg := newConfig(opts) + cfg := newConfig(opts, "server") tracer := cfg.TracerProvider.Tracer( instrumentationName, trace.WithInstrumentationVersion(Version()), diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go b/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go index d91c6df2370..f585fb6ae0c 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/metadata_supplier.go @@ -56,7 +56,7 @@ func (s *metadataSupplier) Keys() []string { // requests. // Deprecated: Unnecessary public func. func Inject(ctx context.Context, md *metadata.MD, opts ...Option) { - c := newConfig(opts) + c := newConfig(opts, "") c.Propagators.Inject(ctx, &metadataSupplier{ metadata: md, }) @@ -78,7 +78,7 @@ func inject(ctx context.Context, propagators propagation.TextMapPropagator) cont // This function is meant to be used on incoming requests. // Deprecated: Unnecessary public func. func Extract(ctx context.Context, md *metadata.MD, opts ...Option) (baggage.Baggage, trace.SpanContext) { - c := newConfig(opts) + c := newConfig(opts, "") ctx = c.Propagators.Extract(ctx, &metadataSupplier{ metadata: md, }) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index bcc84e965b8..e051aa34719 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -23,7 +23,6 @@ import ( "google.golang.org/grpc/status" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" @@ -39,71 +38,14 @@ type gRPCContext struct { metricAttrs []attribute.KeyValue } -type handler struct { - tracer trace.Tracer - meter metric.Meter - rpcDuration metric.Float64Histogram - rpcRequestSize metric.Int64Histogram - rpcResponseSize metric.Int64Histogram - rpcRequestsPerRPC metric.Int64Histogram - rpcResponsesPerRPC metric.Int64Histogram -} - type serverHandler struct { *config - r *handler } // NewServerHandler creates a stats.Handler for gRPC server. func NewServerHandler(opts ...Option) stats.Handler { h := &serverHandler{ - config: newConfig(opts), - r: &handler{}, - } - - h.r.tracer = h.config.TracerProvider.Tracer( - instrumentationName, - trace.WithInstrumentationVersion(SemVersion()), - ) - h.r.meter = h.config.MeterProvider.Meter( - instrumentationName, - metric.WithInstrumentationVersion(Version()), - metric.WithSchemaURL(semconv.SchemaURL), - ) - var err error - h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.server.duration", - metric.WithDescription("Measures the duration of inbound RPC."), - metric.WithUnit("ms")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size", - metric.WithDescription("Measures size of RPC request messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size", - metric.WithDescription("Measures size of RPC response messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) + config: newConfig(opts, "server"), } return h @@ -128,7 +70,7 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.r.tracer.Start( + ctx, _ = h.tracer.Start( trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), name, trace.WithSpanKind(trace.SpanKindServer), @@ -143,65 +85,17 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont // HandleRPC processes the RPC stats. func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - h.r.handleRPC(ctx, rs) + h.handleRPC(ctx, rs) } type clientHandler struct { *config - r *handler } // NewClientHandler creates a stats.Handler for gRPC client. func NewClientHandler(opts ...Option) stats.Handler { h := &clientHandler{ - config: newConfig(opts), - r: &handler{}, - } - - h.r.tracer = h.config.TracerProvider.Tracer( - instrumentationName, - trace.WithInstrumentationVersion(SemVersion()), - ) - - h.r.meter = h.config.MeterProvider.Meter( - instrumentationName, - metric.WithInstrumentationVersion(Version()), - metric.WithSchemaURL(semconv.SchemaURL), - ) - var err error - h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.client.duration", - metric.WithDescription("Measures the duration of inbound RPC."), - metric.WithUnit("ms")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size", - metric.WithDescription("Measures size of RPC request messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size", - metric.WithDescription("Measures size of RPC response messages (uncompressed)."), - metric.WithUnit("By")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) - } - - h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc", - metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."), - metric.WithUnit("{count}")) - if err != nil { - otel.Handle(err) + config: newConfig(opts, "client"), } return h @@ -211,7 +105,7 @@ func NewClientHandler(opts ...Option) stats.Handler { func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { name, attrs := internal.ParseFullMethod(info.FullMethodName) attrs = append(attrs, RPCSystemGRPC) - ctx, _ = h.r.tracer.Start( + ctx, _ = h.tracer.Start( ctx, name, trace.WithSpanKind(trace.SpanKindClient), @@ -227,7 +121,7 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont // HandleRPC processes the RPC stats. func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { - h.r.handleRPC(ctx, rs) + h.handleRPC(ctx, rs) } // TagConn can attach some information to the given context. @@ -243,7 +137,7 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { // no-op } -func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { +func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) { span := trace.SpanFromContext(ctx) gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) var messageId int64 @@ -255,7 +149,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.InPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesReceived, 1) - r.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) + c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -269,7 +163,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { case *stats.OutPayload: if gctx != nil { messageId = atomic.AddInt64(&gctx.messagesSent, 1) - r.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) + c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...)) } span.AddEvent("message", @@ -295,9 +189,9 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) { span.End() metricAttrs = append(metricAttrs, rpcStatusAttr) - r.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) - r.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) - r.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...)) + c.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...)) + c.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...)) + c.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...)) default: return diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go index 04275c11860..a5800ca0d44 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go @@ -604,8 +604,8 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { assert.NoError(t, err) require.Len(t, rm.ScopeMetrics, 1) require.Len(t, rm.ScopeMetrics[0].Metrics, 1) - require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{}) - data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]) + require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[float64]{}) + data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64]) for _, dpt := range data.DataPoints { attr := dpt.Attributes.ToSlice() diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go index d661ec86e5a..1f1580e41bd 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go @@ -54,7 +54,7 @@ func getSpanFromRecorder(sr *tracetest.SpanRecorder, name string) (trace.ReadOnl return nil, false } -func getMetricFromData(data metricdata.Histogram[int64], name string) (*metricdata.HistogramDataPoint[int64], bool) { +func getMetricFromData(data metricdata.Histogram[float64], name string) (*metricdata.HistogramDataPoint[float64], bool) { for _, d := range data.DataPoints { v, ok := d.Attributes.Value(semconv.RPCMethodKey) if !ok { @@ -1101,8 +1101,8 @@ func checkManualReaderRecords(t *testing.T, reader metric.Reader, serviceName, n assert.NoError(t, err) require.Len(t, rm.ScopeMetrics, 1) require.Len(t, rm.ScopeMetrics[0].Metrics, 1) - require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{}) - data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64]) + require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[float64]{}) + data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64]) dpt, ok := getMetricFromData(data, name) assert.True(t, ok) attr := dpt.Attributes.ToSlice()