[fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema #1055

Merged (5 commits) on Jul 20, 2023
55 changes: 33 additions & 22 deletions pulsar/producer_partition.go
@@ -477,21 +477,19 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

// read payload from message
uncompressedPayload := msg.Payload
uncompressedPayloadSize := int64(len(uncompressedPayload))

var schemaPayload []byte
var err error

// The block chan must be closed when we return with an error
defer request.stopBlock()
if !p.canAddToQueue(request, uncompressedPayloadSize) {
if !p.canAddToQueue(request) {
Contributor (@gunli), Jul 13, 2023:
This forgets to runCallback; it would be better to add a debug log here.

Contributor Author:
runCallback is invoked by canReserveMem if it fails.

return
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
@@ -510,7 +508,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if uncompressedPayload == nil && schema != nil {
schemaPayload, err = schema.Encode(msg.Value)
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
@@ -526,7 +523,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.log.WithError(err).Error("get schema version fail")
runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
@@ -537,6 +533,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

uncompressedSize := len(uncompressedPayload)

// try to reserve memory for uncompressedPayload
if !p.canReserveMem(request, int64(uncompressedSize)) {
return
Contributor (@gunli), Jul 13, 2023:
This forgets to release the semaphore and runCallback; it would be better to add a debug log.

Contributor Author:
The semaphore is released by canReserveMem if it fails, and runCallback is invoked as well.

Contributor Author:
That said, I think having canReserveMem release the semaphore may not be a good idea: it forces canReserveMem to always be invoked after canAddToQueue. What do you think?

Contributor:
I think so; runCallback/releaseSemaphore/releaseMemory inside canAddToQueue and canReserveMem violates the Single Responsibility Principle.

Contributor:
The final solution is to encapsulate the resource-releasing logic in request.done(): anywhere there is an error, just call request.done().

Member:
I'm +1 for moving the semaphore release out of canReserveMem. It's better to release it here than in canReserveMem until we find a good solution for this.

}

deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
@@ -586,7 +587,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

// if msg is too large and chunking is disabled
if checkSize > maxMessageSize && !p.options.EnableChunking {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
@@ -605,7 +606,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
} else {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
if payloadChunkSize <= 0 {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(mm)).
@@ -652,10 +653,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
uuid: uuid,
chunkRecorder: cr,
transaction: request.transaction,
reservedMem: int64(rhs - lhs),
}
// the permit for the first chunk has already been acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))
if chunkID != 0 && !p.canAddToQueue(nsr) {
p.releaseSemaphoreAndMem(int64(uncompressedSize - lhs))
return
}
p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
@@ -680,7 +682,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
// after flushing try again to add the current payload
if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.releaseSemaphoreAndMem(int64(uncompressedSize))
runCallback(request.callback, nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
@@ -832,7 +834,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,

if err != nil {
runCallback(request.callback, nil, request.msg, err)
p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
p.releaseSemaphoreAndMem(request.reservedMem)
p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
return
}
@@ -971,7 +973,7 @@ func (p *partitionProducer) failTimeoutMessages() {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.releaseSemaphoreAndMem(int64(size))
p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
@@ -1208,7 +1210,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.releaseSemaphoreAndMem(sr.reservedMem)
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
@@ -1352,6 +1354,7 @@ type sendRequest struct {
uuid string
chunkRecorder *chunkRecorder
transaction *transaction
reservedMem int64
}

// stopBlock can be invoked multiple times safely
@@ -1401,31 +1404,39 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
p.client.memLimit.ReleaseMemory(size)
}

func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
func (p *partitionProducer) canAddToQueue(sr *sendRequest) bool {
if p.options.DisableBlockIfQueueFull {
if !p.publishSemaphore.TryAcquire() {
runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
return false
}
if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
} else {
if !p.publishSemaphore.Acquire(sr.ctx) {
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
}
p.metrics.MessagesPending.Inc()
return true
}

func (p *partitionProducer) canReserveMem(sr *sendRequest, size int64) bool {
if p.options.DisableBlockIfQueueFull {
if !p.client.memLimit.TryReserveMemory(size) {
p.publishSemaphore.Release()
runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
return false
}

} else {
if !p.publishSemaphore.Acquire(sr.ctx) {
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) {
if !p.client.memLimit.ReserveMemory(sr.ctx, size) {
p.publishSemaphore.Release()
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
}
p.metrics.MessagesPending.Inc()
p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
sr.reservedMem += size
p.metrics.BytesPending.Add(float64(size))
return true
}
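
As the review thread above notes, canReserveMem hands the publish-queue permit back when the memory reservation fails, which is why it must run only after canAddToQueue has acquired that permit. The following is a minimal, hypothetical Go model of that coupling; producerSketch, acquireQueuePermit, and reserveMem are illustrative names and not part of the client's API.

// Hypothetical model of the canAddToQueue -> canReserveMem ordering discussed
// in the review; names are illustrative and not part of pulsar-client-go.
package main

import "fmt"

type producerSketch struct {
	permits   int   // free publish-queue permits
	memBudget int64 // remaining client memory budget in bytes
}

// acquireQueuePermit models canAddToQueue: it only takes a queue permit.
func (p *producerSketch) acquireQueuePermit() bool {
	if p.permits == 0 {
		return false // nothing acquired, so nothing to undo
	}
	p.permits--
	return true
}

// reserveMem models canReserveMem: on failure it returns the permit taken by
// acquireQueuePermit, which is exactly why it must be called second.
func (p *producerSketch) reserveMem(size int64) bool {
	if size > p.memBudget {
		p.permits++ // undo the earlier acquireQueuePermit
		return false
	}
	p.memBudget -= size
	return true
}

func main() {
	p := &producerSketch{permits: 1, memBudget: 100}
	if !p.acquireQueuePermit() {
		fmt.Println("send queue is full")
		return
	}
	if !p.reserveMem(1024) { // exceeds the 100-byte budget
		fmt.Println("memory buffer is full; permit returned:", p.permits)
		return
	}
	fmt.Println("request accepted")
}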

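For illustration, here is a minimal, hypothetical sketch of the request.done() idea floated in the review thread: release everything the request holds exactly once, so every error path can simply call done(err). It is not part of this PR; the release hooks and field names below are assumptions standing in for the producer's publishSemaphore.Release and client.memLimit.ReleaseMemory calls.

// Hypothetical sketch of the request.done() proposal from the review thread;
// not part of this PR.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type sendRequestSketch struct {
	doneOnce         sync.Once
	releaseSemaphore func()      // nil until a queue permit was actually acquired
	releaseMemory    func(int64) // nil until memory was actually reserved
	reservedMem      int64
	callback         func(error)
}

// done releases whatever the request currently holds, exactly once, and then
// runs the user callback, so callers no longer pair releases by hand on every
// error path.
func (sr *sendRequestSketch) done(err error) {
	sr.doneOnce.Do(func() {
		if sr.releaseSemaphore != nil {
			sr.releaseSemaphore()
		}
		if sr.releaseMemory != nil && sr.reservedMem > 0 {
			sr.releaseMemory(sr.reservedMem)
		}
		if sr.callback != nil {
			sr.callback(err)
		}
	})
}

func main() {
	sr := &sendRequestSketch{
		releaseSemaphore: func() { fmt.Println("semaphore released") },
		releaseMemory:    func(n int64) { fmt.Printf("released %d bytes\n", n) },
		reservedMem:      1024,
		callback:         func(err error) { fmt.Println("callback:", err) },
	}
	sr.done(errors.New("schema encode failed"))
	sr.done(nil) // second call is a no-op thanks to sync.Once
}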
159 changes: 159 additions & 0 deletions pulsar/producer_test.go
@@ -1939,6 +1939,165 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
assert.NoError(t, err)
}

func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {

c, err := NewClient(ClientOptions{
URL: serviceURL,
MemoryLimitBytes: 100 * 6,
})
assert.NoError(t, err)
defer c.Close()

schema := NewAvroSchema(`{"fields":
[
{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
],
"name":"MyAvro","namespace":"schemaNotFoundTestCase","type":"record"}`, nil)

topicName := newTopicName()
producer1, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: false,
BatchingMaxPublishDelay: 100 * time.Second,
SendTimeout: 2 * time.Second,
})

producer2, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: false,
BatchingMaxPublishDelay: 100 * time.Second,
SendTimeout: 2 * time.Second,
})

// the size of encoded value is 6 bytes
value := map[string]interface{}{
"id": 0,
"name": map[string]interface{}{
"string": "abc",
},
}

n := 101
for i := 0; i < n/2; i++ {
producer1.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
}, func(id MessageID, message *ProducerMessage, e error) {})

producer2.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
}, func(id MessageID, message *ProducerMessage, e error) {})
}
// Last message in order to reach the limit
producer1.SendAsync(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
}, func(id MessageID, message *ProducerMessage, e error) {})
time.Sleep(100 * time.Millisecond)
assert.Equal(t, int64(n*6), c.(*client).memLimit.CurrentUsage())

_, err = producer1.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

_, err = producer2.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// flush pending msg
err = producer1.Flush()
assert.NoError(t, err)
err = producer2.Flush()
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

_, err = producer1.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.NoError(t, err)
_, err = producer2.Send(context.Background(), &ProducerMessage{
Value: value,
Schema: schema,
})
assert.NoError(t, err)
}

func TestMemLimitRejectProducerMessagesWithChunking(t *testing.T) {

c, err := NewClient(ClientOptions{
URL: serviceURL,
MemoryLimitBytes: 5 * 1024,
})
assert.NoError(t, err)
defer c.Close()

topicName := newTopicName()
producer1, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: true,
EnableChunking: true,
SendTimeout: 2 * time.Second,
})

producer2, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: false,
BatchingMaxPublishDelay: 100 * time.Millisecond,
SendTimeout: 2 * time.Second,
})

producer2.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 5*1024+1),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
t.Fatal(e)
}
})

time.Sleep(50 * time.Millisecond)
assert.Equal(t, int64(5*1024+1), c.(*client).memLimit.CurrentUsage())

_, err = producer1.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 1),
})
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// wait until all the reserved memory has been released
retryAssert(t, 10, 200, func() {}, func(t assert.TestingT) bool {
return assert.Equal(t, 0, int(c.(*client).memLimit.CurrentUsage()))
})

producer3, _ := c.CreateProducer(ProducerOptions{
Topic: topicName,
DisableBlockIfQueueFull: true,
DisableBatching: true,
EnableChunking: true,
MaxPendingMessages: 1,
ChunkMaxMessageSize: 1024,
SendTimeout: 2 * time.Second,
})

// producer3 will reserve 2*1024 bytes and then release 1024 bytes (the second chunk)
// because it reaches MaxPendingMessages in chunking
_, _ = producer3.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 2*1024),
})
assert.Equal(t, int64(1024), c.(*client).memLimit.CurrentUsage())
}

func TestMemLimitContextCancel(t *testing.T) {

c, err := NewClient(ClientOptions{