Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(otelgrpc): correctly assign grpc status code #4481

Merged
merged 7 commits into from Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -28,6 +28,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- The `go.opentelemetry.io/contrib/samplers/jaegerremote` sampler does not panic when the default HTTP round-tripper (`http.DefaultTransport`) is not `*http.Transport`. (#4045)
- The `UnaryServerInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` now sets gRPC status code correctly for the `rpc.server.duration` metric. (#4481)

## [1.20.0/0.45.0/0.14.0] - 2023-09-28

Expand Down
17 changes: 9 additions & 8 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Expand Up @@ -366,31 +366,32 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
messageReceived.Event(ctx, 1, req)
}

var statusCode grpc_codes.Code
defer func(t time.Time) {
elapsedTime := time.Since(t) / time.Millisecond
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, int64(elapsedTime), o)
}(time.Now())
before := time.Now()
var grpcStatusCode grpc_codes.Code

resp, err := handler(ctx, req)
if err != nil {
s, _ := status.FromError(err)
grpcStatusCode = s.Code()
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
if cfg.SentEvent {
messageSent.Event(ctx, 1, s.Proto())
}
} else {
statusCode = grpc_codes.OK
grpcStatusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
if cfg.SentEvent {
messageSent.Event(ctx, 1, resp)
}
}

elapsedTime := time.Since(before).Milliseconds()
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatusCode)))
liufuyang marked this conversation as resolved.
Show resolved Hide resolved
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, elapsedTime, o)
liufuyang marked this conversation as resolved.
Show resolved Hide resolved

return resp, err
}
}
Expand Down
Expand Up @@ -26,6 +26,8 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
Expand All @@ -52,6 +54,19 @@ func getSpanFromRecorder(sr *tracetest.SpanRecorder, name string) (trace.ReadOnl
return nil, false
}

func getMetricFromData(data metricdata.Histogram[int64], name string) (*metricdata.HistogramDataPoint[int64], bool) {
for _, d := range data.DataPoints {
v, ok := d.Attributes.Value(semconv.RPCMethodKey)
if !ok {
return nil, false
}
if semconv.RPCMethod(name).Value == v {
return &d, true
}
}
return nil, false
}

type mockUICInvoker struct {
ctx context.Context
}
Expand Down Expand Up @@ -867,24 +882,34 @@ func assertServerSpan(t *testing.T, wantSpanCode codes.Code, wantSpanStatusDescr
func TestUnaryServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
mr := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(mr))
usi := otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
)

for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
serviceName := "TestGrpcService"
methodName := serviceName + "/" + name
fullMethodName := "/" + methodName
// call the unary interceptor
grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, grpcErr
}
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: name}, handler)
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: fullMethodName}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)
span, ok := getSpanFromRecorder(sr, methodName)
require.True(t, ok, "missing span %s", methodName)
assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)

// validate metric
checkManualReaderRecords(t, mr, serviceName, name, check.grpcCode)
})
}
}
Expand Down Expand Up @@ -1069,3 +1094,22 @@ func TestStreamServerInterceptorEvents(t *testing.T) {
})
}
}

func checkManualReaderRecords(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])
dpt, ok := getMetricFromData(data, name)
assert.True(t, ok)
attr := dpt.Attributes.ToSlice()
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(name),
semconv.RPCService(serviceName),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
}, attr)
pellared marked this conversation as resolved.
Show resolved Hide resolved
}