Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry picks for Release v2.10.11 #5084

Merged
merged 4 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.17.6
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.5.3
github.com/nats-io/nats.go v1.32.0
github.com/nats-io/nats.go v1.33.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.33.0 h1:rRg0l2F29B30n6EPl0j50hl8eYp7rA2ecoJ74E62US8=
github.com/nats-io/nats.go v1.33.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
22 changes: 17 additions & 5 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,16 @@ func (fs *fileStore) warn(format string, args ...any) {
fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}

// For doing debug logging.
// Lock should be held.
func (fs *fileStore) debug(format string, args ...any) {
// No-op if no server configured.
if fs.srv == nil {
return
}
fs.srv.Debugf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}

// Track local state but ignore timestamps here.
func updateTrackingState(state *StreamState, mb *msgBlock) {
if state.FirstSeq == 0 {
Expand Down Expand Up @@ -2225,8 +2235,11 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor

fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter}

if err := mb.ensurePerSubjectInfoLoaded(); err != nil {
return nil, false, err
var didLoad bool
if mb.fssNotLoaded() {
// Make sure we have fss loaded.
mb.loadMsgsWithLock()
didLoad = true
}

// If we only have 1 subject currently and it matches our filter we can also set isAll.
Expand Down Expand Up @@ -2271,10 +2284,9 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
}

if fseq > lseq {
return nil, false, ErrStoreMsgNotFound
return nil, didLoad, ErrStoreMsgNotFound
}

var didLoad bool
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
Expand Down Expand Up @@ -7540,7 +7552,7 @@ func (fs *fileStore) writeFullState() error {
}

if cap(buf) > sz {
fs.warn("WriteFullState reallocated from %d to %d", sz, cap(buf))
fs.debug("WriteFullState reallocated from %d to %d", sz, cap(buf))
}

// Write to a tmp file and rename.
Expand Down
106 changes: 52 additions & 54 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,14 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
stepdown *ipQueue[string] // Stepdown requests
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
}

// cacthupState structure that holds our subscription, and catchup term and index
Expand Down Expand Up @@ -388,7 +387,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
stepdown: newIPQueue[string](s, qpfx+"stepdown"),
accName: accName,
leadc: make(chan bool, 1),
observer: cfg.Observer,
Expand Down Expand Up @@ -863,7 +861,7 @@ func (n *raft) PauseApply() error {

// If we are currently a candidate make sure we step down.
if n.State() == Candidate {
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
}

n.debug("Pausing our apply channel")
Expand Down Expand Up @@ -1252,6 +1250,21 @@ func (n *raft) Leader() bool {
return n.State() == Leader
}

// stepdown immediately steps down the Raft node to the
// follower state. This will take the lock itself.
func (n *raft) stepdown(newLeader string) {
n.Lock()
defer n.Unlock()
n.stepdownLocked(newLeader)
}

// stepdownLocked immediately steps down the Raft node to the
// follower state. This requires the lock is already held.
func (n *raft) stepdownLocked(newLeader string) {
n.debug("Stepping down")
n.switchToFollowerLocked(newLeader)
}

// isCatchingUp returns true if a catchup is currently taking place.
func (n *raft) isCatchingUp() bool {
n.RLock()
Expand Down Expand Up @@ -1459,7 +1472,6 @@ func (n *raft) StepDown(preferred ...string) error {
n.vote = noVote
n.writeTermVote()

stepdown := n.stepdown
prop := n.prop
n.Unlock()

Expand All @@ -1473,8 +1485,7 @@ func (n *raft) StepDown(preferred ...string) error {
prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader)))
} else {
// Force us to stepdown here.
n.debug("Stepping down")
stepdown.push(noLeader)
n.stepdown(noLeader)
}

