Skip to content

Commit

Permalink
add server handler basic implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Feb 9, 2023
1 parent 3da968b commit c08371f
Showing 1 changed file with 84 additions and 7 deletions.
91 changes: 84 additions & 7 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Expand Up @@ -37,31 +37,108 @@ type grpcContext struct {
}

func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts),
}

return &serverHandler{}
h.tracer = h.config.TracerProvider.Tracer(
instrumentationName,
trace.WithInstrumentationVersion(SemVersion()),
)

return h
}

type serverHandler struct {
*config
tracer trace.Tracer
}

// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(context.Context, *stats.RPCTagInfo) context.Context {
return nil
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
attrs := []attribute.KeyValue{RPCSystemGRPC}
name, mAttrs := internal.ParseFullMethod(info.FullMethodName)
fmt.Printf("Server TagRPC: %v\n", name)
attrs = append(attrs, mAttrs...)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)

gctx := grpcContext{}

return context.WithValue(ctx, grpcContextKey, &gctx)
}

// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(context.Context, stats.RPCStats) {
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
fmt.Println("Server HandleRPC")
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(grpcContextKey).(*grpcContext)
var messageId int64 = 0

switch rs := rs.(type) {
case *stats.OutHeader:
fmt.Println(" Server HandleRPC: stats.OutHeader")
case *stats.OutTrailer:
fmt.Println(" Server HandleRPC: stats.OutTrailer")
case *stats.Begin:
fmt.Println(" Server HandleRPC: stats.Begin")
case *stats.InPayload:
fmt.Println(" Server HandleRPC: stats.InPayload")
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
}
span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeReceived,
semconv.MessageUncompressedSizeKey.Int(rs.Length),
semconv.MessageIDKey.Int64(messageId),
),
)
case *stats.OutPayload:
fmt.Println(" Server HandleRPC: stats.OutPayload")
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
}

span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeSent,
semconv.MessageUncompressedSizeKey.Int(rs.Length),
semconv.MessageIDKey.Int64(messageId),
),
)
case *stats.End:
fmt.Println(" Server HandleRPC: stats.End")
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}
span.End()
default:
return
}
}

// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(context.Context, *stats.ConnTagInfo) context.Context {
return nil
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
fmt.Println("Server TagConn")
span := trace.SpanFromContext(ctx)
attrs := peerAttr(info.RemoteAddr.String())
printAttrs("TagConn", attrs)
span.SetAttributes(attrs...)
return ctx
}

// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(context.Context, stats.ConnStats) {

fmt.Println("Server HandleConn")
}

func NewClientHandler(opts ...Option) stats.Handler {
Expand Down

0 comments on commit c08371f

Please sign in to comment.