Skip to content

Commit

Permalink
otelgrpc stats.Handler support metric
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Oct 1, 2023
1 parent 23bf4c9 commit 2218a89
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 16 deletions.
162 changes: 149 additions & 13 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Expand Up @@ -23,7 +23,10 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -33,84 +36,186 @@ type gRPCContextKey struct{}
type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
}

// NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts),
r: &handler{},
}

h.tracer = h.config.TracerProvider.Tracer(
h.r.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)
h.r.meter = h.config.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.server.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

Check warning on line 64 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L63-L64

Added lines #L63 - L64 were not covered by tests

h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 71 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L70-L71

Added lines #L70 - L71 were not covered by tests

h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 78 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L77-L78

Added lines #L77 - L78 were not covered by tests

h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 85 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L84-L85

Added lines #L84 - L85 were not covered by tests

h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 92 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L91-L92

Added lines #L91 - L92 were not covered by tests

return h
}

type handler struct {
tracer trace.Tracer
meter metric.Meter
rpcDuration metric.Int64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}

type serverHandler struct {
*config
tracer trace.Tracer
r *handler
}

// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
// fmt.Printf("server TagRPC, ctx.Err %v\n", ctx.Err())
ctx = extract(ctx, h.config.Propagators)

name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx, _ = h.r.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)

gctx := gRPCContext{}
gctx := gRPCContext{
metricAttrs: attrs,
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
// fmt.Printf("server HandleRPC, ctx.Err %v\n", ctx.Err())
h.r.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
// fmt.Printf("server TagConn, ctx.Err %v\n", ctx.Err())
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)

return ctx
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
// fmt.Printf("server HandleConn, ctx.Err %v\n", ctx.Err())
}

// NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts),
r: &handler{},
}

h.tracer = h.config.TracerProvider.Tracer(
h.r.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

h.r.meter = h.config.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
h.r.rpcDuration, err = h.r.meter.Int64Histogram("rpc.client.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

Check warning on line 176 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L175-L176

Added lines #L175 - L176 were not covered by tests

h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 183 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L182-L183

Added lines #L182 - L183 were not covered by tests

h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

Check warning on line 190 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L189-L190

Added lines #L189 - L190 were not covered by tests

h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 197 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L196-L197

Added lines #L196 - L197 were not covered by tests

h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

Check warning on line 204 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L203-L204

Added lines #L203 - L204 were not covered by tests

return h
}

type clientHandler struct {
*config
tracer trace.Tracer
r *handler
}

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx, _ = h.r.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
Expand All @@ -124,7 +229,7 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
h.r.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -140,16 +245,20 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func handleRPC(ctx context.Context, rs stats.RPCStats) {
func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageId int64

metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
r.rpcRequestSize.Record(context.TODO(), int64(rs.Length), metric.WithAttributes(metricAttrs...))
}
span.AddEvent("message",
trace.WithAttributes(
Expand All @@ -162,6 +271,7 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
r.rpcResponseSize.Record(context.TODO(), int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

span.AddEvent("message",
Expand All @@ -173,14 +283,40 @@ func handleRPC(ctx context.Context, rs stats.RPCStats) {
),
)
case *stats.End:
var rpcStatusAttr attribute.KeyValue
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))

Check warning on line 289 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L289

Added line #L289 was not covered by tests
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
}
metricAttrs = append(metricAttrs, rpcStatusAttr)

span.SetAttributes(rpcStatusAttr)
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())

Check warning on line 298 in instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go#L297-L298

Added lines #L297 - L298 were not covered by tests
}
span.End()

r.rpcDuration.Record(
context.TODO(),
int64(rs.EndTime.Sub(rs.BeginTime)),
metric.WithAttributes(metricAttrs...),
)

r.rpcRequestsPerRPC.Record(
context.TODO(),
int64(gctx.messagesReceived),
metric.WithAttributes(metricAttrs...),
)

r.rpcResponsesPerRPC.Record(
context.TODO(),
int64(gctx.messagesSent),
metric.WithAttributes(metricAttrs...),
)

default:
return
}
Expand Down
Expand Up @@ -15,6 +15,8 @@
package test

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -24,24 +26,30 @@ import (

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

func TestStatsHandler(t *testing.T) {
func TestStatsHandlerMix(t *testing.T) {
clientSR := tracetest.NewSpanRecorder()
clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR))
clientMetricReader := metric.NewManualReader()
clientMP := metric.NewMeterProvider(metric.WithReader(clientMetricReader))

serverSR := tracetest.NewSpanRecorder()
serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR))
serverMetricReader := metric.NewManualReader()
serverMP := metric.NewMeterProvider(metric.WithReader(serverMetricReader))

assert.NoError(t, doCalls(
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))),
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP), otelgrpc.WithMeterProvider(clientMP))),
},
[]grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))),
grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP), otelgrpc.WithMeterProvider(serverMP))),
},
))

Expand All @@ -51,6 +59,7 @@ func TestStatsHandler(t *testing.T) {

t.Run("ServerSpans", func(t *testing.T) {
checkServerSpans(t, serverSR.Ended())
checkServerRecords(t, serverMetricReader)
})
}

Expand Down Expand Up @@ -579,3 +588,40 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, pingPong.Attributes())
}

func checkServerRecords(t *testing.T, reader metric.Reader) {
rm := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &rm)
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 5)
for _, metric := range rm.ScopeMetrics[0].Metrics {
require.IsType(t, metric.Data, metricdata.Histogram[int64]{})
data := metric.Data.(metricdata.Histogram[int64])
expectedKeys := make([]string, 0)
switch {
case strings.HasSuffix(metric.Name, "duration"), strings.HasSuffix(metric.Name, "requests_per_rpc"), strings.HasSuffix(metric.Name, "responses_per_rpc"):
expectedKeys = []string{
string(semconv.RPCMethodKey),
string(semconv.RPCServiceKey),
string(semconv.RPCSystemKey),
string(otelgrpc.GRPCStatusCodeKey),
}
case strings.HasSuffix(metric.Name, "request.size"), strings.HasSuffix(metric.Name, "response.size"):
expectedKeys = []string{
string(semconv.RPCMethodKey),
string(semconv.RPCServiceKey),
string(semconv.RPCSystemKey),
}
}

for _, dpt := range data.DataPoints {
attrs := dpt.Attributes.ToSlice()
keys := make([]string, 0, len(attrs))
for _, attr := range attrs {
keys = append(keys, string(attr.Key))
}
assert.ElementsMatch(t, expectedKeys, keys)
}
}
}

0 comments on commit 2218a89

Please sign in to comment.