Fix SIGSEGV with zstd compression enabled #1164

Merged: 5 commits, Feb 1, 2024
16 changes: 12 additions & 4 deletions pulsar/internal/compression/zstd_cgo.go
@@ -25,19 +25,23 @@
 package compression

 import (
+    "sync"
+
     "github.com/DataDog/zstd"
     log "github.com/sirupsen/logrus"
 )

 type zstdCGoProvider struct {
-    ctx       zstd.Ctx
+    ctxPool   sync.Pool
     level     Level
     zstdLevel int
 }

 func newCGoZStdProvider(level Level) Provider {
     z := &zstdCGoProvider{
-        ctx: zstd.NewCtx(),
+        ctxPool: sync.Pool{New: func() any {
+            return zstd.NewCtx()
+        }},
     }

     switch level {
@@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int {
 }

 func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
-    out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel)
+    ctx := z.ctxPool.Get().(zstd.Ctx)
+    defer z.ctxPool.Put(ctx)
+    out, err := ctx.CompressLevel(dst, src, z.zstdLevel)
     if err != nil {
         log.WithError(err).Fatal("Failed to compress")
     }
@@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
 }

 func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
-    return z.ctx.Decompress(dst, src)
+    ctx := z.ctxPool.Get().(zstd.Ctx)
+    defer z.ctxPool.Put(ctx)
+    return ctx.Decompress(dst, src)
 }

 func (z *zstdCGoProvider) Close() error {
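
The change above replaces the provider's single shared zstd.Ctx with a sync.Pool of contexts. A DataDog/zstd Ctx wraps a cgo compression context, and the SIGSEGV in the title is consistent with concurrent Send calls driving that one shared context from several goroutines at once; checking a context out of the pool for every call gives each in-flight Compress or Decompress its own. A minimal standalone sketch of the pattern under that assumption (pooledCompressor and newPooledCompressor are illustrative names, not part of this PR):

package main

import (
    "sync"

    "github.com/DataDog/zstd"
)

// pooledCompressor mirrors the pattern in the diff: every call checks a zstd
// context out of a sync.Pool, so no two goroutines ever share one cgo context.
type pooledCompressor struct {
    ctxPool sync.Pool
    level   int
}

func newPooledCompressor(level int) *pooledCompressor {
    return &pooledCompressor{
        ctxPool: sync.Pool{New: func() any { return zstd.NewCtx() }},
        level:   level,
    }
}

func (p *pooledCompressor) Compress(dst, src []byte) ([]byte, error) {
    // Each caller gets its own context for the duration of the call.
    ctx := p.ctxPool.Get().(zstd.Ctx)
    defer p.ctxPool.Put(ctx)
    return ctx.CompressLevel(dst, src, p.level)
}

func main() {
    c := newPooledCompressor(3)
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if _, err := c.Compress(nil, make([]byte, 100)); err != nil {
                panic(err)
            }
        }()
    }
    wg.Wait()
}

A sync.Pool also keeps contexts around for reuse across calls, so the steady-state allocation cost stays close to the old single-context code without the unsafe sharing.
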
2 changes: 1 addition & 1 deletion pulsar/internal/utils.go
@@ -40,7 +40,7 @@ func TimestampMillis(t time.Time) uint64 {
 // GetAndAdd perform atomic read and update
 func GetAndAdd(n *uint64, diff uint64) uint64 {
     for {
-        v := *n
+        v := atomic.LoadUint64(n)
         if atomic.CompareAndSwapUint64(n, v, v+diff) {
Review comment:
I'm not familiar enough with the pulsar codebase to understand what this is trying to achieve, but the usual way to perform an atomic get-and-add on a uint64 in golang is to use atomic.AddUint64(). Also, is this change related to the issue at hand, or is it an unrelated change?

Member Author:
This change is to fix a race issue in the CI discovered by the new test case TestSendConcurrently.
The difference between them is that atomic.AddUint64() returns the new value, but GetAndAdd here returns the old value.
Actually I was trying to use atomic.AddUint64(), but it failed in the CI because there are other cases that need the old values.
And yes, this is to fix another regression bug introduced in 0.12.0.

             return v
         }
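
The review thread above hinges on the difference between atomic.AddUint64, which returns the value after the addition, and GetAndAdd, whose callers need the value from before it; that is why the CAS loop stays and only the racy plain read of *n is replaced with an atomic load. A minimal sketch of that difference, assuming a standalone getAndAdd helper that mirrors the patched function (the helper and variable names are illustrative, not Pulsar code):

package main

import (
    "fmt"
    "sync/atomic"
)

// getAndAdd adds diff to *n and returns the value *n held before the addition.
// The atomic load matters: reading *n directly races with concurrent writers.
func getAndAdd(n *uint64, diff uint64) uint64 {
    for {
        v := atomic.LoadUint64(n)
        if atomic.CompareAndSwapUint64(n, v, v+diff) {
            return v
        }
    }
}

func main() {
    var counter uint64 = 10

    old := getAndAdd(&counter, 5)            // returns 10, counter becomes 15
    updated := atomic.AddUint64(&counter, 5) // returns 20, the value after the add

    fmt.Println(old, updated, counter) // prints: 10 20 20
}
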
28 changes: 28 additions & 0 deletions pulsar/producer_test.go
@@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) {
     assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
 }

+func TestSendConcurrently(t *testing.T) {
+    client, err := NewClient(ClientOptions{
+        URL: lookupURL,
+    })
+    assert.NoError(t, err)
+    defer client.Close()
+    testProducer, err := client.CreateProducer(ProducerOptions{
+        Topic:            newTopicName(),
+        CompressionType:  ZSTD,
+        CompressionLevel: Better,
+        DisableBatching:  true,
+    })
+    assert.NoError(t, err)
+
+    var wg sync.WaitGroup
+    for i := 0; i < 100; i++ {
+        wg.Add(1)
+        go func() {
+            _, err := testProducer.Send(context.Background(), &ProducerMessage{
+                Payload: make([]byte, 100),
+            })
+            assert.NoError(t, err)
+            wg.Done()
+        }()
+    }
+    wg.Wait()
+}
+
 type pendingQueueWrapper struct {
     pendingQueue   internal.BlockingQueue
     writtenBuffers *[]internal.Buffer