Skip to content

Commit

Permalink
[exporterhelper] New exporter helper with custom requests
Browse files Browse the repository at this point in the history
Introduce a new exporter helper that operates over client-provided requests instead of pdata.

It opens the door to moving batching into the exporter, where batches will be built from the client's data format instead of pdata. The batches can be properly sized by a custom request size, which can be different from OTLP. The same custom request sizing will be applied to the sending queue. It will also improve the performance of sending queue retries for non-OTLP exporters, since they won't need to translate pdata on every retry.

This is an experimental API; once stabilized, it is intended to replace the existing helpers.
  • Loading branch information
dmitryax committed Jun 30, 2023
1 parent 50c94c9 commit 5fc2162
Show file tree
Hide file tree
Showing 20 changed files with 985 additions and 112 deletions.
12 changes: 12 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce a new exporter helper that operates over client-provided requests instead of pdata

# One or more tracking issues or pull requests related to the change
issues: [7874]

26 changes: 20 additions & 6 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,25 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

// queueSettings bundles the user-facing queue configuration with the optional
// request (un)marshalers that persistenceEnabled requires for a persistent queue.
type queueSettings struct {
// config is the QueueSettings supplied by the user, e.g. through WithQueue.
config QueueSettings
// marshaler serializes requests; persistenceEnabled reports false when it is nil.
marshaler internal.RequestMarshaler
// unmarshaler deserializes requests; persistenceEnabled reports false when it is nil.
unmarshaler internal.RequestUnmarshaler
}

// persistenceEnabled reports whether a persistent queue can be used: a storage ID
// must be configured and both the request marshaler and unmarshaler must be set.
func (qs *queueSettings) persistenceEnabled() bool {
	switch {
	case qs.config.StorageID == nil:
		// No storage extension configured.
		return false
	case qs.marshaler == nil, qs.unmarshaler == nil:
		// Requests cannot be round-tripped through storage.
		return false
	default:
		return true
	}
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
QueueSettings
queueSettings
RetrySettings
RequestSender
}

// fromOptions returns the internal options starting from the default and applying all configured options.
Expand All @@ -72,7 +83,9 @@ func fromOptions(options ...Option) *baseSettings {
opts := &baseSettings{
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
}
Expand Down Expand Up @@ -121,9 +134,10 @@ func WithRetry(retrySettings RetrySettings) Option {

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) Option {
// Persistent queue is not yet available for exporter helpers V2; they ignore the StorageID field.
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
o.QueueSettings = queueSettings
o.queueSettings.config = config
}
}

Expand All @@ -145,7 +159,7 @@ type baseExporter struct {
qrSender *queuedRetrySender
}

func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) {
be := &baseExporter{}

var err error
Expand All @@ -154,7 +168,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand Down
16 changes: 1 addition & 15 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var (
Expand All @@ -35,7 +32,7 @@ var (
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler())
be, err := newBaseExporter(defaultSettings, fromOptions(), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -50,7 +47,6 @@ func TestBaseExporterWithOptions(t *testing.T) {
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
"",
nopRequestUnmarshaler(),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -65,13 +61,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd)
}
}

// nopTracePusher returns a traces consumer function that ignores its
// arguments and always reports success.
func nopTracePusher() consumer.ConsumeTracesFunc {
	return func(context.Context, ptrace.Traces) error { return nil }
}

// nopRequestUnmarshaler returns the request unmarshaler produced by
// newTraceRequestUnmarshalerFunc for the no-op trace pusher.
func nopRequestUnmarshaler() internal.RequestUnmarshaler {
	pusher := nopTracePusher()
	return newTraceRequestUnmarshalerFunc(pusher)
}
8 changes: 8 additions & 0 deletions exporter/exporterhelper/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ var (
errNilPushMetricsData = errors.New("nil PushMetrics")
// errNilPushLogsData is returned when a nil PushLogs is given.
errNilPushLogsData = errors.New("nil PushLogs")
// errNilTracesConverter is returned when a nil TracesConverter is given.
errNilTracesConverter = errors.New("nil TracesConverter")
// errNilMetricsConverter is returned when a nil MetricsConverter is given.
errNilMetricsConverter = errors.New("nil MetricsConverter")
// errNilLogsConverter is returned when a nil LogsConverter is given.
errNilLogsConverter = errors.New("nil LogsConverter")
// errNilRequestSender is returned when a nil RequestSender is given.
errNilRequestSender = errors.New("nil RequestSender")
)
14 changes: 12 additions & 2 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}

