Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Jul 29, 2023
1 parent c44d32d commit 2ff5402
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 153 deletions.
17 changes: 11 additions & 6 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ type baseSettings struct {
queueSettings
RetrySettings
RequestSender
requestExporter bool
}

// fromOptions returns the internal options starting from the default and applying all configured options.
func fromOptions(options ...Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
// fromOptions returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
queueSettings: queueSettings{
Expand All @@ -91,10 +93,10 @@ func fromOptions(options ...Option) *baseSettings {
}

for _, op := range options {
op(opts)
op(bs)
}

return opts
return bs
}

// Option apply changes to baseSettings.
Expand Down Expand Up @@ -137,6 +139,9 @@ func WithRetry(retrySettings RetrySettings) Option {
// Permanent queue is not yet available for exporter helpers V2, they ignore StorageID field.
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
if o.requestExporter {
panic("queueing is not supported for request exporters yet")
}
o.queueSettings.config = config
}
}
Expand Down
9 changes: 7 additions & 2 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ var (
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, fromOptions(), "")
be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -42,7 +46,8 @@ func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings,
fromOptions(
newBaseSettings(
false,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewLogsExporter(
return nil, errNilPushLogsData
}

bs := fromOptions(options...)
bs := newBaseSettings(false, options...)
bs.marshaler = logsRequestMarshaler
bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher)
be, err := newBaseExporter(set, bs, component.DataTypeLogs)
Expand Down Expand Up @@ -127,8 +127,8 @@ type LogsConverter interface {
RequestFromLogs(context.Context, plog.Logs) (Request, error)
}

// NewLogsExporterV2 creates new logs exporter based on custom LogsConverter and RequestSender.
func NewLogsExporterV2(
// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender.
func NewLogsRequestExporter(
_ context.Context,
set exporter.CreateSettings,
converter LogsConverter,
Expand All @@ -147,7 +147,7 @@ func NewLogsExporterV2(
return nil, errNilRequestSender
}

bs := fromOptions(options...)
bs := newBaseSettings(true, options...)
bs.RequestSender = sender

be, err := newBaseExporter(set, bs, component.DataTypeLogs)
Expand Down
52 changes: 13 additions & 39 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestLogsExporter_NilLogger(t *testing.T) {
}

func TestLogsExporterV2_NilLogger(t *testing.T) {
le, err := NewLogsExporterV2(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, newFakeRequestSender(nil))
le, err := NewLogsRequestExporter(context.Background(), exporter.CreateSettings{}, &fakeRequestConverter{}, newFakeRequestSender(nil))
require.Nil(t, le)
require.Equal(t, errNilLogger, err)
}
Expand All @@ -73,13 +73,13 @@ func TestLogsExporter_NilPushLogsData(t *testing.T) {
}

func TestLogsExporterV2_NilLogsConverter(t *testing.T) {
le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil))
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), nil, newFakeRequestSender(nil))
require.Nil(t, le)
require.Equal(t, errNilLogsConverter, err)
}

func TestLogsExporterV2_NilRequestSender(t *testing.T) {
le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil)
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, nil)
require.Nil(t, le)
require.Equal(t, errNilRequestSender, err)
}
Expand All @@ -98,7 +98,7 @@ func TestLogsExporter_Default(t *testing.T) {

func TestLogsExporterV2_Default(t *testing.T) {
ld := plog.NewLogs()
le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil))
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil))
assert.NotNil(t, le)
assert.NoError(t, err)

Expand All @@ -119,7 +119,7 @@ func TestLogsExporter_WithCapabilities(t *testing.T) {

func TestLogsExporterV2_WithCapabilities(t *testing.T) {
capabilities := consumer.Capabilities{MutatesData: true}
le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities))
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithCapabilities(capabilities))
require.NoError(t, err)
require.NotNil(t, le)

