Skip to content

Commit

Permalink
otelgrpc: Use metricdatatest in tests (#4499)
Browse files Browse the repository at this point in the history
  • Loading branch information
pellared committed Nov 2, 2023
1 parent bead7e4 commit badf346
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 65 deletions.
64 changes: 40 additions & 24 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ import (

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

var wantInstrumentationScope = instrumentation.Scope{
Name: "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc",
SchemaURL: "https://opentelemetry.io/schemas/1.17.0",
Version: otelgrpc.Version(),
}

const bufSize = 2048

func doCalls(cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error {
Expand Down Expand Up @@ -599,32 +607,40 @@ func assertEvents(t *testing.T, expected, actual []trace.Event) bool {
}

func checkUnaryServerRecords(t *testing.T, reader metric.Reader) {
want := metricdata.ScopeMetrics{
Scope: wantInstrumentationScope,
Metrics: []metricdata.Metrics{
{
Name: "rpc.server.duration",
Description: "Measures the duration of inbound RPC.",
Unit: "ms",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(
semconv.RPCMethod("EmptyCall"),
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
),
},
{
Attributes: attribute.NewSet(
semconv.RPCMethod("UnaryCall"),
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
),
},
},
},
},
},
}
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])

for _, dpt := range data.DataPoints {
attr := dpt.Attributes.ToSlice()
method := getRPCMethod(attr)
assert.NotEmpty(t, method)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(method),
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, attr)
}
}

func getRPCMethod(attrs []attribute.KeyValue) string {
for _, kvs := range attrs {
if kvs.Key == semconv.RPCMethodKey {
return kvs.Value.AsString()
}
}
return ""
metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
Expand All @@ -54,19 +55,6 @@ func getSpanFromRecorder(sr *tracetest.SpanRecorder, name string) (trace.ReadOnl
return nil, false
}

func getMetricFromData(data metricdata.Histogram[int64], name string) (*metricdata.HistogramDataPoint[int64], bool) {
for _, d := range data.DataPoints {
v, ok := d.Attributes.Value(semconv.RPCMethodKey)
if !ok {
return nil, false
}
if semconv.RPCMethod(name).Value == v {
return &d, true
}
}
return nil, false
}

type mockUICInvoker struct {
ctx context.Context
}
Expand Down Expand Up @@ -880,18 +868,20 @@ func assertServerSpan(t *testing.T, wantSpanCode codes.Code, wantSpanStatusDescr

// TestUnaryServerInterceptor tests the server interceptor for unary RPCs.
func TestUnaryServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
mr := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(mr))
usi := otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
)

for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))

mr := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(mr))

usi := otelgrpc.UnaryServerInterceptor(
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithMeterProvider(mp),
)

serviceName := "TestGrpcService"
methodName := serviceName + "/" + name
fullMethodName := "/" + methodName
Expand All @@ -909,7 +899,7 @@ func TestUnaryServerInterceptor(t *testing.T) {
assertServerSpan(t, check.wantSpanCode, check.wantSpanStatusDescription, check.grpcCode, span)

// validate metric
checkManualReaderRecords(t, mr, serviceName, name, check.grpcCode)
assertServerMetrics(t, mr, serviceName, name, check.grpcCode)
})
}
}
Expand Down Expand Up @@ -1002,14 +992,16 @@ func (m *mockServerStream) RecvMsg(_ interface{}) error {

// TestStreamServerInterceptor tests the server interceptor for streaming RPCs.
func TestStreamServerInterceptor(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))
usi := otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(tp),
)
for _, check := range serverChecks {
name := check.grpcCode.String()
t.Run(name, func(t *testing.T) {
sr := tracetest.NewSpanRecorder()
tp := trace.NewTracerProvider(trace.WithSpanProcessor(sr))

usi := otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(tp),
)

// call the stream interceptor
grpcErr := status.Error(check.grpcCode, check.grpcCode.String())
handler := func(_ interface{}, _ grpc.ServerStream) error {
Expand Down Expand Up @@ -1095,21 +1087,33 @@ func TestStreamServerInterceptorEvents(t *testing.T) {
}
}

func checkManualReaderRecords(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name string, code grpc_codes.Code) {
want := metricdata.ScopeMetrics{
Scope: wantInstrumentationScope,
Metrics: []metricdata.Metrics{
{
Name: "rpc.server.duration",
Description: "Measures the duration of inbound RPC.",
Unit: "ms",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(
semconv.RPCMethod(name),
semconv.RPCService(serviceName),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
),
},
},
},
},
},
}
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])
dpt, ok := getMetricFromData(data, name)
assert.True(t, ok)
attr := dpt.Attributes.ToSlice()
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(name),
semconv.RPCService(serviceName),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(code)),
}, attr)
metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}

0 comments on commit badf346

Please sign in to comment.