Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Producer] respect context cancellation in Flush #1165

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) {
}
wg.Wait()

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

// send another batch
Expand Down Expand Up @@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) {
}, nil)
}

producer.Flush()
producer.FlushWithCtx(context.Background())

for i := 0; i < N; i++ {
msg, err := consumer.Receive(ctx)
Expand Down Expand Up @@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o
log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize())
})
}
assert.Nil(t, producer.Flush())
assert.Nil(t, producer.FlushWithCtx(context.Background()))

msgIds := make([]MessageID, BatchingMaxSize)
for i := 0; i < BatchingMaxSize; i++ {
Expand Down
4 changes: 4 additions & 0 deletions pulsar/internal/pulsartracing/producer_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error {
return nil
}

func (p *mockProducer) FlushWithCtx(ctx context.Context) error {
return nil
}

func (p *mockProducer) Close() {}
7 changes: 5 additions & 2 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,13 @@ type Producer interface {
// return the last sequence id published by this producer.
LastSequenceID() int64

// Flush all the messages buffered in the client and wait until all messages have been successfully
// persisted.
// Deprecated: Use `FlushWithCtx()` instead.
Flush() error

// Flush all the messages buffered in the client and wait until all messageshave been successfully
// persisted.
FlushWithCtx(ctx context.Context) error

// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
Expand Down
6 changes: 5 additions & 1 deletion pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 {
}

func (p *producer) Flush() error {
return p.FlushWithCtx(context.Background())
}

func (p *producer) FlushWithCtx(ctx context.Context) error {
jayshrivastava marked this conversation as resolved.
Show resolved Hide resolved
p.RLock()
defer p.RUnlock()

for _, pp := range p.producers {
if err := pp.Flush(); err != nil {
if err := pp.FlushWithCtx(ctx); err != nil {
return err
}

Expand Down
18 changes: 15 additions & 3 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 {
}

func (p *partitionProducer) Flush() error {
return p.FlushWithCtx(context.Background())
}

func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
flushReq := &flushRequest{
doneCh: make(chan struct{}),
err: nil,
}
p.cmdChan <- flushReq
select {
case <-ctx.Done():
return ctx.Err()
case p.cmdChan <- flushReq:
}

// wait for the flush request to complete
<-flushReq.doneCh
return flushReq.err
select {
case <-ctx.Done():
return ctx.Err()
case <-flushReq.doneCh:
return flushReq.err
}
}

func (p *partitionProducer) getProducerState() producerState {
Expand Down
24 changes: 12 additions & 12 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestProducerAsyncSend(t *testing.T) {
assert.NoError(t, err)
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)

wg.Wait()
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestProducerFlushDisableBatching(t *testing.T) {
assert.NoError(t, err)
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)

wg.Wait()
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestFlushInProducer(t *testing.T) {
})
assert.Nil(t, err)
}
err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()

Expand Down Expand Up @@ -429,7 +429,7 @@ func TestFlushInProducer(t *testing.T) {
assert.Nil(t, err)
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)
wg.Wait()

Expand Down Expand Up @@ -500,7 +500,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
}

// After flush, should be able to consume.
err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.Nil(t, err)

wg.Wait()
Expand Down Expand Up @@ -1717,7 +1717,7 @@ func TestMultipleSchemaOfKeyBasedBatchProducerConsumer(t *testing.T) {
}

}
producer.Flush()
producer.FlushWithCtx(context.Background())

//// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Expand Down Expand Up @@ -1808,7 +1808,7 @@ func TestMultipleSchemaProducerConsumer(t *testing.T) {
assert.NotNil(t, id)
})
}
producer.Flush()
producer.FlushWithCtx(context.Background())

//// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Expand Down Expand Up @@ -2027,9 +2027,9 @@ func TestMemLimitRejectProducerMessages(t *testing.T) {
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// flush pending msg
err = producer1.Flush()
err = producer1.FlushWithCtx(context.Background())
assert.NoError(t, err)
err = producer2.Flush()
err = producer2.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

Expand Down Expand Up @@ -2118,9 +2118,9 @@ func TestMemLimitRejectProducerMessagesWithSchema(t *testing.T) {
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// flush pending msg
err = producer1.Flush()
err = producer1.FlushWithCtx(context.Background())
assert.NoError(t, err)
err = producer2.Flush()
err = producer2.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

Expand Down Expand Up @@ -2244,7 +2244,7 @@ func TestMemLimitContextCancel(t *testing.T) {
cancel()
wg.Wait()

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)
assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage())

Expand Down
6 changes: 3 additions & 3 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestReaderOnSpecificMessageWithBatching(t *testing.T) {
})
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

// create reader on 5th message (not included)
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestReaderOnLatestWithBatching(t *testing.T) {
})
}

err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

// create reader on 5th message (not included)
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestReaderSeek(t *testing.T) {
seekID = id
}
}
err = producer.Flush()
err = producer.FlushWithCtx(context.Background())
assert.NoError(t, err)

for i := 0; i < N; i++ {
Expand Down