Skip to content

Commit

Permalink
move all metrics into config
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Nov 3, 2023
1 parent 8c194cb commit ad22f06
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 134 deletions.
48 changes: 44 additions & 4 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Expand Up @@ -46,8 +46,14 @@ type config struct {
ReceivedEvent bool
SentEvent bool

meter metric.Meter
rpcServerDuration metric.Int64Histogram
tracer trace.Tracer
meter metric.Meter

rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}

// Option applies an option value for a config.
Expand All @@ -56,7 +62,7 @@ type Option interface {
}

// newConfig returns a config configured with all the passed Options.
func newConfig(opts []Option) *config {
func newConfig(opts []Option, role string) *config {
c := &config{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
Expand All @@ -66,19 +72,53 @@ func newConfig(opts []Option) *config {
o.apply(c)
}

c.tracer = c.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

c.meter = c.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)

var err error
c.rpcServerDuration, err = c.meter.Int64Histogram("rpc.server.duration",
c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

return c
}

Expand Down
10 changes: 5 additions & 5 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Expand Up @@ -61,7 +61,7 @@ var (
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call.
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -255,7 +255,7 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call.
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -325,7 +325,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -387,7 +387,7 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {

elapsedTime := time.Since(before).Milliseconds()
attr = append(attr, grpcStatusCodeAttr)
cfg.rpcServerDuration.Record(ctx, elapsedTime, metric.WithAttributes(attr...))
cfg.rpcDuration.Record(ctx, float64(elapsedTime), metric.WithAttributes(attr...))

return resp, err
}
Expand Down Expand Up @@ -446,7 +446,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *s
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(Version()),
Expand Down
Expand Up @@ -56,7 +56,7 @@ func (s *metadataSupplier) Keys() []string {
// requests.
// Deprecated: Unnecessary public func.
func Inject(ctx context.Context, md *metadata.MD, opts ...Option) {
c := newConfig(opts)
c := newConfig(opts, "")
c.Propagators.Inject(ctx, &metadataSupplier{
metadata: md,
})
Expand All @@ -78,7 +78,7 @@ func inject(ctx context.Context, propagators propagation.TextMapPropagator) cont
// This function is meant to be used on incoming requests.
// Deprecated: Unnecessary public func.
func Extract(ctx context.Context, md *metadata.MD, opts ...Option) (baggage.Baggage, trace.SpanContext) {
c := newConfig(opts)
c := newConfig(opts, "")
ctx = c.Propagators.Extract(ctx, &metadataSupplier{
metadata: md,
})
Expand Down
130 changes: 12 additions & 118 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Expand Up @@ -23,7 +23,6 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
Expand All @@ -39,71 +38,14 @@ type gRPCContext struct {
metricAttrs []attribute.KeyValue
}

type handler struct {
tracer trace.Tracer
meter metric.Meter
rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}

type serverHandler struct {
*config
r *handler
}

// NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts),
r: &handler{},
}

h.r.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)
h.r.meter = h.config.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.server.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.server.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.server.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.server.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.server.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
config: newConfig(opts, "server"),
}

return h
Expand All @@ -128,7 +70,7 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont

name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.r.tracer.Start(
ctx, _ = h.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
Expand All @@ -143,65 +85,17 @@ func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.r.handleRPC(ctx, rs)
h.handleRPC(ctx, rs)
}

type clientHandler struct {
*config
r *handler
}

// NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts),
r: &handler{},
}

h.r.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

h.r.meter = h.config.MeterProvider.Meter(
instrumentationName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
h.r.rpcDuration, err = h.r.meter.Float64Histogram("rpc.client.duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}

h.r.rpcRequestSize, err = h.r.meter.Int64Histogram("rpc.client.request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

h.r.rpcResponseSize, err = h.r.meter.Int64Histogram("rpc.client.response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}

h.r.rpcRequestsPerRPC, err = h.r.meter.Int64Histogram("rpc.client.requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}

h.r.rpcResponsesPerRPC, err = h.r.meter.Int64Histogram("rpc.client.responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
config: newConfig(opts, "client"),
}

return h
Expand All @@ -211,7 +105,7 @@ func NewClientHandler(opts ...Option) stats.Handler {
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.r.tracer.Start(
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
Expand All @@ -227,7 +121,7 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.r.handleRPC(ctx, rs)
h.handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
Expand All @@ -243,7 +137,7 @@ func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageId int64
Expand All @@ -255,7 +149,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
r.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

span.AddEvent("message",
Expand All @@ -269,7 +163,7 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
r.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}

span.AddEvent("message",
Expand All @@ -295,9 +189,9 @@ func (r *handler) handleRPC(ctx context.Context, rs stats.RPCStats) {
span.End()

metricAttrs = append(metricAttrs, rpcStatusAttr)
r.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
r.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
r.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...))
c.rpcDuration.Record(context.TODO(), float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
c.rpcRequestsPerRPC.Record(context.TODO(), gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(context.TODO(), gctx.messagesSent, metric.WithAttributes(metricAttrs...))

default:
return
Expand Down
Expand Up @@ -604,8 +604,8 @@ func checkUnaryServerRecords(t *testing.T, reader metric.Reader) {
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[float64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])

for _, dpt := range data.DataPoints {
attr := dpt.Attributes.ToSlice()
Expand Down
Expand Up @@ -54,7 +54,7 @@ func getSpanFromRecorder(sr *tracetest.SpanRecorder, name string) (trace.ReadOnl
return nil, false
}

func getMetricFromData(data metricdata.Histogram[int64], name string) (*metricdata.HistogramDataPoint[int64], bool) {
func getMetricFromData(data metricdata.Histogram[float64], name string) (*metricdata.HistogramDataPoint[float64], bool) {
for _, d := range data.DataPoints {
v, ok := d.Attributes.Value(semconv.RPCMethodKey)
if !ok {
Expand Down Expand Up @@ -1101,8 +1101,8 @@ func checkManualReaderRecords(t *testing.T, reader metric.Reader, serviceName, n
assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[int64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[int64])
require.IsType(t, rm.ScopeMetrics[0].Metrics[0].Data, metricdata.Histogram[float64]{})
data := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])
dpt, ok := getMetricFromData(data, name)
assert.True(t, ok)
attr := dpt.Attributes.ToSlice()
Expand Down

0 comments on commit ad22f06

Please sign in to comment.