return nil
Expand Down Expand Up @@ -1651,7 +1662,7 @@ func (n *raft) shutdown(shouldDelete bool) {
// just will remove them from the central monitoring map
queues := []interface {
unregister()
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown}
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.unregister()
}
Expand Down Expand Up @@ -1913,7 +1924,7 @@ func (n *raft) processAppendEntries() {
// runAsFollower is called by run and will block for as long as the node is
// running in the follower state.
func (n *raft) runAsFollower() {
for {
for n.State() == Follower {
elect := n.electTimer()

select {
Expand Down Expand Up @@ -1964,13 +1975,6 @@ func (n *raft) runAsFollower() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
// We've received a stepdown request, start following the new leader if
// we can.
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -2307,14 +2311,14 @@ func (n *raft) runAsLeader() {
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
n.warn("Error subscribing to forwarded proposals: %v", err)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
n.unsubscribe(fsub)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}

Expand Down Expand Up @@ -2369,7 +2373,7 @@ func (n *raft) runAsLeader() {
if b.Type == EntryLeaderTransfer {
n.prop.recycle(&es)
n.debug("Stepping down due to leadership transfer")
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
// We need to re-create `entries` because there is a reference
Expand All @@ -2384,7 +2388,7 @@ func (n *raft) runAsLeader() {
}
case <-lq.C:
if n.lostQuorum() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
case <-n.votes.ch:
Expand All @@ -2394,7 +2398,7 @@ func (n *raft) runAsLeader() {
continue
}
if vresp.term > n.Term() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
n.trackPeer(vresp.peer)
Expand All @@ -2403,11 +2407,6 @@ func (n *raft) runAsLeader() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
case <-n.entry.ch:
n.processAppendEntries()
}
Expand Down Expand Up @@ -2576,7 +2575,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
snap, err := n.loadLastSnapshot()
if err != nil {
// We need to stepdown here when this happens.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
// We need to reset our state here as well.
n.resetWAL()
return 0, err
Expand Down Expand Up @@ -2642,7 +2641,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err)
if err == ErrStoreEOF {
// If we are here we are seeing a request for an item beyond our state, meaning we should stepdown.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand All @@ -2654,7 +2653,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
// If we are here we are seeing a request for an item we do not have, meaning we should stepdown.
// This is possible on a reset of our WAL but the other side has a snapshot already.
// If we do not stepdown this can cycle.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand Down Expand Up @@ -2711,7 +2710,7 @@ func (n *raft) applyCommit(index uint64) error {
if err != ErrStoreClosed && err != ErrStoreEOF {
n.warn("Got an error loading %d index: %v - will reset", index, err)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -2789,7 +2788,7 @@ func (n *raft) applyCommit(index uint64) error {

// If this is us and we are the leader we should attempt to stepdown.
if peer == n.id && n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdown(n.selectNextLeader())
}

// Remove from string intern map.
Expand Down Expand Up @@ -2922,7 +2921,7 @@ func (n *raft) runAsCandidate() {
// We vote for ourselves.
votes := 1

for {
for n.State() == Candidate {
elect := n.electTimer()
select {
case <-n.entry.ch:
Expand Down Expand Up @@ -2965,20 +2964,15 @@ func (n *raft) runAsCandidate() {
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.stepdown.push(noLeader)
n.lxfer = false
n.stepdownLocked(noLeader)
n.Unlock()
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -3132,7 +3126,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.vote = noVote
n.writeTermVote()
n.debug("Received append entry from another leader, stepping down to %q", ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
} else {
// Let them know we are the leader.
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
Expand All @@ -3155,7 +3149,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.vote = noVote
n.writeTermVote()
}
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}

// Catching up state.
Expand Down Expand Up @@ -3217,7 +3211,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
if n.State() != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}
}

Expand Down Expand Up @@ -3458,7 +3452,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
n.vote = noVote
n.writeTermVote()
n.warn("Detected another leader with higher term, will stepdown and reset")
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.resetWAL()
n.Unlock()
arPool.Put(ar)
Expand Down Expand Up @@ -3506,7 +3500,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
if index := ae.pindex + 1; index != seq {
n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -3933,7 +3927,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
if n.State() != Follower {
n.debug("Stepping down from %s, detected higher term: %d vs %d",
strings.ToLower(n.State().String()), vr.term, n.term)
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.term = vr.term
}
n.vote = noVote
Expand Down Expand Up @@ -4064,13 +4058,17 @@ const (
)

func (n *raft) switchToFollower(leader string) {
n.Lock()
defer n.Unlock()

n.switchToFollowerLocked(leader)
}

func (n *raft) switchToFollowerLocked(leader string) {
if n.State() == Closed {
return
}

n.Lock()
defer n.Unlock()

n.debug("Switching to follower")

n.lxfer = false
Expand Down