Skip to content

Commit

Permalink
NRG: Ensure proposal and AE response queues drain after stepdown
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Jul 19, 2024
1 parent 6c33673 commit 98c2891
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
18 changes: 12 additions & 6 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1989,9 +1989,11 @@ func (n *raft) runAsFollower() {
n.debug("Ignoring old vote response, we have stepped down")
n.votes.popOne()
case <-n.resp.ch:
// We're receiving append entry responses from the network, probably because
// we have only just stepped down and they were already in flight. Ignore them.
n.resp.popOne()
// Ignore append entry responses received from before the state change.
n.resp.drain()
case <-n.prop.ch:
// Ignore proposals received from before the state change.
n.prop.drain()
case <-n.reqs.ch:
// We've just received a vote request from the network.
// Because of drain() it is possible that we get nil from popOne().
Expand Down Expand Up @@ -2966,8 +2968,11 @@ func (n *raft) runAsCandidate() {
case <-n.entry.ch:
n.processAppendEntries()
case <-n.resp.ch:
// Ignore
n.resp.popOne()
// Ignore append entry responses received from before the state change.
n.resp.drain()
case <-n.prop.ch:
// Ignore proposals received from before the state change.
n.prop.drain()
case <-n.s.quitCh:
n.shutdown(false)
return
Expand Down Expand Up @@ -4089,8 +4094,9 @@ func (n *raft) switchState(state RaftState) {

if pstate == Leader && state != Leader {
n.updateLeadChange(false)
// Drain the response queue.
// Drain the append entry response and proposal queues.
n.resp.drain()
n.prop.drain()
} else if state == Leader && pstate != Leader {
if len(n.pae) > 0 {
n.pae = make(map[uint64]*appendEntry)
Expand Down
22 changes: 22 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,28 @@ func TestNRGSimpleElection(t *testing.T) {
}
}

func TestNRGSwitchStateClearsQueues(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

sa := rg.leader().(*stateAdder)
n := sa.node().(*raft)

for i := 0; i < 10_000; i++ {
sa.proposeDelta(1)
}

n.Lock()
defer n.Unlock()

n.switchState(Follower)
require_Equal(t, n.prop.len(), 0)
require_Equal(t, n.resp.len(), 0)
}

func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
Expand Down

0 comments on commit 98c2891

Please sign in to comment.