Expand All @@ -138,7 +138,7 @@ func TestLogsExporter_Default_ReturnError(t *testing.T) {
func TestLogsExporterV2_Default_ConverterError(t *testing.T) {
ld := plog.NewLogs()
want := errors.New("convert_error")
le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(),
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(),
&fakeRequestConverter{logsError: want}, newFakeRequestSender(nil))
require.NoError(t, err)
require.NotNil(t, le)
Expand All @@ -148,7 +148,7 @@ func TestLogsExporterV2_Default_ConverterError(t *testing.T) {
func TestLogsExporterV2_Default_RequestSenderError(t *testing.T) {
ld := plog.NewLogs()
want := errors.New("send_error")
le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(),
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(),
&fakeRequestConverter{}, newFakeRequestSender(want))
require.NoError(t, err)
require.NotNil(t, le)
Expand All @@ -172,7 +172,7 @@ func TestLogsExporterV2_WithRecordMetrics(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

le, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil))
le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil))
require.NoError(t, err)
require.NotNil(t, le)

Expand All @@ -198,7 +198,7 @@ func TestLogsExporterV2_WithRecordMetrics_ReturnError(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

le, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(),
le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(),
&fakeRequestConverter{}, newFakeRequestSender(want))
require.Nil(t, err)
require.NotNil(t, le)
Expand Down Expand Up @@ -231,32 +231,6 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterName, int64(15))
}

func TestLogsExporterV2_WithRecordEnqueueFailedMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterV2Name)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

rCfg := NewDefaultRetrySettings()
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewLogsExporterV2(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{},
newFakeRequestSender(wantErr), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

md := testdata.GenerateLogs(3)
const numBatches = 7
for i := 0; i < numBatches; i++ {
// errors are checked in the checkExporterEnqueueFailedLogsStats function below.
_ = te.ConsumeLogs(context.Background(), md)
}

// 2 batched must be in queue, and 5 batches (15 log records) rejected due to queue overflow
checkExporterEnqueueFailedLogsStats(t, globalInstruments, fakeLogsExporterV2Name, int64(15))
}

func TestLogsExporter_WithSpan(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
Expand All @@ -277,7 +251,7 @@ func TestLogsExporterV2_WithSpan(t *testing.T) {
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

le, err := NewLogsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil))
le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(nil))
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1)
Expand Down Expand Up @@ -305,7 +279,7 @@ func TestLogsExporterV2_WithSpan_ReturnError(t *testing.T) {
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("my_error")
le, err := NewLogsExporterV2(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want))
le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{}, newFakeRequestSender(want))
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1)
Expand All @@ -327,7 +301,7 @@ func TestLogsExporterV2_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }

le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown))
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdown))
assert.NotNil(t, le)
assert.NoError(t, err)

Expand All @@ -350,7 +324,7 @@ func TestLogsExporterV2_WithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
shutdownErr := func(context.Context) error { return want }

le, err := NewLogsExporterV2(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr))
le, err := NewLogsRequestExporter(context.Background(), exportertest.NewNopCreateSettings(), &fakeRequestConverter{}, newFakeRequestSender(nil), WithShutdown(shutdownErr))
assert.NotNil(t, le)
assert.NoError(t, err)

Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NewMetricsExporter(
return nil, errNilPushMetricsData
}

bs := fromOptions(options...)
bs := newBaseSettings(false, options...)
bs.marshaler = metricsRequestMarshaler
bs.unmarshaler = newMetricsRequestUnmarshalerFunc(pusher)
be, err := newBaseExporter(set, bs, component.DataTypeMetrics)
Expand Down Expand Up @@ -128,8 +128,8 @@ type MetricsConverter interface {
RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error)
}

// NewMetricsExporterV2 creates a new metrics exporter based on a custom TracesConverter and RequestSender.
func NewMetricsExporterV2(
// NewMetricsRequestExporter creates a new metrics exporter based on a custom TracesConverter and RequestSender.
func NewMetricsRequestExporter(
_ context.Context,
set exporter.CreateSettings,
converter MetricsConverter,
Expand All @@ -148,7 +148,7 @@ func NewMetricsExporterV2(
return nil, errNilRequestSender
}

bs := fromOptions(options...)
bs := newBaseSettings(true, options...)
bs.RequestSender = sender

be, err := newBaseExporter(set, bs, component.DataTypeMetrics)
Expand Down

0 comments on commit 2ff5402

Please sign in to comment.