Skip to content

Commit

Permalink
Add unit tests for when events are disabled.
Browse files Browse the repository at this point in the history
  • Loading branch information
Baliedge committed Jun 5, 2023
1 parent fe9dc81 commit 372930d
Showing 1 changed file with 127 additions and 14 deletions.
141 changes: 127 additions & 14 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/interceptor_test.go
Expand Up @@ -93,6 +93,9 @@ func TestUnaryClientInterceptor(t *testing.T) {
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)
unaryInterceptorNoEvents := otelgrpc.UnaryClientInterceptor(
otelgrpc.WithTracerProvider(tp),
)

req := &grpc_testing.SimpleRequest{}
reply := &grpc_testing.SimpleResponse{}
Expand All @@ -101,14 +104,16 @@ func TestUnaryClientInterceptor(t *testing.T) {
checks := []struct {
method string
name string
interceptor grpc.UnaryClientInterceptor
expectedSpanCode codes.Code
expectedAttr []attribute.KeyValue
eventsAttr []map[attribute.Key]attribute.Value
expectErr bool
}{
{
method: "/github.com.serviceName/bar",
name: "github.com.serviceName/bar",
method: "/github.com.serviceName/bar",
name: "github.com.serviceName/bar",
interceptor: unaryInterceptor,
expectedAttr: []attribute.KeyValue{
semconv.RPCSystemGRPC,
semconv.RPCService("github.com.serviceName"),
Expand All @@ -129,8 +134,9 @@ func TestUnaryClientInterceptor(t *testing.T) {
},
},
{
method: "/serviceName/bar",
name: "serviceName/bar",
method: "/serviceName/bar",
name: "serviceName/bar",
interceptor: unaryInterceptor,
expectedAttr: []attribute.KeyValue{
semconv.RPCSystemGRPC,
semconv.RPCService("serviceName"),
Expand All @@ -151,8 +157,23 @@ func TestUnaryClientInterceptor(t *testing.T) {
},
},
{
method: "serviceName/bar",
name: "serviceName/bar",
method: "/serviceName/bar_noevents",
name: "serviceName/bar_noevents",
interceptor: unaryInterceptorNoEvents,
expectedAttr: []attribute.KeyValue{
semconv.RPCSystemGRPC,
semconv.RPCService("serviceName"),
semconv.RPCMethod("bar_noevents"),
otelgrpc.GRPCStatusCodeKey.Int64(0),
semconv.NetPeerName("fake"),
semconv.NetPeerPort(8906),
},
eventsAttr: []map[attribute.Key]attribute.Value{},
},
{
method: "serviceName/bar",
name: "serviceName/bar",
interceptor: unaryInterceptor,
expectedAttr: []attribute.KeyValue{
semconv.RPCSystemGRPC,
semconv.RPCService("serviceName"),
Expand All @@ -175,6 +196,7 @@ func TestUnaryClientInterceptor(t *testing.T) {
{
method: "serviceName/bar_error",
name: "serviceName/bar_error",
interceptor: unaryInterceptor,
expectedSpanCode: codes.Error,
expectedAttr: []attribute.KeyValue{
semconv.RPCSystemGRPC,
Expand All @@ -197,8 +219,9 @@ func TestUnaryClientInterceptor(t *testing.T) {
expectErr: true,
},
{
method: "invalidName",
name: "invalidName",
method: "invalidName",
name: "invalidName",
interceptor: unaryInterceptor,
expectedAttr: []attribute.KeyValue{
semconv.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(0),
Expand All @@ -217,8 +240,9 @@ func TestUnaryClientInterceptor(t *testing.T) {
},
},
{
method: "/github.com.foo.serviceName_123/method",
name: "github.com.foo.serviceName_123/method",
method: "/github.com.foo.serviceName_123/method",
name: "github.com.foo.serviceName_123/method",
interceptor: unaryInterceptor,
expectedAttr: []attribute.KeyValue{
semconv.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(0),
Expand All @@ -241,7 +265,7 @@ func TestUnaryClientInterceptor(t *testing.T) {
}

for _, check := range checks {
err := unaryInterceptor(context.Background(), check.method, req, reply, clientConn, uniInterceptorInvoker.invoker)
err := check.interceptor(context.Background(), check.method, req, reply, clientConn, uniInterceptorInvoker.invoker)
if check.expectErr {
assert.Error(t, err)
} else {
Expand Down Expand Up @@ -290,6 +314,7 @@ func (mockClientStream) Trailer() metadata.MD { return nil }
type clientStreamOpts struct {
NumRecvMsgs int
DisableServerStreams bool
NoEvents bool
}

func newMockClientStream(opts clientStreamOpts) *mockClientStream {
Expand Down Expand Up @@ -317,10 +342,13 @@ func createInterceptedStreamClient(t *testing.T, method string, opts clientStrea
// tracer
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
streamCI := otelgrpc.StreamClientInterceptor(
interceptorOpts := []otelgrpc.Option{
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)
}
if !opts.NoEvents {
interceptorOpts = append(interceptorOpts, otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents))
}
streamCI := otelgrpc.StreamClientInterceptor(interceptorOpts...)

streamClient, err := streamCI(
context.Background(),
Expand Down Expand Up @@ -407,6 +435,44 @@ func TestStreamClientInterceptorOnBIDIStream(t *testing.T) {
_ = streamClient.CloseSend()
}

func TestStreamClientInterceptorWithNoEvents(t *testing.T) {
defer goleak.VerifyNone(t)

method := "/github.com.serviceName/bar"
name := "github.com.serviceName/bar"
streamClient, sr := createInterceptedStreamClient(t, method, clientStreamOpts{NumRecvMsgs: 10, NoEvents: true})
_, ok := getSpanFromRecorder(sr, name)
require.False(t, ok, "span should not end while stream is open")

req := &grpc_testing.SimpleRequest{}
reply := &grpc_testing.SimpleResponse{}

// send and receive fake data
for i := 0; i < 10; i++ {
_ = streamClient.SendMsg(req)
_ = streamClient.RecvMsg(reply)
}

// The stream has been exhausted so next read should get a EOF and the stream should be considered closed.
err := streamClient.RecvMsg(reply)
require.Equal(t, io.EOF, err)

// added retry because span end is called in separate go routine
var span trace.ReadOnlySpan
for retry := 0; retry < 5; retry++ {
span, ok = getSpanFromRecorder(sr, name)
if ok {
break
}
time.Sleep(time.Second * 1)
}
require.True(t, ok, "missing span %s", name)
require.Empty(t, span.Events())

// ensure CloseSend can be subsequently called
_ = streamClient.CloseSend()
}

func TestStreamClientInterceptorOnUnidirectionalClientServerStream(t *testing.T) {
defer goleak.VerifyNone(t)

Expand Down Expand Up @@ -739,6 +805,30 @@ func TestUnaryServerInterceptor(t *testing.T) {
}
}

func TestUnaryServerInterceptorWithNoEvents(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
usi := otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(tp),
)
grpcCode := grpc_codes.OK
name := grpcCode.String()
// call the unary interceptor
grpcErr := status.Error(grpcCode, name)
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, grpcErr
}
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: name}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)

// validate events and their attributes
assert.Empty(t, span.Events())
}

type mockServerStream struct {
grpc.ServerStream
}
Expand Down Expand Up @@ -772,6 +862,29 @@ func TestStreamServerInterceptor(t *testing.T) {
}
}

func TestStreamServerInterceptorWithNoEvents(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
usi := otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(tp),
)

grpcCode := grpc_codes.OK
name := grpcCode.String()
// call the stream interceptor
grpcErr := status.Error(grpcCode, name)
handler := func(_ interface{}, _ grpc.ServerStream) error {
return grpcErr
}
err := usi(&grpc_testing.SimpleRequest{}, &mockServerStream{}, &grpc.StreamServerInfo{FullMethod: name}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)
require.Empty(t, span.Events())
}

func TestParseFullMethod(t *testing.T) {
tests := []struct {
fullMethod string
Expand Down

0 comments on commit 372930d

Please sign in to comment.