Skip to content

Commit

Permalink
NRG: Remove stepdown channel, handle inline
Browse files Browse the repository at this point in the history
The stepdown channel interleaves with other channels such as the apply queue,
leader change notifications etc in the `runAs` goroutines in an unpredictable
order, so processing a stepdown request might be delayed behind other work.
Doing this inline should be safer with stronger guarantees.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Jul 17, 2024
1 parent fd284c8 commit a61d145
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 67 deletions.
126 changes: 59 additions & 67 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,14 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
stepdown *ipQueue[string] // Stepdown requests
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
prop *ipQueue[*Entry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
reqs *ipQueue[*voteRequest] // Vote requests
votes *ipQueue[*voteResponse] // Vote responses
leadc chan bool // Leader changes
quit chan struct{} // Raft group shutdown
}

// cacthupState structure that holds our subscription, and catchup term and index
Expand Down Expand Up @@ -391,7 +390,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
stepdown: newIPQueue[string](s, qpfx+"stepdown"),
accName: accName,
leadc: make(chan bool, 32),
observer: cfg.Observer,
Expand Down Expand Up @@ -868,7 +866,7 @@ func (n *raft) PauseApply() error {

// If we are currently a candidate make sure we step down.
if n.State() == Candidate {
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
}

n.debug("Pausing our apply channel")
Expand Down Expand Up @@ -1256,6 +1254,21 @@ func (n *raft) Leader() bool {
return n.State() == Leader
}

// stepdown immediately steps down the Raft node to the
// follower state. This will take the lock itself.
func (n *raft) stepdown(newLeader string) {
n.Lock()
defer n.Unlock()
n.stepdownLocked(newLeader)
}

// stepdownLocked immediately steps down the Raft node to the
// follower state. This requires the lock is already held.
func (n *raft) stepdownLocked(newLeader string) {
n.debug("Stepping down")
n.switchToFollowerLocked(newLeader)
}

// isCatchingUp returns true if a catchup is currently taking place.
func (n *raft) isCatchingUp() bool {
n.RLock()
Expand Down Expand Up @@ -1463,24 +1476,25 @@ func (n *raft) StepDown(preferred ...string) error {
n.vote = noVote
n.writeTermVote()

stepdown := n.stepdown
prop := n.prop
n.Unlock()

if len(preferred) > 0 && maybeLeader == noLeader {
n.debug("Can not transfer to preferred peer %q", preferred[0])
}

// If we have a new leader selected, transfer over to them.
// Send the append entry directly rather than via the proposals queue,
// as we will switch to follower state immediately and will blow away
// the contents of the proposal queue in the process.
if maybeLeader != noLeader {
n.debug("Selected %q for new leader", maybeLeader)
prop.push(newEntry(EntryLeaderTransfer, []byte(maybeLeader)))
} else {
// Force us to stepdown here.
n.debug("Stepping down")
stepdown.push(noLeader)
n.debug("Selected %q for new leader, stepping down due to leadership transfer", maybeLeader)
ae := newEntry(EntryLeaderTransfer, []byte(maybeLeader))
n.sendAppendEntry([]*Entry{ae})
}

// Force us to stepdown here.
n.stepdown(noLeader)

return nil
}

Expand Down Expand Up @@ -1659,7 +1673,7 @@ func (n *raft) shutdown(shouldDelete bool) {
queues := []interface {
unregister()
drain()
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown}
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.drain()
q.unregister()
Expand Down Expand Up @@ -1947,7 +1961,7 @@ func (n *raft) processAppendEntries() {
// runAsFollower is called by run and will block for as long as the node is
// running in the follower state.
func (n *raft) runAsFollower() {
for {
for n.State() == Follower {
elect := n.electTimer()

select {
Expand Down Expand Up @@ -1998,13 +2012,6 @@ func (n *raft) runAsFollower() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
// We've received a stepdown request, start following the new leader if
// we can.
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -2340,15 +2347,15 @@ func (n *raft) runAsLeader() {
fsub, err := n.subscribe(psubj, n.handleForwardedProposal)
if err != nil {
n.warn("Error subscribing to forwarded proposals: %v", err)
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
return
}
rpsub, err := n.subscribe(rpsubj, n.handleForwardedRemovePeerProposal)
if err != nil {
n.warn("Error subscribing to forwarded remove peer proposals: %v", err)
n.unsubscribe(fsub)
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
return
}
Expand Down Expand Up @@ -2395,15 +2402,6 @@ func (n *raft) runAsLeader() {
n.doRemovePeerAsLeader(string(b.Data))
}
entries = append(entries, b)
// If this is us sending out a leadership transfer stepdown inline here.
if b.Type == EntryLeaderTransfer {
// Send out what we have and switch to follower.
n.sendAppendEntry(entries)
n.prop.recycle(&es)
n.debug("Stepping down due to leadership transfer")
n.switchToFollower(noLeader)
return
}
// Increment size.
sz += len(b.Data) + 1
// If below thresholds go ahead and send.
Expand All @@ -2427,7 +2425,7 @@ func (n *raft) runAsLeader() {
}
case <-lq.C:
if n.lostQuorum() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
case <-n.votes.ch:
Expand All @@ -2437,7 +2435,7 @@ func (n *raft) runAsLeader() {
continue
}
if vresp.term > n.Term() {
n.switchToFollower(noLeader)
n.stepdown(noLeader)
return
}
n.trackPeer(vresp.peer)
Expand All @@ -2446,11 +2444,6 @@ func (n *raft) runAsLeader() {
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
case <-n.entry.ch:
n.processAppendEntries()
}
Expand Down Expand Up @@ -2621,7 +2614,7 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
snap, err := n.loadLastSnapshot()
if err != nil {
// We need to stepdown here when this happens.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
// We need to reset our state here as well.
n.resetWAL()
return 0, err
Expand Down Expand Up @@ -2687,7 +2680,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err)
if err == ErrStoreEOF {
// If we are here we are seeing a request for an item beyond our state, meaning we should stepdown.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand All @@ -2699,7 +2692,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
// If we are here we are seeing a request for an item we do not have, meaning we should stepdown.
// This is possible on a reset of our WAL but the other side has a snapshot already.
// If we do not stepdown this can cycle.
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.Unlock()
arPool.Put(ar)
return
Expand Down Expand Up @@ -2752,7 +2745,7 @@ func (n *raft) applyCommit(index uint64) error {
if err != ErrStoreClosed && err != ErrStoreEOF {
n.warn("Got an error loading %d index: %v - will reset", index, err)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -2829,7 +2822,7 @@ func (n *raft) applyCommit(index uint64) error {

// If this is us and we are the leader we should attempt to stepdown.
if peer == n.id && n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdown(n.selectNextLeader())
}

// Remove from string intern map.
Expand Down Expand Up @@ -2960,7 +2953,7 @@ func (n *raft) runAsCandidate() {
n.ID(): {},
}

for {
for n.State() == Candidate {
elect := n.electTimer()
select {
case <-n.entry.ch:
Expand Down Expand Up @@ -3003,20 +2996,15 @@ func (n *raft) runAsCandidate() {
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.stepdown.push(noLeader)
n.lxfer = false
n.stepdownLocked(noLeader)
n.Unlock()
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
n.processVoteRequest(voteReq)
}
case <-n.stepdown.ch:
if newLeader, ok := n.stepdown.popOne(); ok {
n.switchToFollower(newLeader)
return
}
}
}
}
Expand Down Expand Up @@ -3177,7 +3165,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.writeTermVote()
}
n.debug("Received append entry from another leader, stepping down to %q", ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
} else {
// Let them know we are the leader.
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
Expand Down Expand Up @@ -3206,7 +3194,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
n.term = ae.term
n.writeTermVote()
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}
}

Expand Down Expand Up @@ -3269,7 +3257,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
if n.State() != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdown.push(ae.leader)
n.stepdownLocked(ae.leader)
}
}

Expand Down Expand Up @@ -3525,7 +3513,7 @@ func (n *raft) processAppendEntryResponse(ar *appendEntryResponse) {
n.vote = noVote
n.writeTermVote()
n.warn("Detected another leader with higher term, will stepdown and reset")
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.resetWAL()
n.Unlock()
arPool.Put(ar)
Expand Down Expand Up @@ -3573,7 +3561,7 @@ func (n *raft) storeToWAL(ae *appendEntry) error {
if index := ae.pindex + 1; index != seq {
n.warn("Wrong index, ae is %+v, index stored was %d, n.pindex is %d, will reset", ae, seq, n.pindex)
if n.State() == Leader {
n.stepdown.push(n.selectNextLeader())
n.stepdownLocked(n.selectNextLeader())
}
// Reset and cancel any catchup.
n.resetWAL()
Expand Down Expand Up @@ -3973,7 +3961,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
if n.State() != Follower {
n.debug("Stepping down from %s, detected higher term: %d vs %d",
strings.ToLower(n.State().String()), vr.term, n.term)
n.stepdown.push(noLeader)
n.stepdownLocked(noLeader)
n.term = vr.term
}
n.vote = noVote
Expand Down Expand Up @@ -4107,13 +4095,17 @@ const (
)

func (n *raft) switchToFollower(leader string) {
n.Lock()
defer n.Unlock()

n.switchToFollowerLocked(leader)
}

func (n *raft) switchToFollowerLocked(leader string) {
if n.State() == Closed {
return
}

n.Lock()
defer n.Unlock()

n.debug("Switching to follower")

n.lxfer = false
Expand Down
15 changes: 15 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,18 @@ func TestNRGLeavesObserverAfterPause(t *testing.T) {
n.ResumeApply()
checkState(false, false)
}

func TestNRGInlineStepdown(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

// When StepDown() completes, we should not be the leader. Before,
// this would not be guaranteed as the stepdown could be processed
// some time later.
n := rg.leader().node().(*raft)
require_NoError(t, n.StepDown())
require_NotEqual(t, n.State(), Leader)
}

0 comments on commit a61d145

Please sign in to comment.