Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce OpenTelemetry events in gRPC interceptor #3964

Merged
merged 13 commits into from Jun 20, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- Use `strings.Cut()` instead of `string.SplitN()` for better readability and memory use. (#3822)
- Change `otelgrpc` interceptors to disable `SENT`/`RECEIVED` events. Use `WithMessageEvents()` to turn back on. (#3964)

## [1.17.0-rc.1/0.42.0-rc.1/0.11.0-rc.1] - 2023-05-17

Expand Down
38 changes: 38 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Expand Up @@ -42,6 +42,9 @@ type config struct {
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider

ReceivedEvent bool
SentEvent bool

meter metric.Meter
rpcServerDuration metric.Int64Histogram
}
Expand Down Expand Up @@ -131,3 +134,38 @@ func (o meterProviderOption) apply(c *config) {
func WithMeterProvider(mp metric.MeterProvider) Option {
return meterProviderOption{mp: mp}
}

// Event type that can be recorded, see WithMessageEvents.
type Event int

// Different types of events that can be recorded, see WithMessageEvents.
const (
ReceivedEvents Event = iota
SentEvents
)

type messageEventsProviderOption struct {
events []Event
}

func (m messageEventsProviderOption) apply(c *config) {
for _, e := range m.events {
switch e {
case ReceivedEvents:
c.ReceivedEvent = true
case SentEvents:
c.SentEvent = true
}
}
}

// WithMessageEvents configures the Handler to record the specified events
// (span.AddEvent) on spans. By default only summary attributes are added at the
// end of the request.
//
// Valid events are:
// - ReceivedEvents: Record the number of bytes read after every gRPC read operation.
// - SentEvents: Record the number of bytes written after every gRPC write operation.
func WithMessageEvents(events ...Event) Option {
return messageEventsProviderOption{events: events}
}
72 changes: 51 additions & 21 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Expand Up @@ -95,11 +95,15 @@

ctx = inject(ctx, cfg.Propagators)

messageSent.Event(ctx, 1, req)
if cfg.SentEvent {
messageSent.Event(ctx, 1, req)
}

err := invoker(ctx, method, req, reply, cc, callOpts...)

messageReceived.Event(ctx, 1, reply)
if cfg.ReceivedEvent {
messageReceived.Event(ctx, 1, reply)
}

if err != nil {
s, _ := status.FromError(err)
Expand Down Expand Up @@ -135,6 +139,9 @@
eventsDone chan struct{}
finished chan error

receivedEvent bool
sentEvent bool

receivedMessageID int
sentMessageID int
}
Expand All @@ -152,7 +159,10 @@
w.sendStreamEvent(errorEvent, err)
} else {
w.receivedMessageID++
messageReceived.Event(w.Context(), w.receivedMessageID, m)

if w.receivedEvent {
messageReceived.Event(w.Context(), w.receivedMessageID, m)
}
}

return err
Expand All @@ -162,7 +172,10 @@
err := w.ClientStream.SendMsg(m)

w.sentMessageID++
messageSent.Event(w.Context(), w.sentMessageID, m)

if w.sentEvent {
messageSent.Event(w.Context(), w.sentMessageID, m)
}

if err != nil {
w.sendStreamEvent(errorEvent, err)
Expand Down Expand Up @@ -191,7 +204,7 @@
return err
}

func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream {
events := make(chan streamEvent)
eventsDone := make(chan struct{})
finished := make(chan error)
Expand All @@ -218,11 +231,13 @@
}()

return &clientStream{
ClientStream: s,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
ClientStream: s,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
receivedEvent: cfg.ReceivedEvent,
sentEvent: cfg.SentEvent,
}
}

Expand Down Expand Up @@ -277,7 +292,7 @@
span.End()
return s, err
}
stream := wrapClientStream(ctx, s, desc)
stream := wrapClientStream(ctx, s, desc, cfg)

go func() {
err := <-stream.finished
Expand Down Expand Up @@ -331,7 +346,9 @@
)
defer span.End()

messageReceived.Event(ctx, 1, req)
if cfg.ReceivedEvent {
messageReceived.Event(ctx, 1, req)
}

var statusCode grpc_codes.Code
defer func(t time.Time) {
Expand All @@ -347,11 +364,15 @@
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
messageSent.Event(ctx, 1, s.Proto())
if cfg.SentEvent {
messageSent.Event(ctx, 1, s.Proto())
}

Check warning on line 369 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L368-L369

Added lines #L368 - L369 were not covered by tests
} else {
statusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
messageSent.Event(ctx, 1, resp)
if cfg.SentEvent {
messageSent.Event(ctx, 1, resp)
}
}

return resp, err
Expand All @@ -366,6 +387,9 @@

receivedMessageID int
sentMessageID int

receivedEvent bool
sentEvent bool
}

func (w *serverStream) Context() context.Context {
Expand All @@ -377,7 +401,9 @@

if err == nil {
w.receivedMessageID++
messageReceived.Event(w.Context(), w.receivedMessageID, m)
if w.receivedEvent {
messageReceived.Event(w.Context(), w.receivedMessageID, m)
}
}

return err
Expand All @@ -387,15 +413,19 @@
err := w.ServerStream.SendMsg(m)

w.sentMessageID++
messageSent.Event(w.Context(), w.sentMessageID, m)
if w.sentEvent {
messageSent.Event(w.Context(), w.sentMessageID, m)
}

return err
}

func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *serverStream {
return &serverStream{
ServerStream: ss,
ctx: ctx,
ServerStream: ss,
ctx: ctx,
receivedEvent: cfg.ReceivedEvent,
sentEvent: cfg.SentEvent,
}
}

Expand All @@ -420,7 +450,7 @@
Type: StreamServer,
}
if cfg.Filter != nil && !cfg.Filter(i) {
return handler(srv, wrapServerStream(ctx, ss))
return handler(srv, wrapServerStream(ctx, ss, cfg))

Check warning on line 453 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L453

Added line #L453 was not covered by tests
}

ctx = extract(ctx, cfg.Propagators)
Expand All @@ -434,7 +464,7 @@
)
defer span.End()

err := handler(srv, wrapServerStream(ctx, ss))
err := handler(srv, wrapServerStream(ctx, ss, cfg))
if err != nil {
s, _ := status.FromError(err)
statusCode, msg := serverStatus(s)
Expand Down
Expand Up @@ -94,12 +94,25 @@ func TestInterceptors(t *testing.T) {

assert.NoError(t, doCalls(
[]grpc.DialOption{
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(otelgrpc.WithTracerProvider(clientUnaryTP))),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(otelgrpc.WithTracerProvider(clientStreamTP))),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(
otelgrpc.WithTracerProvider(clientUnaryTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(
otelgrpc.WithTracerProvider(clientStreamTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
},
[]grpc.ServerOption{
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(serverUnaryTP), otelgrpc.WithMeterProvider(serverUnaryMP))),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(serverStreamTP))),
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(serverUnaryTP),
otelgrpc.WithMeterProvider(serverUnaryMP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(serverStreamTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
},
))

Expand Down