Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelhttp: client metrics #4707

Merged
merged 16 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 7 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
- Add client metric support to `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#4707)
- Add peer attributes to spans recorded by `NewClientHandler`, `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4873)

### Deprecated

- The `RequestCount`, `RequestContentLength`, `ResponseContentLength`, `ServerLatency` constants in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp` are deprecated. (#4707)

### Fixed

- Do not panic in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` if `MeterProvider` returns a `nil` instrument. (#4875)
Expand All @@ -22,7 +27,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `SDK.Shutdown` method in `"go.opentelemetry.io/contrib/config"`. (#4583)
- Add client metric support to `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#4707)
- `NewSDK` in `go.opentelemetry.io/contrib/config` now returns a configured SDK with a valid `TracerProvider`. (#4741)

### Changed

Expand All @@ -34,7 +39,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The semantic conventions used by `go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace` are upgraded to v1.20.0. (#4320)
- The semantic conventions used by `go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace/example` are upgraded to v1.20.0. (#4320)
- The semantic conventions used by `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/example` are upgraded to v1.20.0. (#4320)
- The semantic conventions used by `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp` are upgraded to v1.20.0. (#4320, #4707)
- The semantic conventions used by `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`are upgraded to v1.20.0. (#4320)
- Updated configuration schema to include `schema_url` for resource definition and `without_type_suffix` and `without_units` for the Prometheus exporter. (#4727)
- The semantic conventions used by the `go.opentelemetry.io/contrib/detectors/aws/ecs` resource detector are upgraded to v1.24.0. (#4803)
- The semantic conventions used by the `go.opentelemetry.io/contrib/detectors/aws/lambda` resource detector are upgraded to v1.24.0. (#4803)
Expand All @@ -43,10 +48,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The semantic conventions used by the `go.opentelemetry.io/contrib/detectors/gcp` resource detector are upgraded to v1.24.0. (#4803)
- The semantic conventions used in `go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-lambda-go/otellambda/test` are upgraded to v1.24.0. (#4803)

### Removed

- Remove `RequestCount`, `RequestContentLength`, `ResponseContentLength`, `ServerLatency` constants from `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#4707)

### Fixed

- Fix `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to correctly set the span status depending on the gRPC status. (#4587)
Expand Down
8 changes: 8 additions & 0 deletions instrumentation/net/http/otelhttp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ const (

// Server HTTP metrics.
const (
// Deprecated: This field is unused.
RequestCount = "http.server.request_count" // Incoming request count total
// Deprecated: Use of this field has been migrated to serverRequestSize. It will be removed in a future version.
RequestContentLength = "http.server.request_content_length" // Incoming request bytes total
// Deprecated: Use of this field has been migrated to serverResponseSize. It will be removed in a future version.
ResponseContentLength = "http.server.response_content_length" // Incoming response bytes total
// Deprecated: Use of this field has been migrated to serverDuration. It will be removed in a future version.
ServerLatency = "http.server.duration" // Incoming end to end duration, milliseconds
serverRequestSize = "http.server.request.size" // Incoming request bytes total
serverResponseSize = "http.server.response.size" // Incoming response bytes total
serverDuration = "http.server.duration" // Incoming end to end duration, milliseconds
Expand Down
71 changes: 67 additions & 4 deletions instrumentation/net/http/otelhttp/test/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func TestTransportMetrics(t *testing.T) {
semconv.HTTPMethod("GET"),
semconv.HTTPStatusCode(200),
)
assertClientScopeMetrics(t, rm.ScopeMetrics[0], attrs)
assertClientScopeMetrics(t, rm.ScopeMetrics[0], attrs, 13)
})

t.Run("make http request and buffer response", func(t *testing.T) {
Expand Down Expand Up @@ -377,11 +377,74 @@ func TestTransportMetrics(t *testing.T) {
semconv.HTTPMethod("GET"),
semconv.HTTPStatusCode(200),
)
assertClientScopeMetrics(t, rm.ScopeMetrics[0], attrs)
assertClientScopeMetrics(t, rm.ScopeMetrics[0], attrs, 13)
})

t.Run("make http request and close body before reading completely", func(t *testing.T) {
reader := metric.NewManualReader()
meterProvider := metric.NewMeterProvider(metric.WithReader(reader))

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := w.Write(responseBody); err != nil {
t.Fatal(err)
}
}))
defer ts.Close()

r, err := http.NewRequest(http.MethodGet, ts.URL, bytes.NewReader(requestBody))
if err != nil {
t.Fatal(err)
}

tr := otelhttp.NewTransport(
http.DefaultTransport,
otelhttp.WithMeterProvider(meterProvider),
)

c := http.Client{Transport: tr}
res, err := c.Do(r)
if err != nil {
t.Fatal(err)
}

// Must read the body or else we won't get response metrics
smallBuf := make([]byte, 10)

// Read first 10 bytes
bc, err := res.Body.Read(smallBuf)
if err != nil {
t.Fatal(err)
}
require.Equal(t, 10, bc)

// close the response body early
require.NoError(t, res.Body.Close())

host, portStr, _ := net.SplitHostPort(r.Host)
if host == "" {
host = "127.0.0.1"
}
port, err := strconv.Atoi(portStr)
if err != nil {
port = 0
}

rm := metricdata.ResourceMetrics{}
err = reader.Collect(context.Background(), &rm)
require.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
attrs := attribute.NewSet(
semconv.NetPeerName(host),
semconv.NetPeerPort(port),
semconv.HTTPMethod("GET"),
semconv.HTTPStatusCode(200),
)
assertClientScopeMetrics(t, rm.ScopeMetrics[0], attrs, 10)
})
}

func assertClientScopeMetrics(t *testing.T, sm metricdata.ScopeMetrics, attrs attribute.Set) {
func assertClientScopeMetrics(t *testing.T, sm metricdata.ScopeMetrics, attrs attribute.Set, rxBytes int64) {
assert.Equal(t, instrumentation.Scope{
Name: "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp",
Version: Version(),
Expand All @@ -404,7 +467,7 @@ func assertClientScopeMetrics(t *testing.T, sm metricdata.ScopeMetrics, attrs at
want = metricdata.Metrics{
Name: "http.client.response.size",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{{Attributes: attrs, Value: 13}},
DataPoints: []metricdata.DataPoint[int64]{{Attributes: attrs, Value: rxBytes}},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
Expand Down
28 changes: 19 additions & 9 deletions instrumentation/net/http/otelhttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {

r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request.

writeRecordFunc := func(int64) {}
// use a body wrapper to determine the request size
var bw bodyWrapper
// if request body is nil or NoBody, we don't want to mutate the body as it
// will affect the identity of it in an unforeseeable way because we assert
// ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
if r.Body != nil && r.Body != http.NoBody {
bw.ReadCloser = r.Body
bw.record = writeRecordFunc
// noop to prevent nil panic. not using this record fun yet.
bw.record = func(int64) {}
r.Body = &bw
}

Expand Down Expand Up @@ -226,10 +227,11 @@ func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) i
// If the response body implements the io.Writer interface (i.e. for
// successful protocol switches), the wrapped body also will.
type wrappedBody struct {
span trace.Span
record func(n int64)
body io.ReadCloser
read atomic.Int64
span trace.Span
recorded atomic.Bool
record func(n int64)
body io.ReadCloser
read atomic.Int64
}

var _ io.ReadWriteCloser = &wrappedBody{}
Expand All @@ -246,15 +248,14 @@ func (wb *wrappedBody) Write(p []byte) (int, error) {

func (wb *wrappedBody) Read(b []byte) (int, error) {
n, err := wb.body.Read(b)
// Locally record the number of bytes read
// Record the number of bytes read
wb.read.Add(int64(n))

switch err {
case nil:
// nothing to do here but fall through to the return
case io.EOF:
// Record the total number of bytes read
wb.record(wb.read.Load())
wb.recordBytesRead()
wb.span.End()
default:
wb.span.RecordError(err)
Expand All @@ -263,7 +264,16 @@ func (wb *wrappedBody) Read(b []byte) (int, error) {
return n, err
}

// recordBytesRead is a function that ensures the number of bytes read is recorded once and only once.
func (wb *wrappedBody) recordBytesRead() {
if wb.recorded.CompareAndSwap(false, true) {
Sovietaced marked this conversation as resolved.
Show resolved Hide resolved
// Record the total number of bytes read
wb.record(wb.read.Load())
}
}

func (wb *wrappedBody) Close() error {
wb.recordBytesRead()
wb.span.End()
if wb.body != nil {
return wb.body.Close()
Expand Down
4 changes: 2 additions & 2 deletions instrumentation/net/http/otelhttp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestWrappedBodyClose(t *testing.T) {
wb := &wrappedBody{span: trace.Span(s), record: record, body: readCloser{}}
assert.NoError(t, wb.Close())
s.assert(t, true, nil, codes.Unset, "")
assert.False(t, called, "record should not have been called")
assert.True(t, called, "record should not have been called")
Sovietaced marked this conversation as resolved.
Show resolved Hide resolved
}

func TestWrappedBodyClosePanic(t *testing.T) {
Expand All @@ -312,7 +312,7 @@ func TestWrappedBodyCloseError(t *testing.T) {
wb := &wrappedBody{span: trace.Span(s), record: record, body: readCloser{closeErr: expectedErr}}
assert.Equal(t, expectedErr, wb.Close())
s.assert(t, true, nil, codes.Unset, "")
assert.False(t, called, "record should not have been called")
assert.True(t, called, "record should not have been called")
Sovietaced marked this conversation as resolved.
Show resolved Hide resolved
}

type readWriteCloser struct {
Expand Down