Skip to content

Commit

Permalink
Add custom ring implementation to the BatchProcessor (#5237)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 24, 2024
1 parent baeb560 commit 29e1c7e
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 5 deletions.
9 changes: 4 additions & 5 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"container/ring"
"context"
"errors"
"slices"
Expand Down Expand Up @@ -255,11 +254,11 @@ type queue struct {
sync.Mutex

cap, len int
read, write *ring.Ring
read, write *ring
}

func newQueue(size int) *queue {
r := ring.New(size)
r := newRing(size)
return &queue{
cap: size,
read: r,
Expand Down Expand Up @@ -304,7 +303,7 @@ func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {

n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value.(Record)
buf[i] = q.read.Value
q.read = q.read.Next()
}

Expand All @@ -324,7 +323,7 @@ func (q *queue) Flush() []Record {

out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value.(Record)
out[i] = q.read.Value
q.read = q.read.Next()
}
q.len = 0
Expand Down
29 changes: 29 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"testing"
"time"
"unsafe"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -560,3 +561,31 @@ func TestQueue(t *testing.T) {
assert.Len(t, out, goRoutines, "flushed Records")
})
}

func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
body := log.BoolValue(true)
r.SetBody(body)

rSize := unsafe.Sizeof(r) + unsafe.Sizeof(body)
ctx := context.Background()
bp := NewBatchProcessor(
defaultNoopExporter,
WithMaxQueueSize(b.N+1),
WithExportMaxBatchSize(b.N+1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
b.Cleanup(func() { _ = bp.Shutdown(ctx) })

b.SetBytes(int64(rSize))
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var err error
for pb.Next() {
err = bp.OnEmit(ctx, r)
}
_ = err
})
}
82 changes: 82 additions & 0 deletions sdk/log/ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package log // import "go.opentelemetry.io/otel/sdk/log"

// A ring is an element of a circular list, or ring. Rings do not have a
// beginning or end; a pointer to any ring element serves as reference to the
// entire ring. Empty rings are represented as nil ring pointers. The zero
// value for a ring is a one-element ring with a nil Value.
//
// This is copied from the "container/ring" package. It uses a Record type for
// Value instead of any to avoid allocations.
type ring struct {
next, prev *ring
Value Record
}

func (r *ring) init() *ring {
r.next = r
r.prev = r
return r
}

// Next returns the next ring element. r must not be empty.
func (r *ring) Next() *ring {
if r.next == nil {
return r.init()
}
return r.next
}

// Prev returns the previous ring element. r must not be empty.
func (r *ring) Prev() *ring {
if r.next == nil {
return r.init()
}
return r.prev
}

// newRing creates a ring of n elements.
func newRing(n int) *ring {
if n <= 0 {
return nil
}
r := new(ring)
p := r
for i := 1; i < n; i++ {
p.next = &ring{prev: p}
p = p.next
}
p.next = r
r.prev = p
return r
}

// Len computes the number of elements in ring r. It executes in time
// proportional to the number of elements.
func (r *ring) Len() int {
n := 0
if r != nil {
n = 1
for p := r.Next(); p != r; p = p.next {
n++
}
}
return n
}

// Do calls function f on each element of the ring, in forward order. The
// behavior of Do is undefined if f changes *r.
func (r *ring) Do(f func(Record)) {
if r != nil {
f(r.Value)
for p := r.Next(); p != r; p = p.next {
f(p.Value)
}
}
}
86 changes: 86 additions & 0 deletions sdk/log/ring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package log

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/log"
)

func verifyRing(t *testing.T, r *ring, N int, sum int) {
// Length.
assert.Equal(t, N, r.Len(), "r.Len()")

// Iteration.
var n, s int
r.Do(func(v Record) {
n++
body := v.Body()
if body.Kind() != log.KindEmpty {
s += int(body.AsInt64())
}
})
assert.Equal(t, N, n, "number of forward iterations")
if sum >= 0 {
assert.Equal(t, sum, s, "forward ring sum")
}

if r == nil {
return
}

// Connections.
if r.next != nil {
var p *ring // previous element.
for q := r; p == nil || q != r; q = q.next {
if p != nil {
assert.Equalf(t, p, q.prev, "prev = %p, expected q.prev = %p", p, q.prev)
}
p = q
}
assert.Equalf(t, p, r.prev, "prev = %p, expected r.prev = %p", p, r.prev)
}

// Next, Prev.
assert.Equal(t, r.next, r.Next(), "r.Next() != r.next")
assert.Equal(t, r.prev, r.Prev(), "r.Prev() != r.prev")
}

func TestNewRing(t *testing.T) {
for i := 0; i < 10; i++ {
// Empty value.
r := newRing(i)
verifyRing(t, r, i, -1)
}

for n := 0; n < 10; n++ {
r := newRing(n)
for i := 1; i <= n; i++ {
var rec Record
rec.SetBody(log.IntValue(i))
r.Value = rec
r = r.Next()
}

sum := (n*n + n) / 2
verifyRing(t, r, n, sum)
}
}

func TestEmptyRing(t *testing.T) {
var rNext, rPrev ring
verifyRing(t, rNext.Next(), 1, 0)
verifyRing(t, rPrev.Prev(), 1, 0)

var rLen, rDo *ring
assert.Equal(t, rLen.Len(), 0, "Len()")
rDo.Do(func(Record) { assert.Fail(t, "Do func arg called") })
}

0 comments on commit 29e1c7e

Please sign in to comment.