Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom ring implementation to the BatchProcessor #5237

Merged
merged 10 commits into from
Apr 24, 2024
9 changes: 4 additions & 5 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"container/ring"
"context"
"errors"
"slices"
Expand Down Expand Up @@ -255,11 +254,11 @@ type queue struct {
sync.Mutex

cap, len int
read, write *ring.Ring
read, write *ring
}

func newQueue(size int) *queue {
r := ring.New(size)
r := newRing(size)
return &queue{
cap: size,
read: r,
Expand Down Expand Up @@ -304,7 +303,7 @@ func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {

n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value.(Record)
buf[i] = q.read.Value
q.read = q.read.Next()
}

Expand All @@ -324,7 +323,7 @@ func (q *queue) Flush() []Record {

out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value.(Record)
out[i] = q.read.Value
q.read = q.read.Next()
}
q.len = 0
Expand Down
29 changes: 29 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"testing"
"time"
"unsafe"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -560,3 +561,31 @@ func TestQueue(t *testing.T) {
assert.Len(t, out, goRoutines, "flushed Records")
})
}

func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
body := log.BoolValue(true)
r.SetBody(body)

rSize := unsafe.Sizeof(r) + unsafe.Sizeof(body)
ctx := context.Background()
bp := NewBatchProcessor(
defaultNoopExporter,
WithMaxQueueSize(b.N+1),
WithExportMaxBatchSize(b.N+1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
b.Cleanup(func() { _ = bp.Shutdown(ctx) })

b.SetBytes(int64(rSize))
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var err error
for pb.Next() {
err = bp.OnEmit(ctx, r)
}
_ = err
})
}
82 changes: 82 additions & 0 deletions sdk/log/ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package log // import "go.opentelemetry.io/otel/sdk/log"

// A ring is an element of a circular list, or ring. Rings do not have a
// beginning or end; a pointer to any ring element serves as reference to the
// entire ring. Empty rings are represented as nil ring pointers. The zero
// value for a ring is a one-element ring with a nil Value.
//
// This is copied from the "container/ring" package. It uses a Record type for
// Value instead of any to avoid allocations.
type ring struct {
next, prev *ring
Value Record
}

func (r *ring) init() *ring {
r.next = r
r.prev = r
return r
}

// Next returns the next ring element. r must not be empty.
func (r *ring) Next() *ring {
if r.next == nil {
return r.init()
}
return r.next
}

// Prev returns the previous ring element. r must not be empty.
func (r *ring) Prev() *ring {
if r.next == nil {
return r.init()
}
return r.prev
}

// newRing creates a ring of n elements.
func newRing(n int) *ring {
if n <= 0 {
return nil
}
r := new(ring)
p := r
for i := 1; i < n; i++ {
p.next = &ring{prev: p}
p = p.next
}
p.next = r
r.prev = p
return r
}

// Len computes the number of elements in ring r. It executes in time
// proportional to the number of elements.
func (r *ring) Len() int {
n := 0
if r != nil {
n = 1
for p := r.Next(); p != r; p = p.next {
n++
}
}
return n
}

// Do calls function f on each element of the ring, in forward order. The
// behavior of Do is undefined if f changes *r.
func (r *ring) Do(f func(Record)) {
if r != nil {
f(r.Value)
for p := r.Next(); p != r; p = p.next {
f(p.Value)
}
}
}
86 changes: 86 additions & 0 deletions sdk/log/ring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package log

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/log"
)

func verifyRing(t *testing.T, r *ring, N int, sum int) {
// Length.
assert.Equal(t, N, r.Len(), "r.Len()")

// Iteration.
var n, s int
r.Do(func(v Record) {
n++
body := v.Body()
if body.Kind() != log.KindEmpty {
s += int(body.AsInt64())
}
})
assert.Equal(t, N, n, "number of forward iterations")
if sum >= 0 {
assert.Equal(t, sum, s, "forward ring sum")
}

if r == nil {
return
}

// Connections.
if r.next != nil {
var p *ring // previous element.
for q := r; p == nil || q != r; q = q.next {
if p != nil {
assert.Equalf(t, p, q.prev, "prev = %p, expected q.prev = %p", p, q.prev)
}
p = q
}
assert.Equalf(t, p, r.prev, "prev = %p, expected r.prev = %p", p, r.prev)
}

// Next, Prev.
assert.Equal(t, r.next, r.Next(), "r.Next() != r.next")
assert.Equal(t, r.prev, r.Prev(), "r.Prev() != r.prev")
}

func TestNewRing(t *testing.T) {
for i := 0; i < 10; i++ {
// Empty value.
r := newRing(i)
verifyRing(t, r, i, -1)
}

for n := 0; n < 10; n++ {
r := newRing(n)
for i := 1; i <= n; i++ {
var rec Record
rec.SetBody(log.IntValue(i))
r.Value = rec
r = r.Next()
}

sum := (n*n + n) / 2
verifyRing(t, r, n, sum)
}
}

func TestEmptyRing(t *testing.T) {
var rNext, rPrev ring
verifyRing(t, rNext.Next(), 1, 0)
verifyRing(t, rPrev.Prev(), 1, 0)

var rLen, rDo *ring
assert.Equal(t, rLen.Len(), 0, "Len()")
rDo.Do(func(Record) { assert.Fail(t, "Do func arg called") })
}