Skip to content

Commit

Permalink
fix: fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
shenjiaqi.2769 committed Jul 17, 2023
1 parent cbac56e commit e9eb93b
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2021,7 +2021,7 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {

c, err := NewClient(ClientOptions{
URL: serviceURL,
MemoryLimitBytes: 10 * 1024,
MemoryLimitBytes: 5 * 1024,
})
assert.NoError(t, err)
defer c.Close()
Expand All @@ -2032,30 +2032,40 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {
DisableBlockIfQueueFull: true,
DisableBatching: true,
EnableChunking: true,
ChunkMaxMessageSize: 1024,
SendTimeout: 2 * time.Second,
})

producer1.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 10*1024+1),
producer2, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: false,
BatchingMaxPublishDelay: 100 * time.Millisecond,
SendTimeout: 2 * time.Second,
})

producer2.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 5*1024+1),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
t.Fatal(e)
}
})

time.Sleep(50 * time.Millisecond)
assert.Equal(t, int64(5*1024+1), c.(*client).memLimit.CurrentUsage())

_, err = producer1.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 1),
})
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// wait all the chunks have been released
// wait all the mem have been released
retryAssert(t, 10, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 0, int(c.(*client).memLimit.CurrentUsage()))
})

producer2, _ := c.CreateProducer(ProducerOptions{
producer3, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: true,
Expand All @@ -2067,7 +2077,7 @@ func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {

// producer2 will reserve 2*1024 bytes and then release 1024 byte (release the second chunk)
// because it reaches MaxPendingMessages in chunking
_, _ = producer2.Send(context.Background(), &ProducerMessage{
_, _ = producer3.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 2*1024),
})
assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
Expand Down

0 comments on commit e9eb93b

Please sign in to comment.