Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick PRs into v2.10.10 release branch #5033

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 25 additions & 4 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -432,10 +432,10 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
}

// Restart the stream in question.
// Should only be called when the stream is know in a bad state.
// Should only be called when the stream is known to be in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
cc := js.cluster
s, cc := js.srv, js.cluster
if cc == nil {
js.mu.Unlock()
return
Expand All @@ -458,9 +458,18 @@ func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
}
rg.node = nil
}
sinceCreation := time.Since(sa.Created)
js.mu.Unlock()

// Process stream assignment to recreate.
// Check that we have given system enough time to start us up.
// This will be longer than obvious, and matches consumer logic in case system very busy.
if sinceCreation < 10*time.Second {
s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
acc, csa.Config.Name, sinceCreation)
return
}

js.processStreamAssignment(sa)

// If we had consumers assigned to this server they will be present in the copy, csa.
Expand Down Expand Up @@ -569,13 +578,24 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()

js.mu.Lock()
deleted := ca.deleted
// Check that we have not just been created.
if !deleted && time.Since(ca.Created) < 10*time.Second {
s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
accName, streamName, consumer, time.Since(ca.Created))
js.mu.Unlock()
return
}
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
deleted := ca.deleted
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
Expand Down Expand Up @@ -4112,6 +4132,7 @@ func (js *jetStream) processConsumerRemoval(ca *consumerAssignment) {
// Make sure this removal is for what we have, otherwise ignore.
if ca.Group != nil && oca.Group != nil && ca.Group.Name == oca.Group.Name {
needDelete = true
oca.deleted = true
delete(sa.consumers, ca.Name)
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3786,7 +3786,7 @@ func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
// Wait for exit.
time.Sleep(100 * time.Millisecond)

checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
Expand Down
124 changes: 124 additions & 0 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9390,3 +9390,127 @@ func TestNoRaceJetStreamClusterStreamCatchupLargeInteriorDeletes(t *testing.T) {
return fmt.Errorf("Msgs not equal %d vs %d", state.Msgs, si.State.Msgs)
})
}

// TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling creates a large
// number of consumers and streams while concurrently polling healthz on every
// server, to make sure the healthz-driven restart logic does not incorrectly
// recreate assets that are being created or deleted.
func TestNoRaceJetStreamClusterBadRestartsWithHealthzPolling(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	cfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo.>"},
		Replicas: 3,
	}
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	// We will poll healthz at a decent clip and make sure any restart logic works
	// correctly with assets coming and going.
	ch := make(chan struct{})
	defer close(ch)

	go func() {
		for {
			select {
			case <-ch:
				return
			case <-time.After(50 * time.Millisecond):
				for _, s := range c.servers {
					s.healthz(nil)
				}
			}
		}
	}()

	numConsumers := 500
	consumers := make([]string, 0, numConsumers)

	var wg sync.WaitGroup
	// Collect errors from worker goroutines and assert them on the test
	// goroutine: require_NoError calls t.Fatalf, and testing.T.FailNow must
	// not be called from goroutines other than the one running the test.
	errCh := make(chan error, numConsumers)

	for i := 0; i < numConsumers; i++ {
		cname := fmt.Sprintf("CONS-%d", i+1)
		consumers = append(consumers, cname)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := js.PullSubscribe("foo.>", cname, nats.BindStream("TEST")); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	// Surface any creation failures now, from the test goroutine.
	for len(errCh) > 0 {
		require_NoError(t, <-errCh)
	}

	// Make sure all are reported.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Consumers != numConsumers {
				return fmt.Errorf("%v wrong number of consumers: %d vs %d", s, jsz.Consumers, numConsumers)
			}
		}
		return nil
	})

	// Now do same for streams.
	numStreams := 200
	streams := make([]string, 0, numStreams)

	for i := 0; i < numStreams; i++ {
		sname := fmt.Sprintf("TEST-%d", i+1)
		streams = append(streams, sname)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := js.AddStream(&nats.StreamConfig{Name: sname, Replicas: 3}); err != nil {
				errCh <- err
			}
		}()
	}
	wg.Wait()
	// Surface any stream creation failures, again on the test goroutine.
	for len(errCh) > 0 {
		require_NoError(t, <-errCh)
	}

	// Make sure all are reported. The +1 accounts for the original "TEST" stream.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Streams != numStreams+1 {
				return fmt.Errorf("%v wrong number of streams: %d vs %d", s, jsz.Streams, numStreams+1)
			}
		}
		return nil
	})

	// Delete consumers.
	for _, cname := range consumers {
		err := js.DeleteConsumer("TEST", cname)
		require_NoError(t, err)
	}
	// Make sure reporting goes to zero.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Consumers != 0 {
				return fmt.Errorf("%v still has %d consumers", s, jsz.Consumers)
			}
		}
		return nil
	})

	// Delete streams
	for _, sname := range streams {
		err := js.DeleteStream(sname)
		require_NoError(t, err)
	}
	err = js.DeleteStream("TEST")
	require_NoError(t, err)

	// Make sure reporting goes to zero.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		for _, s := range c.servers {
			jsz, _ := s.Jsz(nil)
			if jsz.Streams != 0 {
				return fmt.Errorf("%v still has %d streams", s, jsz.Streams)
			}
		}
		return nil
	})
}