Skip to content

Commit

Permalink
[otelgrpc] refactor otelgrpc to use grpc.StatsHandler
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Feb 10, 2023
1 parent c52452d commit 91dc29d
Show file tree
Hide file tree
Showing 3 changed files with 806 additions and 0 deletions.
185 changes: 185 additions & 0 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
@@ -0,0 +1,185 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

import (
"context"
"sync/atomic"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

type grpcContext struct {
messagesReceived int64
messagesSent int64
}

func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts),
}

h.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

return h
}

type serverHandler struct {
*config
tracer trace.Tracer
}

// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
ctx = extract(ctx, h.config.Propagators)

attrs := []attribute.KeyValue{RPCSystemGRPC}
name, mAttrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, mAttrs...)
ctx, _ = h.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)

gctx := grpcContext{}

return context.WithValue(ctx, grpcContextKey, &gctx)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)
return ctx
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
}

func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts),
}

h.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

return h
}

type clientHandler struct {
*config
tracer trace.Tracer
}

const grpcContextKey = "otel-trace-bin"

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
attrs := []attribute.KeyValue{RPCSystemGRPC}
name, mAttrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, mAttrs...)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
)

gctx := grpcContext{}

return inject(context.WithValue(ctx, grpcContextKey, &gctx), h.config.Propagators)
}

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
}

// TagConn can attach some information to the given context.
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(cti.RemoteAddr.String())
span.SetAttributes(attrs...)
return ctx
}

// HandleConn processes the Conn stats.
func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}

func handleRPC(ctx context.Context, rs stats.RPCStats) {
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(grpcContextKey).(*grpcContext)
var messageId int64 = 0

switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
}
span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeReceived,
semconv.MessageIDKey.Int64(messageId),
),
)
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
}

span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeSent,
semconv.MessageIDKey.Int64(messageId),
),
)
case *stats.End:
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}
span.End()
default:
return
}
}

0 comments on commit 91dc29d

Please sign in to comment.