From bf86b26a1d265ad04a07ae2abadcd08e9d18e174 Mon Sep 17 00:00:00 2001 From: Ziqi Zhao Date: Tue, 19 Sep 2023 14:07:38 +0800 Subject: [PATCH] otelgrpc: Implement grpc.StatsHandler for trace instrumentation (#3002) --- CHANGELOG.md | 1 + .../google.golang.org/grpc/otelgrpc/doc.go | 45 ++ .../grpc/otelgrpc/stats_handler.go | 187 ++++++ .../otelgrpc/test/grpc_stats_handler_test.go | 581 ++++++++++++++++++ 4 files changed, 814 insertions(+) create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/doc.go create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go create mode 100644 instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 249020b02dd..0b149fa66d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108) - Set the description for the `rpc.server.duration` metric in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4302) +- Add `NewServerHandler` and `NewClientHandler` that return a `grpc.StatsHandler` used for gRPC instrumentation in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#3002) ## [1.19.0/0.44.0/0.13.0] - 2023-09-12 diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/doc.go b/instrumentation/google.golang.org/grpc/otelgrpc/doc.go new file mode 100644 index 00000000000..a993e0fc921 --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/doc.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* +Package otelgrpc is the instrumentation library for [google.golang.org/grpc] + +For now you can instrument your program which use [google.golang.org/grpc] in two ways: + + - by [grpc.UnaryClientInterceptor], [grpc.UnaryServerInterceptor], [grpc.StreamClientInterceptor], [grpc.StreamServerInterceptor] + - by [stats.Handler] + +Notice: Do not use both interceptors and [stats.Handler] at the same time! If so, you will get duplicated spans and the parent/child relationships between spans will also be broken. + +We strongly still recommand you to use [stats.Handler], mainly for two reasons: + +Functional advantages: [stats.Handler] has more information for user to build more flexible and granular metric, for example + + - multiple different types of represent "data length": In [stats.InPayload], there exists "Length", "CompressedLength", "WireLength" to denote the size of uncompressed, compressed payload data, with or without framing data. But in interceptors, we can only got uncompressed data, and this feature is also removed due to performance problem. + + - more accurate timestamp: [stats.InPayload]'s "RecvTime" and [stats.OutPayload]'s "SentTime" records more accurate timestamp that server got and sent the message, the timestamp recorded by interceptors depends on the location of this interceptors in the total interceptor chain. + + - some other use cases: for example, catch failure of decoding message. + +Performance advantages: If too many interceptors are registered in a service, the interceptor chain can become too long, which increases the latency and processing time of the entire RPC call. + +[stats.Handler]: https://pkg.go.dev/google.golang.org/grpc/stats#Handler +[grpc.UnaryClientInterceptor]: https://pkg.go.dev/google.golang.org/grpc#UnaryClientInterceptor +[grpc.UnaryServerInterceptor]: https://pkg.go.dev/google.golang.org/grpc#UnaryServerInterceptor +[grpc.StreamClientInterceptor]: https://pkg.go.dev/google.golang.org/grpc#StreamClientInterceptor +[grpc.StreamServerInterceptor]: https://pkg.go.dev/google.golang.org/grpc#StreamServerInterceptor +[stats.OutPayload]: https://pkg.go.dev/google.golang.org/grpc/stats#OutPayload +[stats.InPayload]: https://pkg.go.dev/google.golang.org/grpc/stats#InPayload +*/ +package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go new file mode 100644 index 00000000000..c64a53443bc --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go @@ -0,0 +1,187 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + +import ( + "context" + "sync/atomic" + + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" +) + +type gRPCContextKey struct{} + +type gRPCContext struct { + messagesReceived int64 + messagesSent int64 +} + +// NewServerHandler creates a stats.Handler for gRPC server. +func NewServerHandler(opts ...Option) stats.Handler { + h := &serverHandler{ + config: newConfig(opts), + } + + h.tracer = h.config.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + return h +} + +type serverHandler struct { + *config + tracer trace.Tracer +} + +// TagRPC can attach some information to the given context. +func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + ctx = extract(ctx, h.config.Propagators) + + name, attrs := internal.ParseFullMethod(info.FullMethodName) + attrs = append(attrs, RPCSystemGRPC) + ctx, _ = h.tracer.Start( + trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)), + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attrs...), + ) + + gctx := gRPCContext{} + return context.WithValue(ctx, gRPCContextKey{}, &gctx) +} + +// HandleRPC processes the RPC stats. +func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + span := trace.SpanFromContext(ctx) + attrs := peerAttr(peerFromCtx(ctx)) + span.SetAttributes(attrs...) + return ctx +} + +// HandleConn processes the Conn stats. +func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) { +} + +// NewClientHandler creates a stats.Handler for gRPC client. +func NewClientHandler(opts ...Option) stats.Handler { + h := &clientHandler{ + config: newConfig(opts), + } + + h.tracer = h.config.TracerProvider.Tracer( + instrumentationName, + trace.WithInstrumentationVersion(SemVersion()), + ) + + return h +} + +type clientHandler struct { + *config + tracer trace.Tracer +} + +// TagRPC can attach some information to the given context. +func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + name, attrs := internal.ParseFullMethod(info.FullMethodName) + attrs = append(attrs, RPCSystemGRPC) + ctx, _ = h.tracer.Start( + ctx, + name, + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes(attrs...), + ) + + gctx := gRPCContext{} + + return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators) +} + +// HandleRPC processes the RPC stats. +func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + handleRPC(ctx, rs) +} + +// TagConn can attach some information to the given context. +func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { + span := trace.SpanFromContext(ctx) + attrs := peerAttr(cti.RemoteAddr.String()) + span.SetAttributes(attrs...) + return ctx +} + +// HandleConn processes the Conn stats. +func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) { + // no-op +} + +func handleRPC(ctx context.Context, rs stats.RPCStats) { + span := trace.SpanFromContext(ctx) + gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext) + var messageId int64 + + switch rs := rs.(type) { + case *stats.Begin: + case *stats.InPayload: + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesReceived, 1) + } + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeReceived, + semconv.MessageIDKey.Int64(messageId), + semconv.MessageCompressedSizeKey.Int(rs.CompressedLength), + semconv.MessageUncompressedSizeKey.Int(rs.Length), + ), + ) + case *stats.OutPayload: + if gctx != nil { + messageId = atomic.AddInt64(&gctx.messagesSent, 1) + } + + span.AddEvent("message", + trace.WithAttributes( + semconv.MessageTypeSent, + semconv.MessageIDKey.Int64(messageId), + semconv.MessageCompressedSizeKey.Int(rs.CompressedLength), + semconv.MessageUncompressedSizeKey.Int(rs.Length), + ), + ) + case *stats.End: + if rs.Error != nil { + s, _ := status.FromError(rs.Error) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(statusCodeAttr(s.Code())) + } else { + span.SetAttributes(statusCodeAttr(grpc_codes.OK)) + } + span.End() + default: + return + } +} diff --git a/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go new file mode 100644 index 00000000000..b6afd1a118d --- /dev/null +++ b/instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_stats_handler_test.go @@ -0,0 +1,581 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +func TestStatsHandler(t *testing.T) { + clientSR := tracetest.NewSpanRecorder() + clientTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientSR)) + + serverSR := tracetest.NewSpanRecorder() + serverTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverSR)) + + assert.NoError(t, doCalls( + []grpc.DialOption{ + grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(clientTP))), + }, + []grpc.ServerOption{ + grpc.StatsHandler(otelgrpc.NewServerHandler(otelgrpc.WithTracerProvider(serverTP))), + }, + )) + + t.Run("ClientSpans", func(t *testing.T) { + checkClientSpans(t, clientSR.Ended()) + }) + + t.Run("ServerSpans", func(t *testing.T) { + checkServerSpans(t, serverSR.Ended()) + }) +} + +func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) { + require.Len(t, spans, 5) + + emptySpan := spans[0] + assert.False(t, emptySpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + }, emptySpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("EmptyCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, emptySpan.Attributes()) + + largeSpan := spans[1] + assert.False(t, largeSpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/UnaryCall", largeSpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(271840), + otelgrpc.RPCMessageUncompressedSizeKey.Int(271840), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(314167), + otelgrpc.RPCMessageUncompressedSizeKey.Int(314167), + }, + }, + }, largeSpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("UnaryCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, largeSpan.Attributes()) + + streamInput := spans[2] + assert.False(t, streamInput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27190), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27190), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(12), + otelgrpc.RPCMessageUncompressedSizeKey.Int(12), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1834), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1834), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45912), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45912), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(4), + otelgrpc.RPCMessageUncompressedSizeKey.Int(4), + }, + }, + // client does not record an event for the server response. + }, streamInput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingInputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamInput.Attributes()) + + streamOutput := spans[3] + assert.False(t, streamOutput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(21), + otelgrpc.RPCMessageUncompressedSizeKey.Int(21), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, streamOutput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingOutputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamOutput.Attributes()) + + pingPong := spans[4] + assert.False(t, pingPong.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27196), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27196), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(16), + otelgrpc.RPCMessageUncompressedSizeKey.Int(16), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1839), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1839), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45918), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45918), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, pingPong.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("FullDuplexCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, pingPong.Attributes()) +} + +func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) { + require.Len(t, spans, 5) + + emptySpan := spans[0] + assert.False(t, emptySpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(0), + otelgrpc.RPCMessageUncompressedSizeKey.Int(0), + }, + }, + }, emptySpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("EmptyCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, emptySpan.Attributes()) + + largeSpan := spans[1] + assert.False(t, largeSpan.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/UnaryCall", largeSpan.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageCompressedSizeKey.Int(271840), + otelgrpc.RPCMessageUncompressedSizeKey.Int(271840), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageCompressedSizeKey.Int(314167), + otelgrpc.RPCMessageUncompressedSizeKey.Int(314167), + }, + }, + }, largeSpan.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("UnaryCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, largeSpan.Attributes()) + + streamInput := spans[2] + assert.False(t, streamInput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27190), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27190), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(12), + otelgrpc.RPCMessageUncompressedSizeKey.Int(12), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1834), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1834), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45912), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45912), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(4), + otelgrpc.RPCMessageUncompressedSizeKey.Int(4), + }, + }, + // client does not record an event for the server response. + }, streamInput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingInputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamInput.Attributes()) + + streamOutput := spans[3] + assert.False(t, streamOutput.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(21), + otelgrpc.RPCMessageUncompressedSizeKey.Int(21), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, streamOutput.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("StreamingOutputCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, streamOutput.Attributes()) + + pingPong := spans[4] + assert.False(t, pingPong.EndTime().IsZero()) + assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name()) + assertEvents(t, []trace.Event{ + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(27196), + otelgrpc.RPCMessageUncompressedSizeKey.Int(27196), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(1), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(31423), + otelgrpc.RPCMessageUncompressedSizeKey.Int(31423), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(16), + otelgrpc.RPCMessageUncompressedSizeKey.Int(16), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(2), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(13), + otelgrpc.RPCMessageUncompressedSizeKey.Int(13), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(1839), + otelgrpc.RPCMessageUncompressedSizeKey.Int(1839), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(3), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(2659), + otelgrpc.RPCMessageUncompressedSizeKey.Int(2659), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("RECEIVED"), + otelgrpc.RPCMessageCompressedSizeKey.Int(45918), + otelgrpc.RPCMessageUncompressedSizeKey.Int(45918), + }, + }, + { + Name: "message", + Attributes: []attribute.KeyValue{ + otelgrpc.RPCMessageIDKey.Int(4), + otelgrpc.RPCMessageTypeKey.String("SENT"), + otelgrpc.RPCMessageCompressedSizeKey.Int(58987), + otelgrpc.RPCMessageUncompressedSizeKey.Int(58987), + }, + }, + }, pingPong.Events()) + assert.ElementsMatch(t, []attribute.KeyValue{ + semconv.RPCMethodKey.String("FullDuplexCall"), + semconv.RPCServiceKey.String("grpc.testing.TestService"), + otelgrpc.RPCSystemGRPC, + otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)), + }, pingPong.Attributes()) +}