Skip to content

Commit

Permalink
add more test
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Oct 7, 2023
1 parent e69b846 commit 7c3ae17
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 54 deletions.
46 changes: 17 additions & 29 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type gRPCContextKey struct{}
type gRPCContext struct {
messagesReceived int64
messagesSent int64
requestSize int64
responseSize int64
metricAttrs []attribute.KeyValue
}

Expand Down Expand Up @@ -222,7 +224,9 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}

return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
}
Expand Down Expand Up @@ -250,15 +254,12 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageId int64

metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
r.rpcRequestSize.Record(context.TODO(), int64(rs.Length), metric.WithAttributes(metricAttrs...))
atomic.StoreInt64(&gctx.requestSize, int64(rs.Length))
}
span.AddEvent("message",
trace.WithAttributes(
Expand All @@ -271,7 +272,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
r.rpcResponseSize.Record(context.TODO(), int64(rs.Length), metric.WithAttributes(metricAttrs...))
atomic.StoreInt64(&gctx.responseSize, int64(rs.Length))
}

span.AddEvent("message",
Expand All @@ -284,38 +285,25 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
)
case *stats.End:
var rpcStatusAttr attribute.KeyValue
metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)

if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
} else {
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
}
metricAttrs = append(metricAttrs, rpcStatusAttr)

span.SetAttributes(rpcStatusAttr)
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
}
span.End()

r.rpcDuration.Record(
context.TODO(),
int64(rs.EndTime.Sub(rs.BeginTime)),
metric.WithAttributes(metricAttrs...),
)

r.rpcRequestsPerRPC.Record(
context.TODO(),
int64(gctx.messagesReceived),
metric.WithAttributes(metricAttrs...),
)

r.rpcResponsesPerRPC.Record(
context.TODO(),
int64(gctx.messagesSent),
metric.WithAttributes(metricAttrs...),
)
metricAttrs = append(metricAttrs, rpcStatusAttr)
r.rpcDuration.Record(context.TODO(), int64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
r.rpcRequestSize.Record(context.TODO(), gctx.requestSize, metric.WithAttributes(metricAttrs...))
r.rpcResponseSize.Record(context.TODO(), gctx.responseSize, metric.WithAttributes(metricAttrs...))
r.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
r.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...))

default:
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package test

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -33,7 +32,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

func TestStatsHandlerMix(t *testing.T) {
func TestStatsHandler(t *testing.T) {
clientSR := tracetest.NewSpanRecorder()
clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR))
clientMetricReader := metric.NewManualReader()
Expand All @@ -53,11 +52,12 @@ func TestStatsHandlerMix(t *testing.T) {
},
))

t.Run("ClientSpans", func(t *testing.T) {
t.Run("Client", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
checkClientRecords(t, clientMetricReader)
})

t.Run("ServerSpans", func(t *testing.T) {
t.Run("Server", func(t *testing.T) {
checkServerSpans(t, serverSR.Ended())
checkServerRecords(t, serverMetricReader)
})
Expand Down Expand Up @@ -598,30 +598,39 @@ func checkServerRecords(t *testing.T, reader metric.Reader) {
for _, metric := range rm.ScopeMetrics[0].Metrics {
require.IsType(t, metric.Data, metricdata.Histogram[int64]{})
data := metric.Data.(metricdata.Histogram[int64])
expectedKeys := make([]string, 0)
switch {
case strings.HasSuffix(metric.Name, "duration"), strings.HasSuffix(metric.Name, "requests_per_rpc"), strings.HasSuffix(metric.Name, "responses_per_rpc"):
expectedKeys = []string{
string(semconv.RPCMethodKey),
string(semconv.RPCServiceKey),
string(semconv.RPCSystemKey),
string(otelgrpc.GRPCStatusCodeKey),
}
case strings.HasSuffix(metric.Name, "request.size"), strings.HasSuffix(metric.Name, "response.size"):
expectedKeys = []string{
string(semconv.RPCMethodKey),
string(semconv.RPCServiceKey),
string(semconv.RPCSystemKey),
}
for _, dpt := range data.DataPoints {
attr := dpt.Attributes.ToSlice()
method := getRPCMethod(attr)
assert.NotEmpty(t, method)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(method),
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, attr)
}
}
}

func checkClientRecords(t *testing.T, reader metric.Reader) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 5)
for _, metric := range rm.ScopeMetrics[0].Metrics {
require.IsType(t, metric.Data, metricdata.Histogram[int64]{})
data := metric.Data.(metricdata.Histogram[int64])
for _, dpt := range data.DataPoints {
attrs := dpt.Attributes.ToSlice()
keys := make([]string, 0, len(attrs))
for _, attr := range attrs {
keys = append(keys, string(attr.Key))
}
assert.ElementsMatch(t, expectedKeys, keys)
attr := dpt.Attributes.ToSlice()
method := getRPCMethod(attr)
assert.NotEmpty(t, method)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod(method),
semconv.RPCService("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, attr)
}
}
}

0 comments on commit 7c3ae17

Please sign in to comment.