Skip to content

Commit 6d8b950

Browse files
NRG: Reset acks from previous term when stepping down
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 703dd8a commit 6d8b950

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

server/raft.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4285,6 +4285,10 @@ func (n *raft) switchToFollowerLocked(leader string) {
42854285

42864286
n.aflr = 0
42874287
n.lxfer = false
4288+
// Reset acks, we can't assume acks from a previous term are still valid in another term.
4289+
if len(n.acks) > 0 {
4290+
n.acks = make(map[uint64]map[string]struct{})
4291+
}
42884292
n.updateLeader(leader)
42894293
n.switchState(Follower)
42904294
}

server/raft_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,6 +2016,58 @@ func TestNRGQuorumAccounting(t *testing.T) {
20162016
require_Equal(t, n.commit, 1)
20172017
}
20182018

2019+
func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) {
2020+
n, cleanup := initSingleMemRaftNode(t)
2021+
defer cleanup()
2022+
2023+
// Create a sample entry, the content doesn't matter, just that it's stored.
2024+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
2025+
entries := []*Entry{newEntry(EntryNormal, esm)}
2026+
2027+
nats1 := "yrzKKRBu" // "nats-1"
2028+
nats2 := "cnrtt3eg" // "nats-2"
2029+
2030+
// Timeline
2031+
aeHeartbeat1Response := &appendEntryResponse{term: 1, index: 1, peer: nats1, success: true}
2032+
aeHeartbeat2Response := &appendEntryResponse{term: 1, index: 1, peer: nats2, success: true}
2033+
2034+
// Adjust cluster size, so we need at least 2 responses from other servers to establish quorum.
2035+
require_NoError(t, n.AdjustBootClusterSize(5))
2036+
require_Equal(t, n.csz, 5)
2037+
require_Equal(t, n.qn, 3)
2038+
2039+
// Switch this node to leader, and send an entry.
2040+
n.term++
2041+
n.switchToLeader()
2042+
require_Equal(t, n.term, 1)
2043+
require_Equal(t, n.pindex, 0)
2044+
n.sendAppendEntry(entries)
2045+
require_Equal(t, n.pindex, 1)
2046+
2047+
// We have one server that signals the message was stored. The leader will add 1 to the acks count.
2048+
n.processAppendEntryResponse(aeHeartbeat1Response)
2049+
require_Equal(t, n.commit, 0)
2050+
require_Len(t, len(n.acks), 1)
2051+
2052+
// We stepdown now and don't know if we will have quorum on the first entry.
2053+
n.stepdown(noLeader)
2054+
2055+
// Let's assume there are a bunch of leader elections now, data being added to the log, being truncated, etc.
2056+
// We don't know what happened, maybe we were partitioned, but we can't know for sure if the first entry has quorum.
2057+
2058+
// We now become leader again.
2059+
n.term = 6
2060+
n.switchToLeader()
2061+
require_Equal(t, n.term, 6)
2062+
2063+
// We now receive a successful response from another server saying they have stored it.
2064+
// Anything can have happened to the replica that said success before, we can't assume that's still valid.
2065+
// So our commit must stay the same and we restart counting for quorum.
2066+
n.processAppendEntryResponse(aeHeartbeat2Response)
2067+
require_Equal(t, n.commit, 0)
2068+
require_Len(t, len(n.acks), 1)
2069+
}
2070+
20192071
// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
20202072
// proposing the next one.
20212073
// The test may fail if:

0 commit comments

Comments
 (0)