Skip to content

Commit

Permalink
add connection info to span
Browse files Browse the repository at this point in the history
Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
  • Loading branch information
fatsheep9146 committed Feb 9, 2023
1 parent 88f6a1f commit 3da968b
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 266 deletions.
32 changes: 27 additions & 5 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.g

import (
"context"
"fmt"
"sync/atomic"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
Expand All @@ -24,14 +25,15 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
"go.opentelemetry.io/otel/trace"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

type grpcContext struct {
messagesReceived int64
messagesSent int64
attributes []attribute.KeyValue
name string
}

func NewServerHandler(opts ...Option) stats.Handler {
Expand Down Expand Up @@ -84,11 +86,12 @@ const grpcContextKey = "otel-trace-bin"

// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
md, _ := metadata.FromIncomingContext(ctx)
h.Propagators.Extract(ctx, &metadataSupplier{metadata: &md})
// md, _ := metadata.FromIncomingContext(ctx)
// h.Propagators.Extract(ctx, &metadataSupplier{metadata: &md})

attrs := []attribute.KeyValue{RPCSystemGRPC}
name, mAttrs := internal.ParseFullMethod(info.FullMethodName)
fmt.Printf("Client TagRPC: %v\n", name)
attrs = append(attrs, mAttrs...)
ctx, _ = h.tracer.Start(
ctx,
Expand All @@ -104,17 +107,23 @@ func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) cont

// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
fmt.Println("Client HandleRPC")
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(grpcContextKey).(*grpcContext)
var messageId int64 = 0

switch rs := rs.(type) {
case *stats.OutHeader:
fmt.Println(" Client HandleRPC: stats.OutHeader")
case *stats.OutTrailer:
fmt.Println(" Client HandleRPC: stats.OutTrailer")
case *stats.Begin:
fmt.Println(" Client HandleRPC: stats.Begin")
case *stats.InPayload:
fmt.Println(" Client HandleRPC: stats.InPayload")
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
}

span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeReceived,
Expand All @@ -123,6 +132,7 @@ func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
),
)
case *stats.OutPayload:
fmt.Println(" Client HandleRPC: stats.OutPayload")
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
}
Expand All @@ -135,6 +145,7 @@ func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
),
)
case *stats.End:
fmt.Println(" Client HandleRPC: stats.End")
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
Expand All @@ -150,11 +161,22 @@ func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {

// TagConn can attach some information to the given context.
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
// no-op
fmt.Println("Client TagConn")
span := trace.SpanFromContext(ctx)
attrs := peerAttr(cti.RemoteAddr.String())
printAttrs("TagConn", attrs)
span.SetAttributes(attrs...)
return ctx
}

// HandleConn processes the Conn stats.
func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
fmt.Println("Client HandleConn")
// no-op
}

func printAttrs(stage string, attributes []attribute.KeyValue) {
for _, kv := range attributes {
fmt.Printf("stage %v: got attr [%v: %v]\n", stage, kv.Key, kv.Value)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

func TestStatsHandler(t *testing.T) {
clientUnarySR := tracetest.NewSpanRecorder()
clientUnaryTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientUnarySR))

assert.NoError(t, doCalls(
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientUnaryTP))),
},
[]grpc.ServerOption{},
))

t.Run("ClientSpans", func(t *testing.T) {
checkClientSpansStats(t, clientUnarySR.Ended())
})
}

func checkClientSpansStats(t *testing.T, spans []trace.ReadOnlySpan) {
require.Len(t, spans, 5)

emptySpan := spans[0]
assert.False(t, emptySpan.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name())
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(0),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(0),
},
},
}, emptySpan.Events())
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("EmptyCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, emptySpan.Attributes())

largeSpan := spans[1]
assert.False(t, largeSpan.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/UnaryCall", largeSpan.Name())
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("SENT"),
// largeReqSize from "google.golang.org/grpc/interop" + 12 (overhead).
otelgrpc.RPCMessageUncompressedSizeKey.Int(271840),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
// largeRespSize from "google.golang.org/grpc/interop" + 8 (overhead).
otelgrpc.RPCMessageUncompressedSizeKey.Int(314167),
},
},
}, largeSpan.Events())
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("UnaryCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, largeSpan.Attributes())

streamInput := spans[2]
assert.False(t, streamInput.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name())
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(27190),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(2),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(12),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(3),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(1834),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(4),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(45912),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(4),
},
},
// client does not record an event for the server response.
}, streamInput.Events())
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("StreamingInputCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, streamInput.Attributes())

streamOutput := spans[3]
assert.False(t, streamOutput.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name())
// sizes from respSizes in "google.golang.org/grpc/interop".
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(21),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(31423),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(2),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(13),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(3),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(2659),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(4),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(58987),
},
},
}, streamOutput.Events())
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("StreamingOutputCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, streamOutput.Attributes())

pingPong := spans[4]
assert.False(t, pingPong.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name())
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(27196),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(1),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(31423),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(2),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(16),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(2),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(13),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(3),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(1839),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(3),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(2659),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(4),
otelgrpc.RPCMessageTypeKey.String("SENT"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(45918),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
otelgrpc.RPCMessageIDKey.Int(4),
otelgrpc.RPCMessageTypeKey.String("RECEIVED"),
otelgrpc.RPCMessageUncompressedSizeKey.Int(58987),
},
},
}, pingPong.Events())
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("FullDuplexCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
}, pingPong.Attributes())
}

0 comments on commit 3da968b

Please sign in to comment.