Skip to content

Commit

Permalink
Handle 2xx as success for OTLP HTTP trace and metric exporters (#4365)
Browse files Browse the repository at this point in the history
* handle no content responses for otlpmetric exporter

* handle no content responses for otlptraces http exporter

* add changelog entry

* add a ResponseStatus attribute rather than using error

* accept any status code between 200 and 299

* rename i to code

* switch require to assert

* shutdown the client in defer

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
dmathieu and MrAlias committed Aug 7, 2023
1 parent b221025 commit 10099bb
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add info and debug logging to the metric SDK. (#4315)
- The `go.opentelemetry.io/otel/semconv/v1.21.0` package.
The package contains semantic conventions from the `v1.21.0` version of the OpenTelemetry Semantic Conventions. (#4362)
- Accept 201 to 299 HTTP status as success in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` and `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4365)
- Document the `Temporality` and `Aggregation` methods of the `"go.opentelemetry.io/otel/sdk/metric".Exporter"` need to be concurrent safe. (#4381)
- Expand the set of units supported by the prometheus exporter, and don't add unit suffixes if they are already present in `go.opentelemetry.op/otel/exporters/prometheus` (#4374)

Expand Down
28 changes: 28 additions & 0 deletions exporters/otlp/otlpmetric/internal/otest/client.go
Expand Up @@ -272,6 +272,34 @@ func RunClientTests(f ClientFactory) func(*testing.T) {
want := fmt.Sprintf("%s (%d metric data points rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})

t.Run("Other HTTP success", func(t *testing.T) {
for code := 201; code <= 299; code++ {
t.Run(fmt.Sprintf("status_%d", code), func(t *testing.T) {
rCh := make(chan ExportResult, 1)
rCh <- ExportResult{
ResponseStatus: code,
}

ctx := context.Background()
client, _ := f(rCh)
defer func() {
assert.NoError(t, client.Shutdown(ctx))
}()

defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())

errs := []error{}
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)

assert.NoError(t, client.UploadMetrics(ctx, nil))
assert.Equal(t, 0, len(errs))
})
}
})
}
}

Expand Down
11 changes: 8 additions & 3 deletions exporters/otlp/otlpmetric/internal/otest/collector.go
Expand Up @@ -50,8 +50,9 @@ type Collector interface {
}

type ExportResult struct {
Response *collpb.ExportMetricsServiceResponse
Err error
Response *collpb.ExportMetricsServiceResponse
ResponseStatus int
Err error
}

// Storage stores uploaded OTLP metric data in their proto form.
Expand Down Expand Up @@ -376,7 +377,11 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(http.StatusOK)
if resp.ResponseStatus != 0 {
w.WriteHeader(resp.ResponseStatus)
} else {
w.WriteHeader(http.StatusOK)
}
if resp.Response == nil {
_, _ = w.Write(emptyExportMetricsServiceResponse)
} else {
Expand Down
7 changes: 3 additions & 4 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
Expand Up @@ -153,8 +153,8 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
}

var rErr error
switch resp.StatusCode {
case http.StatusOK:
switch sc := resp.StatusCode; {
case sc >= 200 && sc <= 299:
// Success, do not retry.

// Read the partial success message, if any.
Expand All @@ -179,8 +179,7 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
}
}
return nil
case http.StatusTooManyRequests,
http.StatusServiceUnavailable:
case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
// Retry-able failure.
rErr = newResponseError(resp.Header)

Expand Down
6 changes: 3 additions & 3 deletions exporters/otlp/otlptrace/otlptracehttp/client.go
Expand Up @@ -164,8 +164,8 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
}()
}

switch resp.StatusCode {
case http.StatusOK:
switch sc := resp.StatusCode; {
case sc >= 200 && sc <= 299:
// Success, do not retry.
// Read the partial success message, if any.
var respData bytes.Buffer
Expand All @@ -190,7 +190,7 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
}
return nil

case http.StatusTooManyRequests, http.StatusServiceUnavailable:
case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
// Retry-able failures. Drain the body to reuse the connection.
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
otel.Handle(err)
Expand Down
31 changes: 31 additions & 0 deletions exporters/otlp/otlptrace/otlptracehttp/client_test.go
Expand Up @@ -401,3 +401,34 @@ func TestPartialSuccess(t *testing.T) {
require.Contains(t, errs[0].Error(), "partially successful")
require.Contains(t, errs[0].Error(), "2 spans rejected")
}

func TestOtherHTTPSuccess(t *testing.T) {
for code := 201; code <= 299; code++ {
t.Run(fmt.Sprintf("status_%d", code), func(t *testing.T) {
mcCfg := mockCollectorConfig{
InjectHTTPStatus: []int{code},
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(context.Background()))
}()

errs := []error{}
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
errs = append(errs, err)
}))
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.NoError(t, err)

assert.Equal(t, 0, len(errs))
})
}
}

0 comments on commit 10099bb

Please sign in to comment.