From 98c2891b0907e41ab671d392e24ed19e5a512308 Mon Sep 17 00:00:00 2001
From: Neil Twigg
Date: Wed, 17 Jul 2024 12:23:53 +0100
Subject: [PATCH] NRG: Ensure proposal and AE response queues drain after stepdown

Signed-off-by: Neil Twigg
---
Review notes (not part of the commit message; ignored by git am):
* runAsFollower / runAsCandidate: a signal on n.resp.ch now calls
  n.resp.drain() instead of n.resp.popOne(), and a new n.prop.ch case
  calls n.prop.drain().
* switchState: on a Leader -> non-Leader transition n.prop.drain() is
  called in addition to n.resp.drain().
* TestNRGSwitchStateClearsQueues: proposes 10,000 deltas on the leader,
  calls switchState(Follower) while holding the node lock, and requires
  n.prop.len() and n.resp.len() to both be 0.

 server/raft.go      | 18 ++++++++++++------
 server/raft_test.go | 22 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/server/raft.go b/server/raft.go
index 9c6e5bbb9f0..009ee5c9a0e 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -1989,9 +1989,11 @@ func (n *raft) runAsFollower() {
 			n.debug("Ignoring old vote response, we have stepped down")
 			n.votes.popOne()
 		case <-n.resp.ch:
-			// We're receiving append entry responses from the network, probably because
-			// we have only just stepped down and they were already in flight. Ignore them.
-			n.resp.popOne()
+			// Ignore append entry responses received from before the state change.
+			n.resp.drain()
+		case <-n.prop.ch:
+			// Ignore proposals received from before the state change.
+			n.prop.drain()
 		case <-n.reqs.ch:
 			// We've just received a vote request from the network.
 			// Because of drain() it is possible that we get nil from popOne().
@@ -2966,8 +2968,11 @@ func (n *raft) runAsCandidate() {
 		case <-n.entry.ch:
 			n.processAppendEntries()
 		case <-n.resp.ch:
-			// Ignore
-			n.resp.popOne()
+			// Ignore append entry responses received from before the state change.
+			n.resp.drain()
+		case <-n.prop.ch:
+			// Ignore proposals received from before the state change.
+			n.prop.drain()
 		case <-n.s.quitCh:
 			n.shutdown(false)
 			return
@@ -4089,8 +4094,9 @@ func (n *raft) switchState(state RaftState) {
 
 	if pstate == Leader && state != Leader {
 		n.updateLeadChange(false)
-		// Drain the response queue.
+		// Drain the append entry response and proposal queues.
 		n.resp.drain()
+		n.prop.drain()
 	} else if state == Leader && pstate != Leader {
 		if len(n.pae) > 0 {
 			n.pae = make(map[uint64]*appendEntry)
diff --git a/server/raft_test.go b/server/raft_test.go
index beb15d63480..b28fe8b680d 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -295,6 +295,28 @@ func TestNRGSimpleElection(t *testing.T) {
 	}
 }
 
+func TestNRGSwitchStateClearsQueues(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
+	rg.waitOnLeader()
+
+	sa := rg.leader().(*stateAdder)
+	n := sa.node().(*raft)
+
+	for i := 0; i < 10_000; i++ {
+		sa.proposeDelta(1)
+	}
+
+	n.Lock()
+	defer n.Unlock()
+
+	n.switchState(Follower)
+	require_Equal(t, n.prop.len(), 0)
+	require_Equal(t, n.resp.len(), 0)
+}
+
 func TestNRGStepDownOnSameTermDoesntClearVote(t *testing.T) {
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()