Skip to content

Commit

Permalink
Default implementation for empty BatchProcessor
Browse files Browse the repository at this point in the history
Ensure an empty BatchProcessor does not panic when any method is called.
Default an empty BatchProcessor as being shut down.

Part of #5063
  • Loading branch information
MrAlias committed Apr 19, 2024
1 parent 1ea4ee2 commit bc49ecb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
12 changes: 7 additions & 5 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ const (
var _ Processor = (*BatchProcessor)(nil)

// BatchProcessor is a processor that exports batches of log records.
// A BatchProcessor must be created with [NewBatchProcessor].
//
// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor
// is shut down by default, no records will be batched or exported.
type BatchProcessor struct {
// The BatchProcessor is designed to provide the highest throughput of
// log records possible while being compatible with OpenTelemetry. The
Expand Down Expand Up @@ -170,7 +172,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {

// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
if b.stopped.Load() {
if b.stopped.Load() || b.q == nil {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
Expand All @@ -187,12 +189,12 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load()
return !b.stopped.Load() && b.q != nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) {
if b.stopped.Swap(true) || b.q == nil {
return nil
}

Expand All @@ -219,7 +221,7 @@ var ctxErr = func(ctx context.Context) error {

// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
if b.stopped.Load() {
if b.stopped.Load() || b.q == nil {
return nil
}

Expand Down
12 changes: 12 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ import (
"go.opentelemetry.io/otel/log"
)

func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
}

func TestNewBatchConfig(t *testing.T) {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
t.Log(err)
Expand Down

0 comments on commit bc49ecb

Please sign in to comment.