Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otlpmetrichttp otlptracehttp: Do not parse non-protobuf reponses #4719

Merged
merged 4 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Remove the deprecated `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` module. (#4707)
- Remove the deprecated `go.opentelemetry.io/otel/example/view` module. (#4708)

### Fixed

- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4719)
- Do not parse non-protobuf responses in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#4719)

## [1.20.0/0.43.0] 2023-11-10

This release brings a breaking change for custom trace API implementations. Some interfaces (`TracerProvider`, `Tracer`, `Span`) now embed the `go.opentelemetry.io/otel/trace/embedded` types. Implementors need to update their implementations based on what they want the default behavior to be. See the "API Implementations" section of the [trace API] package documentation for more information about how to accomplish this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@

// HTTPCollector is an OTLP HTTP server that collects all requests it receives.
type HTTPCollector struct {
plainTextResponse bool

headersMu sync.Mutex
headers http.Header
storage *Storage
Expand All @@ -217,7 +219,7 @@
// If errCh is not nil, the collector will respond to HTTP requests with errors
// sent on that channel. This means that if errCh is not nil Export calls will
// block until an error is received.
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) {
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) {

Check warning on line 222 in exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go#L222

Added line #L222 was not covered by tests
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
Expand All @@ -234,6 +236,9 @@
storage: NewStorage(),
resultCh: resultCh,
}
for _, opt := range opts {
opt(c)
}

Check warning on line 241 in exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go#L239-L241

Added lines #L239 - L241 were not covered by tests

c.listener, err = net.Listen("tcp", u.Host)
if err != nil {
Expand Down Expand Up @@ -262,6 +267,14 @@
return c, nil
}

// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return
// a plaintext, instead of protobuf, response.
func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) {
return func(s *HTTPCollector) {
s.plainTextResponse = true
}

Check warning on line 275 in exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go#L272-L275

Added lines #L272 - L275 were not covered by tests
}

// Shutdown shuts down the HTTP server closing all open connections and
// listeners.
func (c *HTTPCollector) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -382,6 +395,13 @@
return
}

if c.plainTextResponse {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
return
}

Check warning on line 403 in exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go

View check run for this annotation

Codecov / codecov/patch

exporters/otlp/otlpmetric/otlpmetricgrpc/internal/otest/collector.go#L398-L403

Added lines #L398 - L403 were not covered by tests

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(http.StatusOK)
if resp.Response == nil {
Expand Down
5 changes: 4 additions & 1 deletion exporters/otlp/otlpmetric/otlpmetrichttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,11 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
if _, err := io.Copy(&respData, resp.Body); err != nil {
return err
}
if respData.Len() == 0 {
return nil
}

if respData.Len() != 0 {
pellared marked this conversation as resolved.
Show resolved Hide resolved
if resp.Header.Get("Content-Type") == "application/x-protobuf" {
var respProto colmetricpb.ExportMetricsServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return err
Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

type clientShim struct {
Expand Down Expand Up @@ -65,6 +66,23 @@ func TestClient(t *testing.T) {
t.Run("Integration", otest.RunClientTests(factory))
}

func TestClientWithHTTPCollectorRespondingPlainText(t *testing.T) {
ctx := context.Background()
coll, err := otest.NewHTTPCollector("", nil, otest.WithHTTPCollectorRespondingPlainText())
require.NoError(t, err)

addr := coll.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
client, err := newClient(cfg)
require.NoError(t, err)

require.NoError(t, client.UploadMetrics(ctx, &mpb.ResourceMetrics{}))
require.NoError(t, client.Shutdown(ctx))
got := coll.Collect().Dump()
require.Len(t, got, 1, "upload of one ResourceMetrics")
}

func TestNewWithInvalidEndpoint(t *testing.T) {
ctx := context.Background()
exp, err := New(ctx, WithEndpoint("host:invalid-port"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ func (e *HTTPResponseError) Unwrap() error { return e.Err }

// HTTPCollector is an OTLP HTTP server that collects all requests it receives.
type HTTPCollector struct {
plainTextResponse bool

headersMu sync.Mutex
headers http.Header
storage *Storage
Expand All @@ -217,7 +219,7 @@ type HTTPCollector struct {
// If errCh is not nil, the collector will respond to HTTP requests with errors
// sent on that channel. This means that if errCh is not nil Export calls will
// block until an error is received.
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) {
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
Expand All @@ -234,6 +236,9 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle
storage: NewStorage(),
resultCh: resultCh,
}
for _, opt := range opts {
opt(c)
}

c.listener, err = net.Listen("tcp", u.Host)
if err != nil {
Expand Down Expand Up @@ -262,6 +267,14 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle
return c, nil
}

// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return
// a plaintext, instead of protobuf, response.
func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) {
return func(s *HTTPCollector) {
s.plainTextResponse = true
}
}

// Shutdown shuts down the HTTP server closing all open connections and
// listeners.
func (c *HTTPCollector) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -382,6 +395,13 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
return
}

if c.plainTextResponse {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
return
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(http.StatusOK)
if resp.Response == nil {
Expand Down
5 changes: 4 additions & 1 deletion exporters/otlp/otlptrace/otlptracehttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
if _, err := io.Copy(&respData, resp.Body); err != nil {
return err
}
if respData.Len() == 0 {
return nil
}

if respData.Len() != 0 {
if resp.Header.Get("Content-Type") == "application/x-protobuf" {
var respProto coltracepb.ExportTraceServiceResponse
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions exporters/otlp/otlptrace/otlptracehttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,3 +430,24 @@ func TestOtherHTTPSuccess(t *testing.T) {
})
}
}

func TestCollectorRespondingNonProtobufContent(t *testing.T) {
mcCfg := mockCollectorConfig{
InjectContentType: "application/octet-stream",
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(context.Background()))
}()
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.NoError(t, err)
assert.Len(t, mc.GetSpans(), 1)
}
22 changes: 21 additions & 1 deletion internal/shared/otlp/otlpmetric/otest/collector.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ func (e *HTTPResponseError) Unwrap() error { return e.Err }

// HTTPCollector is an OTLP HTTP server that collects all requests it receives.
type HTTPCollector struct {
plainTextResponse bool

headersMu sync.Mutex
headers http.Header
storage *Storage
Expand All @@ -217,7 +219,7 @@ type HTTPCollector struct {
// If errCh is not nil, the collector will respond to HTTP requests with errors
// sent on that channel. This means that if errCh is not nil Export calls will
// block until an error is received.
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPCollector, error) {
func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult, opts ...func(*HTTPCollector)) (*HTTPCollector, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
Expand All @@ -234,6 +236,9 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle
storage: NewStorage(),
resultCh: resultCh,
}
for _, opt := range opts {
opt(c)
}

c.listener, err = net.Listen("tcp", u.Host)
if err != nil {
Expand Down Expand Up @@ -262,6 +267,14 @@ func NewHTTPCollector(endpoint string, resultCh <-chan ExportResult) (*HTTPColle
return c, nil
}

// WithHTTPCollectorRespondingPlainText makes the HTTPCollector return
// a plaintext, instead of protobuf, response.
func WithHTTPCollectorRespondingPlainText() func(*HTTPCollector) {
return func(s *HTTPCollector) {
s.plainTextResponse = true
}
}

// Shutdown shuts down the HTTP server closing all open connections and
// listeners.
func (c *HTTPCollector) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -382,6 +395,13 @@ func (c *HTTPCollector) respond(w http.ResponseWriter, resp ExportResult) {
return
}

if c.plainTextResponse {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
return
}

w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(http.StatusOK)
if resp.Response == nil {
Expand Down