Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom ring implementation to the BatchProcessor #5237

Merged
merged 10 commits into from
Apr 24, 2024
87 changes: 82 additions & 5 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"container/ring"
"context"
"errors"
"slices"
Expand Down Expand Up @@ -255,11 +258,11 @@ type queue struct {
sync.Mutex

cap, len int
read, write *ring.Ring
read, write *ring
}

func newQueue(size int) *queue {
r := ring.New(size)
r := newRing(size)
return &queue{
cap: size,
read: r,
Expand Down Expand Up @@ -304,7 +307,7 @@ func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {

n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value.(Record)
buf[i] = q.read.Value
q.read = q.read.Next()
}

Expand All @@ -324,14 +327,88 @@ func (q *queue) Flush() []Record {

out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value.(Record)
out[i] = q.read.Value
q.read = q.read.Next()
}
q.len = 0

return out
}

// A ring is an element of a circular list, or ring. Rings do not have a
// beginning or end; a pointer to any ring element serves as reference to the
// entire ring. Empty rings are represented as nil ring pointers. The zero
// value for a ring is a one-element ring with a nil Value.
//
// This is copied from the "container/ring" package. It uses a Record type for
// Value instead of any to avoid allocations.
type ring struct {
next, prev *ring
Value Record
}

func (r *ring) init() *ring {
r.next = r
r.prev = r
return r
}

// Next returns the next ring element. r must not be empty.
func (r *ring) Next() *ring {
if r.next == nil {
return r.init()
}
return r.next
}

// Prev returns the previous ring element. r must not be empty.
func (r *ring) Prev() *ring {
if r.next == nil {
return r.init()
}
return r.prev
}

// newRing creates a ring of n elements.
func newRing(n int) *ring {
if n <= 0 {
return nil
}
r := new(ring)
p := r
for i := 1; i < n; i++ {
p.next = &ring{prev: p}
p = p.next
}
p.next = r
r.prev = p
return r
}

// Len computes the number of elements in ring r. It executes in time
// proportional to the number of elements.
func (r *ring) Len() int {
n := 0
if r != nil {
n = 1
for p := r.Next(); p != r; p = p.next {
n++
}
}
return n
}

// Do calls function f on each element of the ring, in forward order. The
// behavior of Do is undefined if f changes *r.
func (r *ring) Do(f func(Record)) {
if r != nil {
f(r.Value)
for p := r.Next(); p != r; p = p.next {
f(p.Value)
}
}
}

type batchConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]
Expand Down
103 changes: 103 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
Expand All @@ -10,6 +14,7 @@ import (
"sync"
"testing"
"time"
"unsafe"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -560,3 +565,101 @@ func TestQueue(t *testing.T) {
assert.Len(t, out, goRoutines, "flushed Records")
})
}

func verifyRing(t *testing.T, r *ring, N int, sum int) {
// Length.
assert.Equal(t, N, r.Len(), "r.Len()")

// Iteration.
var n, s int
r.Do(func(v Record) {
n++
body := v.Body()
if body.Kind() != log.KindEmpty {
s += int(body.AsInt64())
}
})
assert.Equal(t, N, n, "number of forward iterations")
if sum >= 0 {
assert.Equal(t, sum, s, "forward ring sum")
}

if r == nil {
return
}

// Connections.
if r.next != nil {
var p *ring // previous element.
for q := r; p == nil || q != r; q = q.next {
if p != nil {
assert.Equalf(t, p, q.prev, "prev = %p, expected q.prev = %p", p, q.prev)
}
p = q
}
assert.Equalf(t, p, r.prev, "prev = %p, expected r.prev = %p", p, r.prev)
}

// Next, Prev.
assert.Equal(t, r.next, r.Next(), "r.Next() != r.next")
assert.Equal(t, r.prev, r.Prev(), "r.Prev() != r.prev")
}

func TestNewRing(t *testing.T) {
for i := 0; i < 10; i++ {
// Empty value.
r := newRing(i)
verifyRing(t, r, i, -1)
}

for n := 0; n < 10; n++ {
r := newRing(n)
for i := 1; i <= n; i++ {
var rec Record
rec.SetBody(log.IntValue(i))
r.Value = rec
r = r.Next()
}

sum := (n*n + n) / 2
verifyRing(t, r, n, sum)
}
}

func TestEmptyRing(t *testing.T) {
var rNext, rPrev ring
verifyRing(t, rNext.Next(), 1, 0)
verifyRing(t, rPrev.Prev(), 1, 0)

var rLen, rDo *ring
assert.Equal(t, rLen.Len(), 0, "Len()")
rDo.Do(func(Record) { assert.Fail(t, "Do func arg called") })
}

func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
body := log.BoolValue(true)
r.SetBody(body)

rSize := unsafe.Sizeof(r) + unsafe.Sizeof(body)
ctx := context.Background()
bp := NewBatchProcessor(
defaultNoopExporter,
WithMaxQueueSize(b.N+1),
WithExportMaxBatchSize(b.N+1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
b.Cleanup(func() { _ = bp.Shutdown(ctx) })

b.SetBytes(int64(rSize))
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var err error
for pb.Next() {
err = bp.OnEmit(ctx, r)
}
_ = err
})
}