Skip to content

Commit

Permalink
fix(otelgrpc): correctly assign grpc status code
Browse files Browse the repository at this point in the history
  • Loading branch information
liufuyang committed Oct 25, 2023
1 parent 5adc271 commit 7660db2
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -28,6 +28,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- The `go.opentelemetry.io/contrib/samplers/jaegerremote` sampler does not panic when the default HTTP round-tripper (`http.DefaultTransport`) is not `*http.Transport`. (#4045)
- The `UnaryServerInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`
- does not set gRPC status code correctly for metric `rpcServerDuration` (#4481)

## [1.20.0/0.45.0/0.14.0] - 2023-09-28

Expand Down
17 changes: 9 additions & 8 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Expand Up @@ -366,31 +366,32 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
messageReceived.Event(ctx, 1, req)
}

var statusCode grpc_codes.Code
defer func(t time.Time) {
elapsedTime := time.Since(t) / time.Millisecond
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, int64(elapsedTime), o)
}(time.Now())
before := time.Now()
var grpcStatusCode grpc_codes.Code

resp, err := handler(ctx, req)
if err != nil {
s, _ := status.FromError(err)
grpcStatusCode = s.Code()
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
if cfg.SentEvent {
messageSent.Event(ctx, 1, s.Proto())
}
} else {
statusCode = grpc_codes.OK
grpcStatusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
if cfg.SentEvent {
messageSent.Event(ctx, 1, resp)
}
}

elapsedTime := time.Since(before).Milliseconds()
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(grpcStatusCode)))
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, elapsedTime, o)

return resp, err
}
}
Expand Down
Expand Up @@ -23,16 +23,18 @@ import (
"testing"
"time"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
oteltrace "go.opentelemetry.io/otel/trace"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"
grpc_codes "google.golang.org/grpc/codes"
Expand All @@ -52,6 +54,19 @@ func getSpanFromRecorder(sr *tracetest.SpanRecorder, name string) (trace.ReadOnl
return nil, false
}

func getMetricFromData(data metricdata.Histogram[int64], name string) (*metricdata.HistogramDataPoint[int64], bool) {
for _, d := range data.DataPoints {
v, ok := d.Attributes.Value(semconv.RPCMethodKey)
if !ok {
return nil, false
}
if semconv.RPCMethod(name).Value == v {
return &d, true
}
}
return nil, false
}

type mockUICInvoker struct {
ctx context.Context
}
Expand Down Expand Up @@ -867,24 +882,34 @@ func assertServerSpan(t *testing.T, wantSpanCode codes.Code, wantSpanStatusDescr
func TestUnaryServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
mr := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(mr))
usi := otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
)

for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
serviceName := "TestGrpcService"
methodName := serviceName + "/" + name
fullMethodName := "/" + methodName
// call the unary interceptor
grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, grpcErr
}
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: name}, handler)
_, err := usi(context.Background(), &grpc_testing.SimpleRequest{}, &grpc.UnaryServerInfo{FullMethod: fullMethodName}, handler)
assert.Equal(t, grpcErr, err)

// validate span
span, ok := getSpanFromRecorder(sr, name)
require.True(t, ok, "missing span %s", name)
span, ok := getSpanFromRecorder(sr, methodName)
require.True(t, ok, "missing span %s", methodName)
assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)

// validate metric
checkManualReaderRecords(t, mr, serviceName, name, check.grpcCode)
})
}
}
Expand Down Expand Up @@ -1069,3 +1094,22 @@ func TestStreamServerInterceptorEvents(t *testing.T) {
})
}
}

func checkManualReaderRecords(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])
dpt, ok := getMetricFromData(data, name)
assert.True(t, ok)
attr := dpt.Attributes.ToSlice()
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(name),
semconv.RPCService(serviceName),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
}, attr)
}

0 comments on commit 7660db2

Please sign in to comment.