Skip to content

Commit

Permalink
Default implementation for empty BatchProcessor
Browse files Browse the repository at this point in the history
Ensure an empty BatchProcessor does not panic when any method is called.

Initialize the empty BatchProcessor to use the default/envar setup if
not created with NewBatchProcessor.

Part of #5063
  • Loading branch information
MrAlias committed Apr 19, 2024
1 parent 1ea4ee2 commit 6fa33f5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 11 deletions.
36 changes: 25 additions & 11 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ type BatchProcessor struct {
//
// All of the exporter's methods are called synchronously.
func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessor {
cfg := newBatchConfig(opts)
b := new(BatchProcessor)
b.init(exporter, newBatchConfig(opts))
return b
}

func (b *BatchProcessor) init(exporter Exporter, cfg batchConfig) {
if exporter == nil {
// Do not panic on nil export.
exporter = defaultNoopExporter
Expand All @@ -113,17 +118,13 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr
// appropriately on export.
exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)

b := &BatchProcessor{
// TODO: explore making the size of this configurable.
exporter: newBufferExporter(exporter, 1),

q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
pollTrigger: make(chan struct{}, 1),
pollKill: make(chan struct{}),
}
// TODO (#5238): explore making the size of this configurable.
b.exporter = newBufferExporter(exporter, 1)
b.q = newQueue(cfg.maxQSize.Value)
b.batchSize = cfg.expMaxBatchSize.Value
b.pollTrigger = make(chan struct{}, 1)
b.pollKill = make(chan struct{})
b.pollDone = b.poll(cfg.expInterval.Value)
return b
}

// poll spawns a goroutine to handle interval polling and batch exporting. The
Expand Down Expand Up @@ -173,6 +174,9 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
if b.stopped.Load() {
return nil
}
if b.q == nil {
b.init(nil, newBatchConfig(nil))
}
if n := b.q.Enqueue(r); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
Expand All @@ -196,6 +200,11 @@ func (b *BatchProcessor) Shutdown(ctx context.Context) error {
return nil
}

if b.q == nil {
// Nothing to stop.
return nil
}

// Stop the poll goroutine.
close(b.pollKill)
select {
Expand Down Expand Up @@ -223,6 +232,11 @@ func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
return nil
}

if b.q == nil {
// Nothing to flush.
return nil
}

buf := make([]Record, b.q.cap)
notFlushed := func() bool {
var flushed bool
Expand Down
41 changes: 41 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,47 @@ import (
"go.opentelemetry.io/otel/log"
)

func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")

assert.NotNil(t, bp.exporter, "Exporter")
if assert.NotNil(t, bp.q, "queue") {
assert.Equal(t, dfltMaxQSize, bp.q.cap, "queue capacity")
}
assert.Equal(t, dfltExpMaxBatchSize, bp.batchSize, "batch size")
assert.NotNil(t, bp.pollTrigger, "poll trigger chan")
assert.NotNil(t, bp.pollKill, "poll kill chan")
assert.NotNil(t, bp.pollDone, "poll done chan")

assert.True(t, bp.Enabled(ctx, record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
}, "init via OnEmit")

assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
assert.True(t, bp.Enabled(ctx, record), "Enabled")
}, "Enabled")

assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
}, "ForceFlush")

assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
}, "Shutdown")
}

func TestNewBatchConfig(t *testing.T) {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
t.Log(err)
Expand Down

0 comments on commit 6fa33f5

Please sign in to comment.