Skip to content

Commit

Permalink
Cherry picks for Release v2.10.11 (#5084)
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Feb 14, 2024
2 parents 5325644 + 82ea395 commit 11f5808
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 60 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/klauspost/compress v1.17.6
github.com/minio/highwayhash v1.0.2
github.com/nats-io/jwt/v2 v2.5.3
github.com/nats-io/nats.go v1.32.0
github.com/nats-io/nats.go v1.33.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.5.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.33.0 h1:rRg0l2F29B30n6EPl0j50hl8eYp7rA2ecoJ74E62US8=
github.com/nats-io/nats.go v1.33.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
22 changes: 17 additions & 5 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,16 @@ func (fs *fileStore) warn(format string, args ...any) {
fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}

// For doing debug logging.
// Lock should be held.
func (fs *fileStore) debug(format string, args ...any) {
// No-op if no server configured.
if fs.srv == nil {
return
}
fs.srv.Debugf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}

// Track local state but ignore timestamps here.
func updateTrackingState(state *StreamState, mb *msgBlock) {
if state.FirstSeq == 0 {
Expand Down Expand Up @@ -2225,8 +2235,11 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor

fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter}

if err := mb.ensurePerSubjectInfoLoaded(); err != nil {
return nil, false, err
var didLoad bool
if mb.fssNotLoaded() {
// Make sure we have fss loaded.
mb.loadMsgsWithLock()
didLoad = true
}

// If we only have 1 subject currently and it matches our filter we can also set isAll.
Expand Down Expand Up @@ -2271,10 +2284,9 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
}

if fseq > lseq {
return nil, false, ErrStoreMsgNotFound
return nil, didLoad, ErrStoreMsgNotFound
}

var didLoad bool
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
Expand Down Expand Up @@ -7540,7 +7552,7 @@ func (fs *fileStore) writeFullState() error {
}

if cap(buf) > sz {
fs.warn("WriteFullState reallocated from %d to %d", sz, cap(buf))
fs.debug("WriteFullState reallocated from %d to %d", sz, cap(buf))
}

// Write to a tmp file and rename.
Expand Down
106 changes: 52 additions & 54 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,14 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
stepdown *ipQueue[string] // Stepdown requests
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
}

// cacthupState structure that holds our subscription, and catchup term and index
Expand Down Expand Up @@ -388,7 +387,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
stepdown: newIPQueue[string](s, qpfx+"stepdown"),
accName: accName,
leadc: make(chan bool, 1),
observer: cfg.Observer,
Expand Down Expand Up @@ -863,7 +861,7 @@ func (n *raft) PauseApply() error {

// If we are currently a candidate make sure we step down.
if n.State() == Candidate {
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
}

n.debug("Pausing our apply channel")
Expand Down Expand Up @@ -1252,6 +1250,21 @@ func (n *raft) Leader() bool {
return n.State() == Leader
}

// stepdown immediately steps down the Raft node to the
// follower state. This will take the lock itself.
func (n *raft) stepdown(newLeader string) {
n.Lock()
defer n.Unlock()
n.stepdownLocked(newLeader)
}

// stepdownLocked immediately steps down the Raft node to the
// follower state. This requires the lock is already held.
func (n *raft) stepdownLocked(newLeader string) {
n.debug("Stepping down")
n.switchToFollowerLocked(newLeader)
}

// isCatchingUp returns true if a catchup is currently taking place.
func (n *raft) isCatchingUp() bool {
n.RLock()
Expand Down Expand Up @@ -1459,7 +1472,6 @@ func (n *raft) StepDown(preferred ...string) error {
n.vote = noVote
n.writeTermVote()

stepdown := n.stepdown
prop := n.prop
n.Unlock()

Expand All @@ -1473,8 +1485,7 @@ func (n *raft) StepDown(preferred ...string) error {
prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader)))
} else {
// Force us to stepdown here.
n.debug("Stepping down")
stepdown.push(noLeader)
n.stepdown(noLeader)
}

return nil
Expand Down Expand Up @@ -1651,7 +1662,7 @@ func (n *raft) shutdown(shouldDelete bool) {
// just will remove them from the central monitoring map
queues := []interface {
unregister()
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown}
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.unregister()
}
Expand Down Expand Up @@ -1913,7 +1924,7 @@ func (n *raft) processAppendEntries() {
// runAsFollower is called by run and will block for as long as the node is
// running in the follower state.
func (n *raft) runAsFollower() {
for {
for n.State() == Follower {
elect := n.electTimer()

select {
Expand Down Expand Up @@ -1964,13 +1975,6 @@ func (n *raft) runAsFollower() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
// We've received a stepdown request, start following the new leader if
// we can.
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -2307,14 +2311,14 @@ func (n *raft) runAsLeader() {
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
n.warn("Error subscribing to forwarded proposals: %v", err)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
n.unsubscribe(fsub)
n.stepdown.push(noLeader)
n.stepdown(noLeader)
return
}

Expand Down Expand Up @@ -2369,7 +2373,7 @@ func (n *raft) runAsLeader() {
if b.Type == EntryLeaderTransfer {
n.prop.recycle(&es)
n.debug("Stepping down due to leadership transfer")
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
// We need to re-create `entries` because there is a reference
Expand All @@ -2384,7 +2388,7 @@ func (n *raft) runAsLeader() {
}
case <-lq.C:
if n.lostQuorum() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
case <-n.votes.ch:
Expand All @@ -2394,7 +2398,7 @@ func (n *raft) runAsLeader() {
continue
}
if vresp.term > n.Term() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
n.trackPeer(vresp.peer)
Expand All @@ -2403,11 +2407,6 @@ func (n *raft) runAsLeader() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
case <-n.entry.ch:
n.processAppendEntries()
}
Expand Down Expand Up @@ -2576,7 +2575,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
snap, err := n.loadLastSnapshot()
if err != nil {
// We need to stepdown here when this happens.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
// We need to reset our state here as well.
n.resetWAL()
return 0, err
Expand Down Expand Up @@ -2642,7 +2641,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err)
if err == ErrStoreEOF {
// If we are here we are seeing a request for an item beyond our state, meaning we should stepdown.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand All @@ -2654,7 +2653,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
// If we are here we are seeing a request for an item we do not have, meaning we should stepdown.
// This is possible on a reset of our WAL but the other side has a snapshot already.
// If we do not stepdown this can cycle.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand Down Expand Up @@ -2711,7 +2710,7 @@ func (n *raft) applyCommit(index uint64) error {
if err != ErrStoreClosed && err != ErrStoreEOF {
n.warn("Got an error loading %d index: %v - will reset", index, err)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -2789,7 +2788,7 @@ func (n *raft) applyCommit(index uint64) error {

// If this is us and we are the leader we should attempt to stepdown.
if peer == n.id && n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdown(n.selectNextLeader())
}

// Remove from string intern map.
Expand Down Expand Up @@ -2922,7 +2921,7 @@ func (n *raft) runAsCandidate() {
// We vote for ourselves.
votes := 1

for {
for n.State() == Candidate {
elect := n.electTimer()
select {
case <-n.entry.ch:
Expand Down Expand Up @@ -2965,20 +2964,15 @@ func (n *raft) runAsCandidate() {
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.stepdown.push(noLeader)
n.lxfer = false
n.stepdownLocked(noLeader)
n.Unlock()
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -3132,7 +3126,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.vote = noVote
n.writeTermVote()
n.debug("Received append entry from another leader, stepping down to %q", ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
} else {
// Let them know we are the leader.
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
Expand All @@ -3155,7 +3149,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.vote = noVote
n.writeTermVote()
}
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}

// Catching up state.
Expand Down Expand Up @@ -3217,7 +3211,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
if n.State() != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}
}

Expand Down Expand Up @@ -3458,7 +3452,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
n.vote = noVote
n.writeTermVote()
n.warn("Detected another leader with higher term, will stepdown and reset")
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.resetWAL()
n.Unlock()
arPool.Put(ar)
Expand Down Expand Up @@ -3506,7 +3500,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
if index := ae.pindex + 1; index != seq {
n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -3933,7 +3927,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
if n.State() != Follower {
n.debug("Stepping down from %s, detected higher term: %d vs %d",
strings.ToLower(n.State().String()), vr.term, n.term)
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.term = vr.term
}
n.vote = noVote
Expand Down Expand Up @@ -4064,13 +4058,17 @@ const (
)

func (n *raft) switchToFollower(leader string) {
n.Lock()
defer n.Unlock()

n.switchToFollowerLocked(leader)
}

func (n *raft) switchToFollowerLocked(leader string) {
if n.State() == Closed {
return
}

n.Lock()
defer n.Unlock()

n.debug("Switching to follower")

n.lxfer = false
Expand Down

0 comments on commit 11f5808

Please sign in to comment.