Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelgrpc: Refine tests to use a net socket instead of a buffer #4503

Merged
merged 8 commits into from Nov 6, 2023
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/test/bufconn"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -36,14 +37,18 @@ func TestStatsHandler(t *testing.T) {
serverSR := tracetest.NewSpanRecorder()
serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR))

assert.NoError(t, doCalls(
listener := bufconn.Listen(bufSize)
defer listener.Close()
pellared marked this conversation as resolved.
Show resolved Hide resolved
err := newGrpcTest(
listener,
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))),
},
[]grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))),
},
))
)
require.NoError(t, err)

t.Run("ClientSpans", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
Expand Down
147 changes: 104 additions & 43 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go
Expand Up @@ -17,16 +17,15 @@ package test
import (
"context"
"net"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/interop"
pb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/test/bufconn"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -47,42 +46,47 @@ var wantInstrumentationScope = instrumentation.Scope{

const bufSize = 2048

func doCalls(cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error {
l := bufconn.Listen(bufSize)
defer l.Close()

s := grpc.NewServer(sOpt...)
pb.RegisterTestServiceServer(s, interop.NewTestServer())
// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down.
func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) (err error) {
pellared marked this conversation as resolved.
Show resolved Hide resolved
grpcServer := grpc.NewServer(sOpt...)
pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer())
errCh := make(chan error)
go func() {
if err := s.Serve(l); err != nil {
panic(err)
}
errCh <- grpcServer.Serve(listener)
}()
defer s.Stop()

ctx := context.Background()
dial := func(context.Context, string) (net.Conn, error) { return l.Dial() }

cOpt = append(cOpt, grpc.WithInsecure())

if l, ok := listener.(interface{ Dial() (net.Conn, error) }); ok {
dial := func(context.Context, string) (net.Conn, error) { return l.Dial() }
cOpt = append(cOpt, grpc.WithContextDialer(dial))
}

conn, err := grpc.DialContext(
ctx,
"bufnet",
append([]grpc.DialOption{
grpc.WithContextDialer(dial),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}, cOpt...)...,
listener.Addr().String(),
cOpt...,
)
if err != nil {
return err
}
defer conn.Close()
client := pb.NewTestServiceClient(conn)

doCalls(client)

conn.Close()
grpcServer.Stop()

return <-errCh
}

func doCalls(client pb.TestServiceClient) {
interop.DoEmptyUnaryCall(client)
interop.DoLargeUnaryCall(client)
interop.DoClientStreaming(client)
interop.DoServerStreaming(client)
interop.DoPingPong(client)

return nil
}

func TestInterceptors(t *testing.T) {
Expand All @@ -100,17 +104,18 @@ func TestInterceptors(t *testing.T) {
serverStreamSR := tracetest.NewSpanRecorder()
serverStreamTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverStreamSR))

assert.NoError(t, doCalls(
[]grpc.DialOption{
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(
otelgrpc.WithTracerProvider(clientUnaryTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(
otelgrpc.WithTracerProvider(clientStreamTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
},
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
err = newGrpcTest(listener, []grpc.DialOption{
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(
otelgrpc.WithTracerProvider(clientUnaryTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(
otelgrpc.WithTracerProvider(clientStreamTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
},
[]grpc.ServerOption{
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(serverUnaryTP),
Expand All @@ -122,14 +127,15 @@ func TestInterceptors(t *testing.T) {
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
},
))
)
require.NoError(t, err)

t.Run("UnaryClientSpans", func(t *testing.T) {
checkUnaryClientSpans(t, clientUnarySR.Ended())
checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String())
})

t.Run("StreamClientSpans", func(t *testing.T) {
checkStreamClientSpans(t, clientStreamSR.Ended())
checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String())
})

t.Run("UnaryServerSpans", func(t *testing.T) {
Expand All @@ -142,9 +148,14 @@ func TestInterceptors(t *testing.T) {
})
}

func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr string) {
require.Len(t, spans, 2)

host, p, err := net.SplitHostPort(addr)
require.NoError(t, err)
port, err := strconv.Atoi(p)
require.NoError(t, err)

emptySpan := spans[0]
assert.False(t, emptySpan.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name())
Expand All @@ -169,6 +180,8 @@ func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, emptySpan.Attributes())

largeSpan := spans[1]
Expand Down Expand Up @@ -197,12 +210,19 @@ func checkUnaryClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, largeSpan.Attributes())
}

func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr string) {
require.Len(t, spans, 3)

host, p, err := net.SplitHostPort(addr)
require.NoError(t, err)
port, err := strconv.Atoi(p)
require.NoError(t, err)

streamInput := spans[0]
assert.False(t, streamInput.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name())
Expand Down Expand Up @@ -243,6 +263,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, streamInput.Attributes())

streamOutput := spans[1]
Expand Down Expand Up @@ -291,6 +313,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, streamOutput.Attributes())

pingPong := spans[2]
Expand Down Expand Up @@ -359,6 +383,8 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, pingPong.Attributes())
}

Expand Down Expand Up @@ -411,7 +437,10 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, streamInput.Attributes())
semconv.NetSockPeerAddr("127.0.0.1"),
}, filterKVs(streamInput.Attributes(), semconv.NetSockPeerPortKey))
// Client port is random
assert.True(t, existKVs(streamInput.Attributes(), semconv.NetSockPeerAddrKey))
pellared marked this conversation as resolved.
Show resolved Hide resolved

