diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go index eccb63b4082..d84d1e62e6a 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/test/bufconn" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" @@ -46,14 +47,18 @@ func TestStatsHandler(t *testing.T) { serverMetricReader := metric.NewManualReader() serverMP := metric.NewMeterProvider(metric.WithReader(serverMetricReader)) - assert.NoError(t, doCalls( + listener := bufconn.Listen(bufSize) + defer listener.Close() + err := newGrpcTest( + listener, []grpc.DialOption{ grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP), otelgrpc.WithMeterProvider(clientMP))), }, []grpc.ServerOption{ grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP), otelgrpc.WithMeterProvider(serverMP))), }, - )) + ) + require.NoError(t, err) t.Run("ClientSpans", func(t *testing.T) { checkClientSpans(t, clientSR.Ended()) diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go index 620e5c9f1aa..c64f59c764f 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go @@ -16,7 +16,9 @@ package test import ( "context" + "fmt" "net" + "strconv" "testing" "github.com/stretchr/testify/assert" @@ -26,7 +28,6 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/interop" pb "google.golang.org/grpc/interop/grpc_testing" - "google.golang.org/grpc/test/bufconn" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/attribute" @@ -47,42 +48,47 @@ var wantInstrumentationScope = instrumentation.Scope{ const bufSize = 2048 -func doCalls(cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error { - l := bufconn.Listen(bufSize) - defer l.Close() - - s := grpc.NewServer(sOpt...) - pb.RegisterTestServiceServer(s, interop.NewTestServer()) +// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down. +func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error { + grpcServer := grpc.NewServer(sOpt...) + pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer()) + errCh := make(chan error) go func() { - if err := s.Serve(l); err != nil { - panic(err) - } + errCh <- grpcServer.Serve(listener) }() - defer s.Stop() - ctx := context.Background() - dial := func(context.Context, string) (net.Conn, error) { return l.Dial() } + + cOpt = append(cOpt, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) + + if l, ok := listener.(interface{ Dial() (net.Conn, error) }); ok { + dial := func(context.Context, string) (net.Conn, error) { return l.Dial() } + cOpt = append(cOpt, grpc.WithContextDialer(dial)) + } + conn, err := grpc.DialContext( ctx, - "bufnet", - append([]grpc.DialOption{ - grpc.WithContextDialer(dial), - grpc.WithTransportCredentials(insecure.NewCredentials()), - }, cOpt...)..., + listener.Addr().String(), + cOpt..., ) if err != nil { return err } - defer conn.Close() client := pb.NewTestServiceClient(conn) + doCalls(client) + + conn.Close() + grpcServer.Stop() + + return <-errCh +} + +func doCalls(client pb.TestServiceClient) { interop.DoEmptyUnaryCall(client) interop.DoLargeUnaryCall(client) interop.DoClientStreaming(client) interop.DoServerStreaming(client) interop.DoPingPong(client) - - return nil } func TestInterceptors(t *testing.T) { @@ -100,17 +106,18 @@ func TestInterceptors(t *testing.T) { serverStreamSR := tracetest.NewSpanRecorder() serverStreamTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverStreamSR)) - assert.NoError(t, doCalls( - []grpc.DialOption{ - grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor( - otelgrpc.WithTracerProvider(clientUnaryTP), - otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), - )), - grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor( - otelgrpc.WithTracerProvider(clientStreamTP), - otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), - )), - }, + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err, "failed to open port") + err = newGrpcTest(listener, []grpc.DialOption{ + grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor( + otelgrpc.WithTracerProvider(clientUnaryTP), + otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), + )), + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor( + otelgrpc.WithTracerProvider(clientStreamTP), + otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), + )), + }, []grpc.ServerOption{ grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor( otelgrpc.WithTracerProvider(serverUnaryTP), @@ -122,14 +129,15 @@ func TestInterceptors(t *testing.T) { otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents), )), }, - )) + ) + require.NoError(t, err) t.Run("UnaryClientSpans", func(t *testing.T) { - checkUnaryClientSpans(t, clientUnarySR.Ended()) + checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String()) }) t.Run("StreamClientSpans", func(t *testing.T) { - checkStreamClientSpans(t, clientStreamSR.Ended()) + checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String()) }) t.Run("UnaryServerSpans", func(t *testing.T) { @@ -142,9 +150,14 @@ func TestInterceptors(t *testing.T) { }) } -func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { +func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr string) { require.Len(t, spans, 2) + host, p, err := net.SplitHostPort(addr) + require.NoError(t, err) + port, err := strconv.Atoi(p) + require.NoError(t, err) + emptySpan := spans[0] assert.False(t, emptySpan.EndTime().IsZero()) assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name()) @@ -169,6 +182,8 @@ func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(host), + semconv.NetSockPeerPort(port), }, emptySpan.Attributes()) largeSpan := spans[1] @@ -197,12 +212,19 @@ func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(host), + semconv.NetSockPeerPort(port), }, largeSpan.Attributes()) } -func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { +func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr string) { require.Len(t, spans, 3) + host, p, err := net.SplitHostPort(addr) + require.NoError(t, err) + port, err := strconv.Atoi(p) + require.NoError(t, err) + streamInput := spans[0] assert.False(t, streamInput.EndTime().IsZero()) assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name()) @@ -243,6 +265,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(host), + semconv.NetSockPeerPort(port), }, streamInput.Attributes()) streamOutput := spans[1] @@ -291,6 +315,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(host), + semconv.NetSockPeerPort(port), }, streamOutput.Attributes()) pingPong := spans[2] @@ -359,6 +385,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr(host), + semconv.NetSockPeerPort(port), }, pingPong.Attributes()) } @@ -406,11 +434,16 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { }, }, }, streamInput.Events()) + port, ok := findAttribute(streamInput.Attributes(), semconv.NetSockPeerPortKey) + assert.True(t, ok) + assert.ElementsMatch(t, []attribute.KeyValue{ semconv.RPCMethod("StreamingInputCall"), semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr("127.0.0.1"), + port, }, streamInput.Attributes()) streamOutput := spans[1] @@ -454,11 +487,17 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { }, }, }, streamOutput.Events()) + + port, ok = findAttribute(streamOutput.Attributes(), semconv.NetSockPeerPortKey) + assert.True(t, ok) + assert.ElementsMatch(t, []attribute.KeyValue{ semconv.RPCMethod("StreamingOutputCall"), semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr("127.0.0.1"), + port, }, streamOutput.Attributes()) pingPong := spans[2] @@ -522,11 +561,15 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { }, }, }, pingPong.Events()) + port, ok = findAttribute(pingPong.Attributes(), semconv.NetSockPeerPortKey) + assert.True(t, ok) assert.ElementsMatch(t, []attribute.KeyValue{ semconv.RPCMethod("FullDuplexCall"), semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr("127.0.0.1"), + port, }, pingPong.Attributes()) } @@ -552,11 +595,16 @@ func checkUnaryServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { }, }, }, emptySpan.Events()) + + port, ok := findAttribute(emptySpan.Attributes(), semconv.NetSockPeerPortKey) + assert.True(t, ok) assert.ElementsMatch(t, []attribute.KeyValue{ semconv.RPCMethod("EmptyCall"), semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr("127.0.0.1"), + port, }, emptySpan.Attributes()) largeSpan := spans[1] @@ -580,11 +628,16 @@ func checkUnaryServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { }, }, }, largeSpan.Events()) + + port, ok = findAttribute(largeSpan.Attributes(), semconv.NetSockPeerPortKey) + assert.True(t, ok) assert.ElementsMatch(t, []attribute.KeyValue{ semconv.RPCMethod("UnaryCall"), semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + semconv.NetSockPeerAddr("127.0.0.1"), + port, }, largeSpan.Attributes()) } @@ -607,6 +660,17 @@ func assertEvents(t *testing.T, expected, actual []trace.Event) bool { } func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { + rm := metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), &rm) + assert.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + + // TODO: Remove these #4322 + address, ok := findScopeMetricAttribute(rm.ScopeMetrics[0], semconv.NetSockPeerAddrKey) + assert.True(t, ok) + port, ok := findScopeMetricAttribute(rm.ScopeMetrics[0], semconv.NetSockPeerPortKey) + assert.True(t, ok) + want := metricdata.ScopeMetrics{ Scope: wantInstrumentationScope, Metrics: []metricdata.Metrics{ @@ -623,6 +687,8 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + address, + port, ), }, { @@ -631,6 +697,8 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { semconv.RPCService("grpc.testing.TestService"), otelgrpc.RPCSystemGRPC, otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + address, + port, ), }, }, @@ -638,9 +706,38 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) { }, }, } - rm := metricdata.ResourceMetrics{} - err := reader.Collect(context.Background(), &rm) - assert.NoError(t, err) - require.Len(t, rm.ScopeMetrics, 1) + metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) } + +func findAttribute(kvs []attribute.KeyValue, key attribute.Key) (attribute.KeyValue, bool) { + for _, kv := range kvs { + if kv.Key == key { + return kv, true + } + } + return attribute.KeyValue{}, false +} + +func findScopeMetricAttribute(sm metricdata.ScopeMetrics, key attribute.Key) (attribute.KeyValue, bool) { + for _, m := range sm.Metrics { + // This only needs to cover data types used by the instrumentation. + switch d := m.Data.(type) { + case metricdata.Histogram[int64]: + for _, dp := range d.DataPoints { + if kv, ok := findAttribute(dp.Attributes.ToSlice(), key); ok { + return kv, true + } + } + case metricdata.Histogram[float64]: + for _, dp := range d.DataPoints { + if kv, ok := findAttribute(dp.Attributes.ToSlice(), key); ok { + return kv, true + } + } + default: + panic(fmt.Sprintf("unexpected data type %T - name %s", d, m.Name)) + } + } + return attribute.KeyValue{}, false +}