Skip to content

Commit c8eb521

Browse files
NRG (2.11): Fix term handling in candidate state and use higher term from vote request (#5671)
A candidate could incorrectly revert to an older term, without resetting its vote, if an old append entry (AE) arrived with a term that is newer than the pterm but not necessarily newer than the current term. Additionally, we weren't handling the case where the rest of the cluster should assume the higher term number from the vote requests once an isolated candidate node rejoins the cluster. This PR also replaces `TestNRGCandidateStepsDownAfterAE` with a rewritten test, `TestNRGAssumeHighTermAfterCandidateIsolation`, as the old test was checking for flawed behaviour: it relied on the term being able to go backwards after a period of isolation, which is not correct. Instead we should prove that the rest of the cluster assumes the higher term from the isolated node. Co-authored-by: Reuben Ninan <[email protected]> Signed-off-by: Neil Twigg <[email protected]>
2 parents fee0522 + dcbe0c8 commit c8eb521

File tree

2 files changed

+70
-25
lines changed

2 files changed

+70
-25
lines changed

server/raft.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3201,11 +3201,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
32013201
// If the append entry term is newer than the current term, erase our
32023202
// vote.
32033203
if ae.term > n.term {
3204+
n.term = ae.term
32043205
n.vote = noVote
3206+
n.writeTermVote()
32053207
}
32063208
n.debug("Received append entry in candidate state from %q, converting to follower", ae.leader)
3207-
n.term = ae.term
3208-
n.writeTermVote()
32093209
n.stepdown.push(ae.leader)
32103210
}
32113211
}
@@ -3978,8 +3978,8 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
39783978
n.debug("Stepping down from %s, detected higher term: %d vs %d",
39793979
strings.ToLower(n.State().String()), vr.term, n.term)
39803980
n.stepdown.push(noLeader)
3981-
n.term = vr.term
39823981
}
3982+
n.term = vr.term
39833983
n.vote = noVote
39843984
n.writeTermVote()
39853985
}

server/raft_test.go

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ func TestNRGInvalidTAVDoesntPanic(t *testing.T) {
477477
c.waitOnAllCurrent()
478478
}
479479

480-
func TestNRGCandidateStepsDownAfterAE(t *testing.T) {
480+
func TestNRGAssumeHighTermAfterCandidateIsolation(t *testing.T) {
481481
c := createJetStreamClusterExplicit(t, "R3S", 3)
482482
defer c.shutdown()
483483
c.waitOnLeader()
@@ -488,32 +488,38 @@ func TestNRGCandidateStepsDownAfterAE(t *testing.T) {
488488
rg := c.createRaftGroup("TEST", 3, newStateAdder)
489489
rg.waitOnLeader()
490490

491-
// Pick a random follower node. Bump the term up by a considerable
491+
// Bump the term up on one of the follower nodes by a considerable
492492
// amount and force it into the candidate state. This is what happens
493493
// after a period of time in isolation.
494-
n := rg.nonLeader().node().(*raft)
495-
n.Lock()
496-
n.term += 100
497-
n.switchState(Candidate)
498-
n.Unlock()
494+
follower := rg.nonLeader().node().(*raft)
495+
follower.Lock()
496+
follower.term += 100
497+
follower.switchState(Candidate)
498+
follower.Unlock()
499499

500-
// Have the leader push through something on the current term just
501-
// for good measure, although the heartbeats probably work too.
500+
follower.requestVote()
501+
time.Sleep(time.Millisecond * 100)
502+
503+
// The candidate will shortly send a vote request. When that happens,
504+
// the rest of the nodes in the cluster should move up to that term,
505+
// even though they will not grant the vote.
506+
nterm := follower.term
507+
for _, n := range rg {
508+
require_Equal(t, n.node().Term(), nterm)
509+
}
510+
511+
// Have the leader send out a proposal, which will force the candidate
512+
// back into follower state.
513+
rg.waitOnLeader()
502514
rg.leader().(*stateAdder).proposeDelta(1)
515+
rg.waitOnTotal(t, 1)
503516

504-
// Wait for the leader to receive the next append entry from the
505-
// current leader. What should happen is that the node steps down
506-
// and starts following the leader, as nothing in the log of the
507-
// follower is newer than the term of the leader.
508-
checkFor(t, time.Second, 50*time.Millisecond, func() error {
509-
if n.State() == Candidate {
510-
return fmt.Errorf("shouldn't still be candidate state")
511-
}
512-
if nterm, lterm := n.Term(), rg.leader().node().Term(); nterm != lterm {
513-
return fmt.Errorf("follower term %d should match leader term %d", nterm, lterm)
514-
}
515-
return nil
516-
})
517+
// The candidate should have switched to a follower on a term equal to
518+
// or newer than the candidate had.
519+
for _, n := range rg {
520+
require_NotEqual(t, n.node().State(), Candidate)
521+
require_True(t, n.node().Term() >= nterm)
522+
}
517523
}
518524

519525
// Test to make sure this does not cause us to truncate our wal or enter catchup state.
@@ -635,3 +641,42 @@ func TestNRGLeavesObserverAfterPause(t *testing.T) {
635641
n.ResumeApply()
636642
checkState(false, false)
637643
}
644+
645+
func TestNRGCandidateDoesntRevertTermAfterOldAE(t *testing.T) {
646+
c := createJetStreamClusterExplicit(t, "R3S", 3)
647+
defer c.shutdown()
648+
649+
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
650+
rg.waitOnLeader()
651+
652+
// Bump the term up a few times.
653+
for i := 0; i < 3; i++ {
654+
rg.leader().node().StepDown()
655+
time.Sleep(time.Millisecond * 50) // Needed because stepdowns not synchronous
656+
rg.waitOnLeader()
657+
}
658+
659+
leader := rg.leader().node().(*raft)
660+
follower := rg.nonLeader().node().(*raft)
661+
662+
// Sanity check that we are where we expect to be.
663+
require_Equal(t, leader.term, 4)
664+
require_Equal(t, follower.term, 4)
665+
666+
// At this point the active term is 4 and pterm is 4, force the
667+
// term up to 9. This won't bump the pterm.
668+
rg.lockAll()
669+
for _, n := range rg {
670+
n.node().(*raft).term += 5
671+
}
672+
rg.unlockAll()
673+
674+
// Build an AE that has a term newer than the pterm but older than
675+
// the term. Give it to the follower in candidate state.
676+
ae := newAppendEntry(leader.id, 6, leader.commit, leader.pterm, leader.pindex, nil)
677+
follower.switchToCandidate()
678+
follower.processAppendEntry(ae, nil)
679+
680+
// The candidate must not have reverted back to term 6.
681+
require_NotEqual(t, follower.term, 6)
682+
}

0 commit comments

Comments
 (0)