From 91dc29dea8eedd19636bb6b4b944ffbeeb2674a5 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Wed, 16 Nov 2022 13:28:06 +0800 Subject: [PATCH] [otelgrpc] refactor otelgrpc to use grpc.StatsHandler Signed-off-by: Ziqi Zhao --- .../grpc/otelgrpc/stats_handler.go | 185 +++++++ .../otelgrpc/test/grpc_stats_handler_test.go | 495 ++++++++++++++++++ .../grpc/otelgrpc/test/stats_handler_test.go | 126 +++++ 3 files changed, 806 insertions(+) create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go new file mode 100644 index 00000000000..edabb9cfe9a --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -0,0 +1,185 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + +import ( + "context" + "sync/atomic" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.12.0" + "go.opentelemetry.io/otel/trace" + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" +) + +type grpcContext struct { + messagesReceived int64 + messagesSent int64 +} + +func NewServerHandler(opts ...Option) stats.Handler { + h := &serverHandler{ + config: newConfig(opts), + } + + h.tracer = h.config.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + + return h +} + +type serverHandler struct { + *config + tracer trace.Tracer +} + +// TagRPC can attach some information to the given context. +func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + ctx = extract(ctx, h.config.Propagators) + + attrs := []attribute.KeyValue{RPCSystemGRPC} + name, mAttrs := internal.ParseFullMethod(info.FullMethodName) + attrs = append(attrs, mAttrs...) + ctx, _ = h.tracer.Start( + trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attrs...), + ) + + gctx := grpcContext{} + + return context.WithValue(ctx, grpcContextKey, &gctx) +} + +// HandleRPC processes the RPC stats. +func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + span := trace.SpanFromContext(ctx) + attrs := peerAttr(peerFromCtx(ctx)) + span.SetAttributes(attrs...) + return ctx +} + +// HandleConn processes the Conn stats. +func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { +} + +func NewClientHandler(opts ...Option) stats.Handler { + h := &clientHandler{ + config: newConfig(opts), + } + + h.tracer = h.config.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + + return h +} + +type clientHandler struct { + *config + tracer trace.Tracer +} + +const grpcContextKey = "otel-trace-bin" + +// TagRPC can attach some information to the given context. +func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + attrs := []attribute.KeyValue{RPCSystemGRPC} + name, mAttrs := internal.ParseFullMethod(info.FullMethodName) + attrs = append(attrs, mAttrs...) + ctx, _ = h.tracer.Start( + ctx, + name, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attrs...), + ) + + gctx := grpcContext{} + + return inject(context.WithValue(ctx, grpcContextKey, &gctx), h.config.Propagators) +} + +// HandleRPC processes the RPC stats. +func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { + span := trace.SpanFromContext(ctx) + attrs := peerAttr(cti.RemoteAddr.String()) + span.SetAttributes(attrs...) + return ctx +} + +// HandleConn processes the Conn stats. +func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { + // no-op +} + +func handleRPC(ctx context.Context, rs stats.RPCStats) { + span := trace.SpanFromContext(ctx) + gctx, _ := ctx.Value(grpcContextKey).(*grpcContext) + var messageId int64 = 0 + + switch rs := rs.(type) { + case *stats.Begin: + case *stats.InPayload: + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + } + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeReceived, + semconv.MessageIDKey.Int64(messageId), + ), + ) + case *stats.OutPayload: + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesSent, 1) + } + + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeSent, + semconv.MessageIDKey.Int64(messageId), + ), + ) + case *stats.End: + if rs.Error != nil { + s, _ := status.FromError(rs.Error) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(statusCodeAttr(s.Code())) + } else { + span.SetAttributes(statusCodeAttr(grpc_codes.OK)) + } + span.End() + default: + return + } +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go new file mode 100644 index 00000000000..e6721df295a --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -0,0 +1,495 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +func TestStatsHandler(t *testing.T) { + clientSR := tracetest.NewSpanRecorder() + clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR)) + + serverSR := tracetest.NewSpanRecorder() + serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR)) + + assert.NoError(t, doCalls( + []grpc.DialOption{ + grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))), + }, + []grpc.ServerOption{ + grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))), + }, + )) + + t.Run("ClientSpans", func(t *testing.T) { + checkClientSpans(t, clientSR.Ended()) + }) + + t.Run("ServerSpans", func(t *testing.T) { + checkServerSpans(t, serverSR.Ended()) + }) +} + +func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { + require.Len(t, spans, 5) + + emptySpan := spans[0] + assert.False(t, emptySpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + }, emptySpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("EmptyCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, emptySpan.Attributes()) + + largeSpan := spans[1] + assert.False(t, largeSpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/UnaryCall", largeSpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + }, largeSpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("UnaryCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, largeSpan.Attributes()) + + streamInput := spans[2] + assert.False(t, streamInput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + // client does not record an event for the server response. + }, streamInput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingInputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamInput.Attributes()) + + streamOutput := spans[3] + assert.False(t, streamOutput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + }, streamOutput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingOutputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamOutput.Attributes()) + + pingPong := spans[4] + assert.False(t, pingPong.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + }, pingPong.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("FullDuplexCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, pingPong.Attributes()) +} + +func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { + require.Len(t, spans, 5) + + emptySpan := spans[0] + assert.False(t, emptySpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + }, emptySpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("EmptyCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, emptySpan.Attributes()) + + largeSpan := spans[1] + assert.False(t, largeSpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/UnaryCall", largeSpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageIDKey.Int(1), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageIDKey.Int(1), + }, + }, + }, largeSpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("UnaryCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, largeSpan.Attributes()) + + streamInput := spans[2] + assert.False(t, streamInput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + // client does not record an event for the server response. + }, streamInput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingInputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamInput.Attributes()) + + streamOutput := spans[3] + assert.False(t, streamOutput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + }, streamOutput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingOutputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamOutput.Attributes()) + + pingPong := spans[4] + assert.False(t, pingPong.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name()) + + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + }, + }, + }, pingPong.Events()) + + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("FullDuplexCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, pingPong.Attributes()) +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go new file mode 100644 index 00000000000..e87d618b28b --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/stats_handler_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "google.golang.org/grpc/stats" +) + +func TestClientStatsHandler(t *testing.T) { + type rpc struct { + tagInfo *stats.RPCTagInfo + connInfo *stats.ConnTagInfo + inPayloads []*stats.InPayload + outPayloads []*stats.OutPayload + end *stats.End + } + + type span struct { + name string + code codes.Code + attrs []attribute.KeyValue + eventsAttr []map[attribute.Key]attribute.Value + } + + sr := tracetest.NewSpanRecorder() + tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr)) + remoteAddr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8888") + + tcs := []struct { + rpcs []*rpc + span *span + }{ + { + rpcs: []*rpc{ + { + tagInfo: &stats.RPCTagInfo{ + FullMethodName: "/github.com.serviceName/bar", + }, + connInfo: &stats.ConnTagInfo{ + RemoteAddr: remoteAddr, + }, + inPayloads: []*stats.InPayload{ + {Length: 10}, + }, + outPayloads: []*stats.OutPayload{ + + {Length: 10}, + }, + end: &stats.End{Error: nil}, + }, + }, + span: &span{ + name: "github.com.serviceName/bar", + attrs: []attribute.KeyValue{ + semconv.RPCSystemKey.String("grpc"), + semconv.RPCServiceKey.String("github.com.serviceName"), + semconv.RPCMethodKey.String("bar"), + semconv.NetSockPeerAddrKey.String("127.0.0.1"), + semconv.NetSockPeerPortKey.Int(8888), + otelgrpc.GRPCStatusCodeKey.Int64(0), + }, + eventsAttr: []map[attribute.Key]attribute.Value{ + { + otelgrpc.RPCMessageTypeKey: attribute.StringValue("SENT"), + otelgrpc.RPCMessageIDKey: attribute.IntValue(1), + }, + { + otelgrpc.RPCMessageTypeKey: attribute.StringValue("RECEIVED"), + otelgrpc.RPCMessageIDKey: attribute.IntValue(1), + }, + }, + }, + }, + } + + for _, tc := range tcs { + h := otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tp)) + + for _, rpc := range tc.rpcs { + ctx := h.TagRPC(context.Background(), rpc.tagInfo) + ctx = h.TagConn(ctx, rpc.connInfo) + + for _, out := range rpc.outPayloads { + out.Client = true + h.HandleRPC(ctx, out) + } + for _, in := range rpc.inPayloads { + in.Client = true + h.HandleRPC(ctx, in) + } + rpc.end.Client = true + h.HandleRPC(ctx, rpc.end) + } + + span, ok := getSpanFromRecorder(sr, tc.span.name) + if !assert.True(t, ok, "missing span %q", tc.span.name) { + continue + } + assert.Equal(t, tc.span.code, span.Status().Code) + assert.ElementsMatch(t, tc.span.attrs, span.Attributes()) + assert.Equal(t, tc.span.eventsAttr, eventAttrMap(span.Events())) + } +}