Skip to content

Commit

Permalink
NRG: Ignore AEs from older terms
Browse files Browse the repository at this point in the history
Co-authored-by: Reuben Ninan <[email protected]>
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander and ReubenMathew committed Jul 16, 2024
1 parent b7a3df8 commit 74932a7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
4 changes: 4 additions & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3271,6 +3271,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdown.push(ae.leader)
}
} else if ae.term < n.term && !catchingUp {
n.debug("Ignoring AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, ae.term)
n.Unlock()
return
}

if isNew && n.leader != ae.leader && n.State() == Follower {
Expand Down
47 changes: 47 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,50 @@ func TestNRGLeavesObserverAfterPause(t *testing.T) {
n.ResumeApply()
checkState(false, false)
}

func TestNRGAEFromOldLeader(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

// Listen out for catchup requests.
ch := make(chan *nats.Msg, 16)
_, err := nc.ChanSubscribe(fmt.Sprintf(raftCatchupReply, ">"), ch)
require_NoError(t, err)

// Start next term so that we can reuse term 1 in the next step.
leader := rg.leader().node().(*raft)
leader.StepDown()
time.Sleep(time.Millisecond * 100)
rg.waitOnLeader()
require_Equal(t, leader.Term(), 2)
leader = rg.leader().node().(*raft)

// Send an append entry with an outdated term. Beforehand, doing
// so would have caused a WAL reset and then would have triggered
// a Raft-level catchup.
ae := &appendEntry{
term: 1,
pindex: 0,
leader: leader.id,
reply: nc.NewRespInbox(),
}
payload, err := ae.encode(nil)
require_NoError(t, err)
resp, err := nc.Request(leader.asubj, payload, time.Second)
require_NoError(t, err)

// Wait for the response, the server should have rejected it.
ar := leader.decodeAppendEntryResponse(resp.Data)
require_NotNil(t, ar)
require_Equal(t, ar.success, false)

// No catchup should happen at this point because no reset should
// have happened.
require_NoChanRead(t, ch, time.Second*2)
}

0 comments on commit 74932a7

Please sign in to comment.