Skip to content

Commit

Permalink
Fix SIGSEGV with zstd compression enabled (#1164)
Browse files Browse the repository at this point in the history
* Fix SIGSEGV with zstd compression enabled

* Use sync.Pool to cache zstd ctx

* Fix race in sequenceID assignment

* Fix GetAndAdd

(cherry picked from commit 8776135)
  • Loading branch information
RobertIndie committed Feb 2, 2024
1 parent 853c108 commit 0f0d5a8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
16 changes: 12 additions & 4 deletions pulsar/internal/compression/zstd_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@
package compression

import (
"sync"

"github.com/DataDog/zstd"
log "github.com/sirupsen/logrus"
)

type zstdCGoProvider struct {
ctx zstd.Ctx
ctxPool sync.Pool
level Level
zstdLevel int
}

func newCGoZStdProvider(level Level) Provider {
z := &zstdCGoProvider{
ctx: zstd.NewCtx(),
ctxPool: sync.Pool{New: func() any {
return zstd.NewCtx()
}},
}

switch level {
Expand All @@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int {
}

func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel)
ctx := z.ctxPool.Get().(zstd.Ctx)
defer z.ctxPool.Put(ctx)
out, err := ctx.CompressLevel(dst, src, z.zstdLevel)
if err != nil {
log.WithError(err).Fatal("Failed to compress")
}
Expand All @@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
}

func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
return z.ctx.Decompress(dst, src)
ctx := z.ctxPool.Get().(zstd.Ctx)
defer z.ctxPool.Put(ctx)
return ctx.Decompress(dst, src)
}

func (z *zstdCGoProvider) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion pulsar/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TimestampMillis(t time.Time) uint64 {
// GetAndAdd perform atomic read and update
func GetAndAdd(n *uint64, diff uint64) uint64 {
for {
v := *n
v := atomic.LoadUint64(n)
if atomic.CompareAndSwapUint64(n, v, v+diff) {
return v
}
Expand Down
28 changes: 28 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) {
assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
}

func TestSendConcurrently(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()
testProducer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
CompressionType: ZSTD,
CompressionLevel: Better,
DisableBatching: true,
})
assert.NoError(t, err)

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
_, err := testProducer.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 100),
})
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
}

type pendingQueueWrapper struct {
pendingQueue internal.BlockingQueue
writtenBuffers *[]internal.Buffer
Expand Down

0 comments on commit 0f0d5a8

Please sign in to comment.