[exporterhelper] New exporter helper with custom requests
Introduce a new exporter helper that operates over client-provided requests instead of pdata.

It opens the door to moving batching into the exporter, where batches are built from the client's data format instead of pdata. Batches can be properly sized by a custom request size, which can differ from OTLP sizing, and the same custom sizing is applied to the sending queue. It also improves the performance of sending-queue retries for non-OTLP exporters, since they no longer need to translate pdata on every retry.

This is an experimental API; once stabilized, it is intended to replace the existing helpers.
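For illustration only (not part of this change): a minimal sketch of how a non-OTLP exporter might plug into the new helper via LogsConverter and NewLogsExporterV2. The exported Request and RequestSender definitions are not shown in this excerpt, so their method sets here (ItemsCount on the request, Send on the sender) are assumptions, and myRequest, myConverter, mySender, and newMyLogsExporter are hypothetical names.

package myexporter

import (
	"context"

	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/pdata/plog"
)

// myRequest holds data already converted into the backend's own format, so queueing and
// retries never have to re-translate pdata.
type myRequest struct {
	records []string
}

// ItemsCount reports how many basic items the request carries (assumed to be what the
// exported Request interface requires).
func (r *myRequest) ItemsCount() int { return len(r.records) }

type myConverter struct{}

// RequestFromLogs converts plog.Logs into the custom request exactly once, at consume time.
func (myConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (exporterhelper.Request, error) {
	req := &myRequest{}
	for i := 0; i < ld.LogRecordCount(); i++ {
		req.records = append(req.records, "serialized record") // placeholder conversion
	}
	return req, nil
}

type mySender struct{}

// Send delivers an already-converted request to the backend (assumed RequestSender method).
func (mySender) Send(_ context.Context, req exporterhelper.Request) error {
	_ = req.(*myRequest).records // write the records to the backend here
	return nil
}

func newMyLogsExporter(ctx context.Context, set exporter.CreateSettings) (exporter.Logs, error) {
	return exporterhelper.NewLogsExporterV2(ctx, set, myConverter{}, mySender{},
		exporterhelper.WithTimeout(exporterhelper.NewDefaultTimeoutSettings()))
}

Because the converter runs once at consume time, the sending queue and retry logic operate on the already-converted request and never touch pdata again.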
dmitryax committed Jun 30, 2023
1 parent 50c94c9 commit 9d5bbe4
Showing 19 changed files with 973 additions and 112 deletions.
26 changes: 20 additions & 6 deletions exporter/exporterhelper/common.go
@@ -56,14 +56,25 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

type queueSettings struct {
config QueueSettings
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

func (qs *queueSettings) persistenceEnabled() bool {
return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
QueueSettings
queueSettings
RetrySettings
RequestSender
}

// fromOptions returns the internal options starting from the default and applying all configured options.
@@ -72,7 +83,9 @@ func fromOptions(options ...Option) *baseSettings {
opts := &baseSettings{
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
}
@@ -121,9 +134,10 @@ func WithRetry(retrySettings RetrySettings) Option {

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) Option {
// Persistent queue is not yet available for V2 exporters; StorageID is ignored.
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
o.QueueSettings = queueSettings
o.queueSettings.config = config
}
}

@@ -145,7 +159,7 @@ type baseExporter struct {
qrSender *queuedRetrySender
}

func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) {
be := &baseExporter{}

var err error
@@ -154,7 +168,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
16 changes: 1 addition & 15 deletions exporter/exporterhelper/common_test.go
@@ -14,11 +14,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var (
@@ -35,7 +32,7 @@
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler())
be, err := newBaseExporter(defaultSettings, fromOptions(), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
@@ -50,7 +47,6 @@ func TestBaseExporterWithOptions(t *testing.T) {
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
"",
nopRequestUnmarshaler(),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -65,13 +61,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd)
}
}

func nopTracePusher() consumer.ConsumeTracesFunc {
return func(ctx context.Context, ld ptrace.Traces) error {
return nil
}
}

func nopRequestUnmarshaler() internal.RequestUnmarshaler {
return newTraceRequestUnmarshalerFunc(nopTracePusher())
}
8 changes: 8 additions & 0 deletions exporter/exporterhelper/constants.go
@@ -18,4 +18,12 @@ var (
errNilPushMetricsData = errors.New("nil PushMetrics")
// errNilPushLogsData is returned when a nil PushLogs is given.
errNilPushLogsData = errors.New("nil PushLogs")
// errNilTracesConverter is returned when a nil TracesConverter is given.
errNilTracesConverter = errors.New("nil TracesConverter")
// errNilMetricsConverter is returned when a nil MetricsConverter is given.
errNilMetricsConverter = errors.New("nil MetricsConverter")
// errNilLogsConverter is returned when a nil LogsConverter is given.
errNilLogsConverter = errors.New("nil LogsConverter")
// errNilRequestSender is returned when a nil RequestSender is given.
errNilRequestSender = errors.New("nil RequestSender")
)
14 changes: 12 additions & 2 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}

type PersistentQueueSettings struct {
Name string
Signal component.DataType
Capacity uint64
Logger *zap.Logger
Client storage.Client
Unmarshaler RequestUnmarshaler
Marshaler RequestMarshaler
}

// NewPersistentQueue creates a new queue backed by file storage; params.Name and params.Signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue {
return &persistentQueue{
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params),
}
}

10 changes: 9 additions & 1 deletion exporter/exporterhelper/internal/persistent_queue_test.go
@@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue
panic(err)
}

wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{
Name: "foo",
Signal: component.DataTypeTraces,
Capacity: uint64(capacity),
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
return wq.(*persistentQueue)
}

14 changes: 8 additions & 6 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -43,6 +43,7 @@ type persistentContiguousStorage struct {
queueName string
client storage.Client
unmarshaler RequestUnmarshaler
marshaler RequestMarshaler

putChan chan struct{}
stopChan chan struct{}
@@ -80,14 +81,15 @@

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: logger,
client: client,
logger: set.Logger,
client: set.Client,
queueName: queueName,
unmarshaler: unmarshaler,
capacity: capacity,
putChan: make(chan struct{}, capacity),
unmarshaler: set.Unmarshaler,
marshaler: set.Marshaler,
capacity: set.Capacity,
putChan: make(chan struct{}, set.Capacity),
reqChan: make(chan Request),
stopChan: make(chan struct{}),
itemsCount: &atomic.Uint64{},
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage_batch.go
@@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error)

// setRequest adds Set operation over a given request to the batch
func (bof *batchStruct) setRequest(key string, value Request) *batchStruct {
return bof.set(key, value, requestToBytes)
return bof.set(key, value, bof.requestToBytes)
}

// setItemIndex adds Set operation over a given itemIndex to the batch
@@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) {
return val, err
}

func requestToBytes(req any) ([]byte, error) {
return req.(Request).Marshal()
func (bof *batchStruct) requestToBytes(req any) ([]byte, error) {
return bof.pcs.marshaler(req.(Request))
}

func (bof *batchStruct) bytesToRequest(b []byte) (any, error) {
15 changes: 14 additions & 1 deletion exporter/exporterhelper/internal/persistent_storage_test.go
@@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client {
}

func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage {
return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{
Capacity: capacity,
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
}

func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage {
@@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler {
}
}

func newFakeTracesRequestMarshalerFunc() RequestMarshaler {
return func(req Request) ([]byte, error) {
marshaler := ptrace.ProtoMarshaler{}
return marshaler.MarshalTraces(req.(*fakeTracesRequest).td)
}
}

func TestPersistentStorage_CorruptedData(t *testing.T) {
path := t.TempDir()

10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/request.go
@@ -19,11 +19,8 @@ type Request interface {
// Otherwise, it should return the original Request.
OnError(error) Request

// Count returns the count of spans/metric points or log records.
Count() int

// Marshal serializes the current request into a byte stream
Marshal() ([]byte, error)
// ItemsCount returns the number of basic items in the request (spans, data points or log records for OTLP)
ItemsCount() int

// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
OnProcessingFinished()
@@ -34,3 +31,6 @@

// RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request
type RequestUnmarshaler func([]byte) (Request, error)

// RequestMarshaler defines a function which takes a request and marshals it into a byte slice
type RequestMarshaler func(Request) ([]byte, error)
80 changes: 76 additions & 4 deletions exporter/exporterhelper/logs.go
@@ -7,6 +7,8 @@ import (
"context"
"errors"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
@@ -42,6 +44,10 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req
}
}

func logsRequestMarshaler(req internal.Request) ([]byte, error) {
return logsMarshaler.MarshalLogs(req.(*logsRequest).ld)
}

func (req *logsRequest) OnError(err error) internal.Request {
var logError consumererror.Logs
if errors.As(err, &logError) {
@@ -58,7 +64,7 @@ func (req *logsRequest) Marshal() ([]byte, error) {
return logsMarshaler.MarshalLogs(req.ld)
}

func (req *logsRequest) Count() int {
func (req *logsRequest) ItemsCount() int {
return req.ld.LogRecordCount()
}

@@ -88,7 +94,9 @@ func NewLogsExporter(
}

bs := fromOptions(options...)
be, err := newBaseExporter(set, bs, component.DataTypeLogs, newLogsRequestUnmarshalerFunc(pusher))
bs.marshaler = logsRequestMarshaler
bs.unmarshaler = newLogsRequestUnmarshalerFunc(pusher)
be, err := newBaseExporter(set, bs, component.DataTypeLogs)
if err != nil {
return nil, err
}
@@ -103,7 +111,7 @@ func NewLogsExporter(
req := newLogsRequest(ctx, ld, pusher)
serr := be.sender.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.ItemsCount()))
}
return serr
}, bs.consumerOptions...)
@@ -114,6 +122,70 @@ func NewLogsExporter(
}, err
}

type LogsConverter interface {
// RequestFromLogs converts plog.Logs data into a request.
RequestFromLogs(context.Context, plog.Logs) (Request, error)
}

// NewLogsExporterV2 creates a new logs exporter based on a custom LogsConverter and RequestSender.
func NewLogsExporterV2(
_ context.Context,
set exporter.CreateSettings,
converter LogsConverter,
sender RequestSender,
options ...Option,
) (exporter.Logs, error) {
if set.Logger == nil {
return nil, errNilLogger
}

if converter == nil {
return nil, errNilLogsConverter
}

if sender == nil {
return nil, errNilRequestSender
}

bs := fromOptions(options...)
bs.RequestSender = sender

be, err := newBaseExporter(set, bs, component.DataTypeLogs)
if err != nil {
return nil, err
}

be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: be.obsrep,
nextSender: nextSender,
}
})

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req, cErr := converter.RequestFromLogs(ctx, ld)
if cErr != nil {
set.Logger.Error("Failed to convert logs. Dropping data.",
zap.Int("dropped_log_records", ld.LogRecordCount()),
zap.Error(cErr))
return consumererror.NewPermanent(cErr)
}
sErr := be.sender.send(&request{
baseRequest: baseRequest{ctx: ctx},
Request: req,
sender: sender,
})
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(ctx, int64(req.ItemsCount()))
}
return sErr
}, bs.consumerOptions...)

return &logsExporter{
baseExporter: be,
Logs: lc,
}, err
}

type logsExporterWithObservability struct {
obsrep *obsExporter
nextSender requestSender
@@ -122,6 +194,6 @@ type logsExporterWithObservability struct {
func (lewo *logsExporterWithObservability) send(req internal.Request) error {
req.SetContext(lewo.obsrep.StartLogsOp(req.Context()))
err := lewo.nextSender.send(req)
lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err)
lewo.obsrep.EndLogsOp(req.Context(), req.ItemsCount(), err)
return err
}