// PersistentQueueSettings groups the parameters accepted by NewPersistentQueue.
type PersistentQueueSettings struct {
// Name and Signal are combined into the storage name; together they must be
// a unique combination that identifies the queue storage.
Name string
Signal component.DataType
// Capacity is the queue capacity passed to the underlying persistent storage.
Capacity uint64
// Logger is handed to the underlying persistent storage.
Logger *zap.Logger
// Client is the storage client that backs the queue.
Client storage.Client
// Unmarshaler converts stored bytes back into a Request.
Unmarshaler RequestUnmarshaler
// Marshaler converts a Request into bytes before it is stored.
Marshaler RequestMarshaler
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue {
return &persistentQueue{
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params),
}
}

Expand Down
10 changes: 9 additions & 1 deletion exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue
panic(err)
}

wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{
Name: "foo",
Signal: component.DataTypeTraces,
Capacity: uint64(capacity),
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
return wq.(*persistentQueue)
}

Expand Down
14 changes: 8 additions & 6 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type persistentContiguousStorage struct {
queueName string
client storage.Client
unmarshaler RequestUnmarshaler
marshaler RequestMarshaler

putChan chan struct{}
stopChan chan struct{}
Expand Down Expand Up @@ -80,14 +81,15 @@ var (

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: logger,
client: client,
logger: set.Logger,
client: set.Client,
queueName: queueName,
unmarshaler: unmarshaler,
capacity: capacity,
putChan: make(chan struct{}, capacity),
unmarshaler: set.Unmarshaler,
marshaler: set.Marshaler,
capacity: set.Capacity,
putChan: make(chan struct{}, set.Capacity),
reqChan: make(chan Request),
stopChan: make(chan struct{}),
itemsCount: &atomic.Uint64{},
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error)

// setRequest adds Set operation over a given request to the batch
func (bof *batchStruct) setRequest(key string, value Request) *batchStruct {
return bof.set(key, value, requestToBytes)
return bof.set(key, value, bof.requestToBytes)
}

// setItemIndex adds Set operation over a given itemIndex to the batch
Expand Down Expand Up @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) {
return val, err
}

func requestToBytes(req any) ([]byte, error) {
return req.(Request).Marshal()
func (bof *batchStruct) requestToBytes(req any) ([]byte, error) {
return bof.pcs.marshaler(req.(Request))
}

func (bof *batchStruct) bytesToRequest(b []byte) (any, error) {
Expand Down
15 changes: 14 additions & 1 deletion exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client {
}

func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage {
return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{
Capacity: capacity,
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
}

func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage {
Expand Down Expand Up @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler {
}
}

// newFakeTracesRequestMarshalerFunc returns a RequestMarshaler for tests that
// proto-encodes the traces held by a *fakeTracesRequest. The returned function
// panics if it is given any other Request implementation.
func newFakeTracesRequestMarshalerFunc() RequestMarshaler {
	return func(req Request) ([]byte, error) {
		var encoder ptrace.ProtoMarshaler
		traces := req.(*fakeTracesRequest).td
		return encoder.MarshalTraces(traces)
	}
}

func TestPersistentStorage_CorruptedData(t *testing.T) {
path := t.TempDir()

Expand Down
10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ type Request interface {
// Otherwise, it should return the original Request.
OnError(error) Request

// Count returns the count of spans/metric points or log records.
Count() int

// Marshal serializes the current request into a byte stream
Marshal() ([]byte, error)
// ItemsCount returns the number of basic items in the request (spans, data points or log records for OTLP)
ItemsCount() int

// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
OnProcessingFinished()
Expand All @@ -34,3 +31,6 @@ type Request interface {

// RequestUnmarshaler is a function that takes a byte slice and unmarshals it
// into the relevant Request, returning an error if that is not possible.
type RequestUnmarshaler func([]byte) (Request, error)

// RequestMarshaler is a function that takes a Request and marshals it into a
// byte slice. The persistent storage calls it to serialize a request before
// writing it to the storage client.
type RequestMarshaler func(Request) ([]byte, error)

0 comments on commit 5fc2162

Please sign in to comment.