Skip to content

Commit

Permalink
Cherry-pick PRs into v2.10.10 release branch (#5033)
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Feb 2, 2024
2 parents 7304c56 + 29756de commit 4360a45
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 5 deletions.
29 changes: 25 additions & 4 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -432,10 +432,10 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
}

// Restart the stream in question.
// Should only be called when the stream is know in a bad state.
// Should only be called when the stream is known to be in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
cc := js.cluster
s, cc := js.srv, js.cluster
if cc == nil {
js.mu.Unlock()
return
Expand All @@ -458,9 +458,18 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
}
rg.node = nil
}
sinceCreation := time.Since(sa.Created)
js.mu.Unlock()

// Process stream assignment to recreate.
// Check that we have given system enough time to start us up.
// This will be longer than obvious, and matches consumer logic in case system very busy.
if sinceCreation < 10*time.Second {
s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
acc, csa.Config.Name, sinceCreation)
return
}

js.processStreamAssignment(sa)

// If we had consumers assigned to this server they will be present in the copy, csa.
Expand Down Expand Up @@ -569,13 +578,24 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()

js.mu.Lock()
deleted := ca.deleted
// Check that we have not just been created.
if !deleted && time.Since(ca.Created) < 10*time.Second {
s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
accName, streamName, consumer, time.Since(ca.Created))
js.mu.Unlock()
return
}
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
deleted := ca.deleted
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
Expand Down Expand Up @@ -4112,6 +4132,7 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
// Make sure this removal is for what we have, otherwise ignore.
if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
needDelete = true
oca.deleted = true
delete(sa.consumers, ca.Name)
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3786,7 +3786,7 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
// Wait for exit.
time.Sleep(100 * time.Millisecond)

checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
Expand Down
124 changes: 124 additions & 0 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9390,3 +9390,127 @@ func TestNoRaceJetStreamClusterStreamCatchupLargeInteriorDeletes(t *testing.T) {
return fmt.Errorf("Msgs not equal %d vs %d", state.Msgs, si.State.Msgs)
})
}

// TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling creates and deletes
// many R3 streams and consumers while a background goroutine hammers healthz
// on every server, verifying that the healthz-driven restart logic does not
// spuriously resurrect or duplicate assets that are being created or removed.
func TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	cfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo.>"},
		Replicas: 3,
	}
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	// We will poll healthz at a decent clip and make sure any restart logic works
	// correctly with assets coming and going.
	ch := make(chan struct{})
	defer close(ch)

	go func() {
		for {
			select {
			case <-ch:
				return
			case <-time.After(50 * time.Millisecond):
				for _, s := range c.servers {
					s.healthz(nil)
				}
			}
		}
	}()

	numConsumers := 500
	consumers := make([]string, 0, numConsumers)

	var wg sync.WaitGroup

	// Worker goroutines must not call require_NoError directly: it calls
	// t.Fatalf, and testing.T.FailNow is only valid from the goroutine
	// running the test. Collect errors here and assert after wg.Wait().
	errCh := make(chan error, numConsumers)

	// checkWorkerErrs reports the first error (if any) queued by workers,
	// from the test goroutine where Fatalf is legal.
	checkWorkerErrs := func() {
		t.Helper()
		select {
		case err := <-errCh:
			require_NoError(t, err)
		default:
		}
	}

	for i := 0; i < numConsumers; i++ {
		cname := fmt.Sprintf("CONS-%d", i+1)
		consumers = append(consumers, cname)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := js.PullSubscribe("foo.>", cname, nats.BindStream("TEST")); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	checkWorkerErrs()

	// Make sure all are reported.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Consumers != numConsumers {
				return fmt.Errorf("%v wrong number of consumers: %d vs %d", s, jsz.Consumers, numConsumers)
			}
		}
		return nil
	})

	// Now do same for streams.
	numStreams := 200
	streams := make([]string, 0, numStreams)

	for i := 0; i < numStreams; i++ {
		sname := fmt.Sprintf("TEST-%d", i+1)
		streams = append(streams, sname)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := js.AddStream(&nats.StreamConfig{Name: sname, Replicas: 3}); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	checkWorkerErrs()

	// Make sure all are reported.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Streams != numStreams+1 {
				return fmt.Errorf("%v wrong number of streams: %d vs %d", s, jsz.Streams, numStreams+1)
			}
		}
		return nil
	})

	// Delete consumers.
	for _, cname := range consumers {
		err := js.DeleteConsumer("TEST", cname)
		require_NoError(t, err)
	}
	// Make sure reporting goes to zero.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Consumers != 0 {
				return fmt.Errorf("%v still has %d consumers", s, jsz.Consumers)
			}
		}
		return nil
	})

	// Delete streams
	for _, sname := range streams {
		err := js.DeleteStream(sname)
		require_NoError(t, err)
	}
	err = js.DeleteStream("TEST")
	require_NoError(t, err)

	// Make sure reporting goes to zero.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Streams != 0 {
				return fmt.Errorf("%v still has %d streams", s, jsz.Streams)
			}
		}
		return nil
	})
}

0 comments on commit 4360a45

Please sign in to comment.