Skip to content

Commit

Permalink
[FIXED] Race condition when resetting ordered consumer (#1526)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Jan 16, 2024
1 parent a8a8d18 commit 14e0c6b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
16 changes: 9 additions & 7 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
for {
select {
case <-c.doReset:
if err := c.reset(); err != nil {
sub, ok := c.currentConsumer.getSubscription("")
if !ok {
return
}
c.errHandler(c.serial)(sub, err)
}
if c.withStopAfter {
select {
case c.stopAfter = <-c.stopAfterMsgsLeft:
Expand All @@ -149,13 +156,6 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
return
}
}
if err := c.reset(); err != nil {
sub, ok := c.currentConsumer.getSubscription("")
if !ok {
return
}
c.errHandler(c.serial)(sub, err)
}
if c.stopAfter > 0 {
opts = opts[:len(opts)-2]
} else {
Expand Down Expand Up @@ -190,6 +190,8 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt

func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err error) {
return func(cc ConsumeContext, err error) {
c.Lock()
defer c.Unlock()
if c.userErrHandler != nil && !errors.Is(err, errOrderedSequenceMismatch) {
c.userErrHandler(cc, err)
}
Expand Down
9 changes: 3 additions & 6 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,12 +649,12 @@ func (s *pullSubscription) Next() (Msg, error) {

func (s *pullSubscription) handleStatusMsg(msg *nats.Msg, msgErr error) error {
if !errors.Is(msgErr, nats.ErrTimeout) && !errors.Is(msgErr, ErrMaxBytesExceeded) {
if s.consumeOpts.ErrHandler != nil {
s.consumeOpts.ErrHandler(s, msgErr)
}
if errors.Is(msgErr, ErrConsumerDeleted) || errors.Is(msgErr, ErrBadRequest) {
return msgErr
}
if s.consumeOpts.ErrHandler != nil {
s.consumeOpts.ErrHandler(s, msgErr)
}
if errors.Is(msgErr, ErrConsumerLeadershipChanged) {
s.pending.msgCount = 0
s.pending.byteCount = 0
Expand All @@ -663,9 +663,6 @@ func (s *pullSubscription) handleStatusMsg(msg *nats.Msg, msgErr error) error {
}
msgsLeft, bytesLeft, err := parsePending(msg)
if err != nil {
if s.consumeOpts.ErrHandler != nil {
s.consumeOpts.ErrHandler(s, err)
}
return err
}
s.pending.msgCount -= msgsLeft
Expand Down

0 comments on commit 14e0c6b

Please sign in to comment.