Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelgrpc: StreamClientInterceptor ends spans synchronously #4537

Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)

### Fixed

- Fix `StreamClientInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to end the spans synchronously. (#4537)

## [1.21.0/0.46.0/0.15.0/0.1.0] - 2023-11-10

### Added
Expand Down
91 changes: 20 additions & 71 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,13 @@
}
}

type streamEventType int

type streamEvent struct {
Type streamEventType
Err error
}

const (
receiveEndEvent streamEventType = iota
errorEvent
)

// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
// SendMsg method call.
type clientStream struct {
grpc.ClientStream
desc *grpc.StreamDesc

desc *grpc.StreamDesc
events chan streamEvent
eventsDone chan struct{}
finished chan error
span trace.Span

receivedEvent bool
sentEvent bool
Expand All @@ -160,11 +146,11 @@
err := w.ClientStream.RecvMsg(m)

if err == nil && !w.desc.ServerStreams {
w.sendStreamEvent(receiveEndEvent, nil)
w.endSpan(nil)
} else if err == io.EOF {
w.sendStreamEvent(receiveEndEvent, nil)
w.endSpan(nil)
} else if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)

Check warning on line 153 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L153

Added line #L153 was not covered by tests
} else {
w.receivedMessageID++

Expand All @@ -186,7 +172,7 @@
}

if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)

Check warning on line 175 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L175

Added line #L175 was not covered by tests
}

return err
Expand All @@ -195,7 +181,7 @@
func (w *clientStream) Header() (metadata.MD, error) {
md, err := w.ClientStream.Header()
if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)

Check warning on line 184 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L184

Added line #L184 was not covered by tests
}

return md, err
Expand All @@ -204,54 +190,32 @@
func (w *clientStream) CloseSend() error {
err := w.ClientStream.CloseSend()
if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)

Check warning on line 193 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L193

Added line #L193 was not covered by tests
}

return err
}

func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream {
events := make(chan streamEvent)
eventsDone := make(chan struct{})
finished := make(chan error)

go func() {
defer close(eventsDone)

for {
select {
case event := <-events:
switch event.Type {
case receiveEndEvent:
finished <- nil
return
case errorEvent:
finished <- event.Err
return
}
case <-ctx.Done():
finished <- ctx.Err()
return
}
}
}()

func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream {
return &clientStream{
ClientStream: s,
span: span,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
receivedEvent: cfg.ReceivedEvent,
sentEvent: cfg.SentEvent,
}
}

func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
select {
case <-w.eventsDone:
case w.events <- streamEvent{Type: eventType, Err: err}:
func (w *clientStream) endSpan(err error) {
if err != nil {
s, _ := status.FromError(err)
w.span.SetStatus(codes.Error, s.Message())
w.span.SetAttributes(statusCodeAttr(s.Code()))

Check warning on line 213 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L211-L213

Added lines #L211 - L213 were not covered by tests
} else {
w.span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

w.span.End()
}

// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
Expand Down Expand Up @@ -306,22 +270,7 @@
span.End()
return s, err
}
stream := wrapClientStream(ctx, s, desc, cfg)

go func() {
err := <-stream.finished

if err != nil {
s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

span.End()
}()

stream := wrapClientStream(ctx, s, desc, span, cfg)
return stream, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func TestStatsHandler(t *testing.T) {

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
err = newGrpcTest(
listener,
client := newGrpcTest(t, listener,
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(
otelgrpc.WithTracerProvider(clientTP),
Expand All @@ -66,7 +65,7 @@ func TestStatsHandler(t *testing.T) {
),
},
)
require.NoError(t, err)
doCalls(client)

t.Run("ClientSpans", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
Expand Down
34 changes: 13 additions & 21 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -46,14 +45,18 @@ var wantInstrumentationScope = instrumentation.Scope{
Version: otelgrpc.Version(),
}

// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down.
func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error {
// newGrpcTest creats a grpc server, starts it, and returns the client, closes everything down during test cleanup.
func newGrpcTest(t testing.TB, listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) pb.TestServiceClient {
grpcServer := grpc.NewServer(sOpt...)
pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer())
errCh := make(chan error)
go func() {
errCh <- grpcServer.Serve(listener)
}()
t.Cleanup(func() {
grpcServer.Stop()
assert.NoError(t, <-errCh)
})
ctx := context.Background()

cOpt = append(cOpt, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -68,17 +71,12 @@ func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.Serv
listener.Addr().String(),
cOpt...,
)
if err != nil {
return err
}
client := pb.NewTestServiceClient(conn)

doCalls(client)

conn.Close()
grpcServer.Stop()
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, conn.Close())
})

return <-errCh
return pb.NewTestServiceClient(conn)
}

func doCalls(client pb.TestServiceClient) {
Expand Down Expand Up @@ -106,7 +104,7 @@ func TestInterceptors(t *testing.T) {

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
err = newGrpcTest(listener,
client := newGrpcTest(t, listener,
[]grpc.DialOption{
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(
Expand All @@ -133,19 +131,13 @@ func TestInterceptors(t *testing.T) {
)),
},
)
require.NoError(t, err)
doCalls(client)

t.Run("UnaryClientSpans", func(t *testing.T) {
checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String())
})

t.Run("StreamClientSpans", func(t *testing.T) {
// StreamClientInterceptor ends the spans asynchronously.
// We need to wait for all spans before asserting them.
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, clientStreamSR.Ended(), 3)
}, 5*time.Second, 100*time.Millisecond)

checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String())
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"google.golang.org/grpc"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/interop"
"google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -1128,3 +1129,20 @@ func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name s
require.Len(t, rm.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}

func BenchmarkStreamClientInterceptor(b *testing.B) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(b, err, "failed to open port")
client := newGrpcTest(b, listener,
[]grpc.DialOption{
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
},
[]grpc.ServerOption{},
)

b.ResetTimer()
for i := 0; i < b.N; i++ {
interop.DoClientStreaming(client)
}
}