diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go index 16686d1331d..c96c7e3fe64 100644 --- a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -37,31 +37,108 @@ type grpcContext struct { } func NewServerHandler(opts ...Option) stats.Handler { + h := &serverHandler{ + config: newConfig(opts), + } - return &serverHandler{} + h.tracer = h.config.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + + return h } type serverHandler struct { + *config + tracer trace.Tracer } // TagRPC can attach some information to the given context. -func (h *serverHandler) TagRPC(context.Context, *stats.RPCTagInfo) context.Context { - return nil +func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + attrs := []attribute.KeyValue{RPCSystemGRPC} + name, mAttrs := internal.ParseFullMethod(info.FullMethodName) + fmt.Printf("Server TagRPC: %v\n", name) + attrs = append(attrs, mAttrs...) + ctx, _ = h.tracer.Start( + ctx, + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attrs...), + ) + + gctx := grpcContext{} + + return context.WithValue(ctx, grpcContextKey, &gctx) } // HandleRPC processes the RPC stats. -func (h *serverHandler) HandleRPC(context.Context, stats.RPCStats) { +func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + fmt.Println("Server HandleRPC") + span := trace.SpanFromContext(ctx) + gctx, _ := ctx.Value(grpcContextKey).(*grpcContext) + var messageId int64 = 0 + switch rs := rs.(type) { + case *stats.OutHeader: + fmt.Println(" Server HandleRPC: stats.OutHeader") + case *stats.OutTrailer: + fmt.Println(" Server HandleRPC: stats.OutTrailer") + case *stats.Begin: + fmt.Println(" Server HandleRPC: stats.Begin") + case *stats.InPayload: + fmt.Println(" Server HandleRPC: stats.InPayload") + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + } + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeReceived, + semconv.MessageUncompressedSizeKey.Int(rs.Length), + semconv.MessageIDKey.Int64(messageId), + ), + ) + case *stats.OutPayload: + fmt.Println(" Server HandleRPC: stats.OutPayload") + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesSent, 1) + } + + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeSent, + semconv.MessageUncompressedSizeKey.Int(rs.Length), + semconv.MessageIDKey.Int64(messageId), + ), + ) + case *stats.End: + fmt.Println(" Server HandleRPC: stats.End") + if rs.Error != nil { + s, _ := status.FromError(rs.Error) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(statusCodeAttr(s.Code())) + } else { + span.SetAttributes(statusCodeAttr(grpc_codes.OK)) + } + span.End() + default: + return + } } // TagConn can attach some information to the given context. -func (h *serverHandler) TagConn(context.Context, *stats.ConnTagInfo) context.Context { - return nil +func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + fmt.Println("Server TagConn") + span := trace.SpanFromContext(ctx) + attrs := peerAttr(info.RemoteAddr.String()) + printAttrs("TagConn", attrs) + span.SetAttributes(attrs...) + return ctx } // HandleConn processes the Conn stats. func (h *serverHandler) HandleConn(context.Context, stats.ConnStats) { - + fmt.Println("Server HandleConn") } func NewClientHandler(opts ...Option) stats.Handler {