streamOutput := spans[1]
assert.False(t, streamOutput.EndTime().IsZero())
Expand Down Expand Up @@ -459,7 +488,9 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, streamOutput.Attributes())
semconv.NetSockPeerAddr("127.0.0.1"),
}, filterKVs(streamOutput.Attributes(), semconv.NetSockPeerPortKey))
assert.True(t, existKVs(streamOutput.Attributes(), semconv.NetSockPeerPortKey))

pingPong := spans[2]
assert.False(t, pingPong.EndTime().IsZero())
Expand Down Expand Up @@ -527,7 +558,9 @@ func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, pingPong.Attributes())
semconv.NetSockPeerAddr("127.0.0.1"),
}, filterKVs(pingPong.Attributes(), semconv.NetSockPeerPortKey))
assert.True(t, existKVs(pingPong.Attributes(), semconv.NetSockPeerPortKey))
}

func checkUnaryServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
Expand Down Expand Up @@ -557,7 +590,9 @@ func checkUnaryServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, emptySpan.Attributes())
semconv.NetSockPeerAddr("127.0.0.1"),
}, filterKVs(emptySpan.Attributes(), semconv.NetSockPeerPortKey))
assert.True(t, existKVs(emptySpan.Attributes(), semconv.NetSockPeerPortKey))

largeSpan := spans[1]
assert.False(t, largeSpan.EndTime().IsZero())
Expand Down Expand Up @@ -585,7 +620,9 @@ func checkUnaryServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, largeSpan.Attributes())
semconv.NetSockPeerAddr("127.0.0.1"),
}, filterKVs(largeSpan.Attributes(), semconv.NetSockPeerPortKey))
assert.True(t, existKVs(largeSpan.Attributes(), semconv.NetSockPeerPortKey))
}

func assertEvents(t *testing.T, expected, actual []trace.Event) bool {
Expand Down Expand Up @@ -644,3 +681,27 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) {
require.Len(t, rm.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}

func existKVs(kvs []attribute.KeyValue, key attribute.Key) bool {
for _, kv := range kvs {
if kv.Key == key {
return true
}
}
return false
}

func filterKVs(kvs []attribute.KeyValue, keys ...attribute.Key) []attribute.KeyValue {
keyLookup := make(map[attribute.Key]struct{}, len(keys))
for _, key := range keys {
keyLookup[key] = struct{}{}
}

output := make([]attribute.KeyValue, 0, len(kvs))
for _, kv := range kvs {
if _, ok := keyLookup[kv.Key]; !ok {
output = append(output, kv)
}
}
return output
}