Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporterhelper] New exporter helper for custom requests #7874

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce a new exporter helper that operates over client-provided requests instead of pdata

# One or more tracking issues or pull requests related to the change
issues: [7874]

43 changes: 31 additions & 12 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,47 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

type queueSettings struct {
config QueueSettings
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

func (qs *queueSettings) persistenceEnabled() bool {
return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
QueueSettings
queueSettings
RetrySettings
RequestSender
requestExporter bool
}

// fromOptions returns the internal options starting from the default and applying all configured options.
func fromOptions(options ...Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
// fromOptions returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
}

for _, op := range options {
op(opts)
op(bs)
}

return opts
return bs
}

// Option apply changes to baseSettings.
Expand Down Expand Up @@ -121,9 +136,13 @@ func WithRetry(retrySettings RetrySettings) Option {

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) Option {
// Permanent queue is not yet available for exporter helpers V2, they ignore StorageID field.
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
o.QueueSettings = queueSettings
if o.requestExporter {
panic("queueing is not supported for request exporters yet")
}
o.queueSettings.config = config
}
}

Expand All @@ -145,7 +164,7 @@ type baseExporter struct {
qrSender *queuedRetrySender
}

func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) {
be := &baseExporter{}

var err error
Expand All @@ -154,7 +173,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand Down
23 changes: 7 additions & 16 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var (
Expand All @@ -35,7 +32,11 @@ var (
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler())
be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -45,12 +46,12 @@ func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings,
fromOptions(
newBaseSettings(
false,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
"",
nopRequestUnmarshaler(),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -65,13 +66,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd)
}
}

func nopTracePusher() consumer.ConsumeTracesFunc {
return func(ctx context.Context, ld ptrace.Traces) error {
return nil
}
}

func nopRequestUnmarshaler() internal.RequestUnmarshaler {
return newTraceRequestUnmarshalerFunc(nopTracePusher())
}
8 changes: 8 additions & 0 deletions exporter/exporterhelper/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ var (
errNilPushMetricsData = errors.New("nil PushMetrics")
// errNilPushLogsData is returned when a nil PushLogs is given.
errNilPushLogsData = errors.New("nil PushLogs")
// errNilTracesConverter is returned when a nil TracesConverter is given.
errNilTracesConverter = errors.New("nil TracesConverter")
// errNilMetricsConverter is returned when a nil MetricsConverter is given.
errNilMetricsConverter = errors.New("nil MetricsConverter")
// errNilLogsConverter is returned when a nil LogsConverter is given.
errNilLogsConverter = errors.New("nil LogsConverter")
// errNilRequestSender is returned when a nil RequestSender is given.
errNilRequestSender = errors.New("nil RequestSender")
)
14 changes: 12 additions & 2 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}

type PersistentQueueSettings struct {
Name string
Signal component.DataType
Capacity uint64
Logger *zap.Logger
Client storage.Client
Unmarshaler RequestUnmarshaler
Marshaler RequestMarshaler
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue {
return &persistentQueue{
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params),
}
}

Expand Down
10 changes: 9 additions & 1 deletion exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue
panic(err)
}

wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{
Name: "foo",
Signal: component.DataTypeTraces,
Capacity: uint64(capacity),
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
return wq.(*persistentQueue)
}

Expand Down
14 changes: 8 additions & 6 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type persistentContiguousStorage struct {
queueName string
client storage.Client
unmarshaler RequestUnmarshaler
marshaler RequestMarshaler

putChan chan struct{}
stopChan chan struct{}
Expand Down Expand Up @@ -80,14 +81,15 @@ var (

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: logger,
client: client,
logger: set.Logger,
client: set.Client,
queueName: queueName,
unmarshaler: unmarshaler,
capacity: capacity,
putChan: make(chan struct{}, capacity),
unmarshaler: set.Unmarshaler,
marshaler: set.Marshaler,
capacity: set.Capacity,
putChan: make(chan struct{}, set.Capacity),
reqChan: make(chan Request),
stopChan: make(chan struct{}),
itemsCount: &atomic.Uint64{},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error)

// setRequest adds Set operation over a given request to the batch
func (bof *batchStruct) setRequest(key string, value Request) *batchStruct {
return bof.set(key, value, requestToBytes)
return bof.set(key, value, bof.requestToBytes)
}

// setItemIndex adds Set operation over a given itemIndex to the batch
Expand Down Expand Up @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) {
return val, err
}

func requestToBytes(req any) ([]byte, error) {
return req.(Request).Marshal()
func (bof *batchStruct) requestToBytes(req any) ([]byte, error) {
return bof.pcs.marshaler(req.(Request))
}

func (bof *batchStruct) bytesToRequest(b []byte) (any, error) {
Expand Down
15 changes: 14 additions & 1 deletion exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client {
}

func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage {
return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{
Capacity: capacity,
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
}

func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage {
Expand Down Expand Up @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler {
}
}

func newFakeTracesRequestMarshalerFunc() RequestMarshaler {
return func(req Request) ([]byte, error) {
marshaler := ptrace.ProtoMarshaler{}
return marshaler.MarshalTraces(req.(*fakeTracesRequest).td)
}
}

func TestPersistentStorage_CorruptedData(t *testing.T) {
path := t.TempDir()

Expand Down
10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ type Request interface {
// Otherwise, it should return the original Request.
OnError(error) Request

// Count returns the count of spans/metric points or log records.
Count() int

// Marshal serializes the current request into a byte stream
Marshal() ([]byte, error)
// ItemsCount returns the number of basic items in the request (spans, date points or log records for OTLP)
ItemsCount() int

// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
OnProcessingFinished()
Expand All @@ -34,3 +31,6 @@ type Request interface {

// RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request
type RequestUnmarshaler func([]byte) (Request, error)

// RequestMarshaler defines a function which takes a request and marshals it into a byte slice
type RequestMarshaler func(Request) ([]byte, error)