Skip to content

Commit

Permalink
[FIXED] Async publish error handling on disconnect (#1592)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Apr 2, 2024
1 parent 4207658 commit c97f022
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 21 deletions.
22 changes: 20 additions & 2 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
cb := js.publisher.asyncPublisherOpts.aecb
js.publisher.Unlock()
if cb != nil {
paf.msg.Reply = ""
cb(js, paf.msg, err)
}
}
Expand All @@ -388,6 +389,12 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
paf.retries++
paf.msg.Reply = m.Subject
time.AfterFunc(paf.retryWait, func() {
js.publisher.Lock()
paf := js.getPAF(id)
js.publisher.Unlock()
if paf == nil {
return
}
_, err := js.PublishMsgAsync(paf.msg, func(po *pubOpts) error {
po.pafRetry = paf
return nil
Expand Down Expand Up @@ -453,10 +460,21 @@ func (js *jetStream) resetPendingAcksOnReconnect() {
return
}
js.publisher.Lock()
for _, paf := range js.publisher.acks {
errCb := js.publisher.asyncPublisherOpts.aecb
for id, paf := range js.publisher.acks {
paf.err = nats.ErrDisconnected
if paf.errCh != nil {
paf.errCh <- paf.err
}
if errCb != nil {
js.publisher.Unlock()
// clear reply subject so that new one is created on republish
paf.msg.Reply = ""
errCb(js, paf.msg, nats.ErrDisconnected)
js.publisher.Lock()
}
delete(js.publisher.acks, id)
}
js.publisher.acks = nil
if js.publisher.doneCh != nil {
close(js.publisher.doneCh)
js.publisher.doneCh = nil
Expand Down
107 changes: 90 additions & 17 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package test
import (
"context"
"errors"
"fmt"
"os"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1330,7 +1332,6 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {

func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
Expand All @@ -1352,6 +1353,7 @@ func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
errs := make(chan error, 1)
done := make(chan struct{}, 1)
acks := make(chan jetstream.PubAckFuture, 100)
wg := sync.WaitGroup{}
go func() {
for i := 0; i < 100; i++ {
if ack, err := js.PublishAsync("FOO.A", []byte("hello")); err != nil {
Expand All @@ -1360,6 +1362,7 @@ func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
} else {
acks <- ack
}
wg.Add(1)
}
close(acks)
done <- struct{}{}
Expand All @@ -1371,28 +1374,32 @@ func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
s.Shutdown()
time.Sleep(100 * time.Millisecond)
if pending := js.PublishAsyncPending(); pending != 0 {
t.Fatalf("Expected no pending messages after server shutdown; got: %d", pending)
for ack := range acks {
go func(paf jetstream.PubAckFuture) {
select {
case <-paf.Ok():
case err := <-paf.Err():
if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) {
errs <- fmt.Errorf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
}
case <-time.After(5 * time.Second):
errs <- fmt.Errorf("Did not receive completion signal")
}
wg.Done()
}(ack)
}
s = RunBasicJetStreamServer()
s = restartBasicJSServer(t, s)
defer shutdownJSServerAndRemoveStorage(t, s)

for ack := range acks {
select {
case <-ack.Ok():
case err := <-ack.Err():
if !errors.Is(err, nats.ErrDisconnected) && !errors.Is(err, nats.ErrNoResponders) {
t.Fatalf("Expected error: %v or %v; got: %v", nats.ErrDisconnected, nats.ErrNoResponders, err)
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
wg.Wait()
select {
case err := <-errs:
t.Fatalf("Unexpected error: %v", err)
default:
}
}

func TestAsyncPublishRetry(t *testing.T) {
func TestPublishAsyncRetry(t *testing.T) {
tests := []struct {
name string
pubOpts []jetstream.PublishOpt
Expand Down Expand Up @@ -1472,3 +1479,69 @@ func TestAsyncPublishRetry(t *testing.T) {
})
}
}

func TestPublishAsyncRetryInErrHandler(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

streamCreated := make(chan struct{})
errCB := func(js jetstream.JetStream, m *nats.Msg, e error) {
<-streamCreated
_, err := js.PublishMsgAsync(m)
if err != nil {
t.Fatalf("Unexpected error when republishing: %v", err)
}
}

js, err := jetstream.New(nc, jetstream.WithPublishAsyncErrHandler(errCB))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

errs := make(chan error, 1)
done := make(chan struct{}, 1)
go func() {
for i := 0; i < 10; i++ {
if _, err := js.PublishAsync("FOO.A", []byte("hello"), jetstream.WithRetryAttempts(0)); err != nil {
errs <- err
return
}
}
done <- struct{}{}
}()
select {
case <-done:
case err := <-errs:
t.Fatalf("Unexpected error during publish: %v", err)
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

close(streamCreated)
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

info, err := stream.Info(context.Background())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if info.State.Msgs != 10 {
t.Fatalf("Expected 10 messages in the stream; got: %d", info.State.Msgs)
}
}
14 changes: 12 additions & 2 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,20 @@ func (js *js) resetPendingAcksOnReconnect() {
return
}
js.mu.Lock()
for _, paf := range js.pafs {
errCb := js.opts.aecb
for id, paf := range js.pafs {
paf.err = ErrDisconnected
if paf.errCh != nil {
paf.errCh <- paf.err
}
if errCb != nil {
// clear reply subject so that new one is created on republish
js.mu.Unlock()
errCb(js, paf.msg, ErrDisconnected)
js.mu.Lock()
}
delete(js.pafs, id)
}
js.pafs = nil
if js.dch != nil {
close(js.dch)
js.dch = nil
Expand Down
64 changes: 64 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8076,6 +8076,70 @@ func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
}
}

func TestPublishAsyncRetryInErrHandler(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

streamCreated := make(chan struct{})
errCB := func(js nats.JetStream, m *nats.Msg, e error) {
<-streamCreated
_, err := js.PublishMsgAsync(m)
if err != nil {
t.Fatalf("Unexpected error when republishing: %v", err)
}
}

js, err := nc.JetStream(nats.PublishAsyncErrHandler(errCB))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

errs := make(chan error, 1)
done := make(chan struct{}, 1)
go func() {
for i := 0; i < 10; i++ {
if _, err := js.PublishAsync("FOO.A", []byte("hello")); err != nil {
errs <- err
return
}
}
done <- struct{}{}
}()
select {
case <-done:
case err := <-errs:
t.Fatalf("Unexpected error during publish: %v", err)
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
_, err = js.AddStream(&nats.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

close(streamCreated)
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

info, err := js.StreamInfo("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if info.State.Msgs != 10 {
t.Fatalf("Expected 10 messages in the stream; got: %d", info.State.Msgs)
}
}

func TestJetStreamPublishAsyncPerf(t *testing.T) {
// Comment out below to run this benchmark.
t.SkipNow()
Expand Down

0 comments on commit c97f022

Please sign in to comment.