Skip to content

Commit

Permalink
NRG (2.11): Ensure proposal and AE response queues drain after stepdo…
Browse files Browse the repository at this point in the history
…wn (#5666)

This ensures that when a Raft node steps down, any items that remain in
the proposal or append entry response queues are correctly dropped.
Otherwise the group might end up in an inconsistent state if that node
becomes leader again.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison authored Jul 22, 2024
2 parents 48cce69 + 98c2891 commit a14c364
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
18 changes: 12 additions & 6 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1989,9 +1989,11 @@ func (n *raft) runAsFollower() {
n.debug("Ignoring old vote response, we have stepped down")
n.votes.popOne()
case <-n.resp.ch:
// We're receiving append entry responses from the network, probably because
// we have only just stepped down and they were already in flight. Ignore them.
n.resp.popOne()
// Ignore append entry responses received from before the state change.
n.resp.drain()
case <-n.prop.ch:
// Ignore proposals received from before the state change.
n.prop.drain()
case <-n.reqs.ch:
// We've just received a vote request from the network.
// Because of drain() it is possible that we get nil from popOne().
Expand Down Expand Up @@ -2966,8 +2968,11 @@ func (n *raft) runAsCandidate() {
case <-n.entry.ch:
n.processAppendEntries()
case <-n.resp.ch:
// Ignore
n.resp.popOne()
// Ignore append entry responses received from before the state change.
n.resp.drain()
case <-n.prop.ch:
// Ignore proposals received from before the state change.
n.prop.drain()
case <-n.s.quitCh:
n.shutdown(false)
return
Expand Down Expand Up @@ -4093,8 +4098,9 @@ func (n *raft) switchState(state RaftState) {

if pstate == Leader && state != Leader {
n.updateLeadChange(false)
// Drain the response queue.
// Drain the append entry response and proposal queues.
n.resp.drain()
n.prop.drain()
} else if state == Leader && pstate != Leader {
if len(n.pae) > 0 {
n.pae = make(map[uint64]*appendEntry)
Expand Down
22 changes: 22 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,28 @@ func TestNRGSimpleElection(t *testing.T) {
}
}

func TestNRGSwitchStateClearsQueues(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

sa := rg.leader().(*stateAdder)
n := sa.node().(*raft)

for i := 0; i < 10_000; i++ {
sa.proposeDelta(1)
}

n.Lock()
defer n.Unlock()

n.switchState(Follower)
require_Equal(t, n.prop.len(), 0)
require_Equal(t, n.resp.len(), 0)
}

func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down

0 comments on commit a14c364

Please sign in to comment.