Skip to content

Commit 9e3464c

Browse files
NRG (2.11): Heartbeat can establish quorum & reset acks during stepdown (#6512)
If the leader changed, a heartbeat would not be able to signal there's quorum on messages. This is because the leader only tracked quorum for entries it sent out itself AND were stored in its own log. Heartbeats are sent by the leader but are not stored in its log, which means they can't be used to track quorum. To solve this we don't need to track the leader in `n.acks`, which would not get populated after a leader change. Instead we only track the replicas that signal success and we always count ourselves anyway. This ensures that heartbeats, although not stored, can still signal quorum properly. Also, acks from previous terms would not be reset during stepdown. Which meant that successful acks from previous terms could be combined with acks from newer terms, making the leader believe it has quorum whereas it didn't properly check all of those acks came from the same term. Signed-off-by: Maurice van Veen <[email protected]>
2 parents 69ff59e + 6d8b950 commit 9e3464c

File tree

2 files changed

+141
-12
lines changed

2 files changed

+141
-12
lines changed

server/raft.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3020,18 +3020,23 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
30203020
// See if we have items to apply.
30213021
var sendHB bool
30223022

3023-
if results := n.acks[ar.index]; results != nil {
3024-
results[ar.peer] = struct{}{}
3025-
if nr := len(results); nr >= n.qn {
3026-
// We have a quorum.
3027-
for index := n.commit + 1; index <= ar.index; index++ {
3028-
if err := n.applyCommit(index); err != nil && err != errNodeClosed {
3029-
n.error("Got an error applying commit for %d: %v", index, err)
3030-
break
3031-
}
3023+
results := n.acks[ar.index]
3024+
if results == nil {
3025+
results = make(map[string]struct{})
3026+
n.acks[ar.index] = results
3027+
}
3028+
results[ar.peer] = struct{}{}
3029+
3030+
// We don't count ourselves to account for leader changes, so add 1.
3031+
if nr := len(results); nr+1 >= n.qn {
3032+
// We have a quorum.
3033+
for index := n.commit + 1; index <= ar.index; index++ {
3034+
if err := n.applyCommit(index); err != nil && err != errNodeClosed {
3035+
n.error("Got an error applying commit for %d: %v", index, err)
3036+
break
30323037
}
3033-
sendHB = n.prop.len() == 0
30343038
}
3039+
sendHB = n.prop.len() == 0
30353040
}
30363041
n.Unlock()
30373042

@@ -3765,8 +3770,6 @@ func (n *raft) sendAppendEntry(entries []*Entry) {
37653770
if err := n.storeToWAL(ae); err != nil {
37663771
return
37673772
}
3768-
// We count ourselves.
3769-
n.acks[n.pindex] = map[string]struct{}{n.id: {}}
37703773
n.active = time.Now()
37713774

37723775
// Save in memory for faster processing during applyCommit.
@@ -4281,6 +4284,10 @@ func (n *raft) switchToFollowerLocked(leader string) {
42814284

42824285
n.aflr = 0
42834286
n.lxfer = false
4287+
// Reset acks, we can't assume acks from a previous term are still valid in another term.
4288+
if len(n.acks) > 0 {
4289+
n.acks = make(map[uint64]map[string]struct{})
4290+
}
42844291
n.updateLeader(leader)
42854292
n.switchState(Follower)
42864293
}

server/raft_test.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,6 +2000,128 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) {
20002000
require_True(t, n.Healthy())
20012001
}
20022002

2003+
func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) {
2004+
n, cleanup := initSingleMemRaftNode(t)
2005+
defer cleanup()
2006+
2007+
// Create a sample entry, the content doesn't matter, just that it's stored.
2008+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
2009+
entries := []*Entry{newEntry(EntryNormal, esm)}
2010+
2011+
nats0 := "S1Nunr6R" // "nats-0"
2012+
2013+
// Timeline
2014+
aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
2015+
aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 1, peer: nats0, success: true}
2016+
2017+
// Process first message.
2018+
n.processAppendEntry(aeMsg, n.aesub)
2019+
require_Equal(t, n.pindex, 1)
2020+
require_Equal(t, n.aflr, 0)
2021+
2022+
// Simulate becoming leader, and not knowing if the stored entry has quorum and can be committed.
2023+
// Switching to leader should send a heartbeat.
2024+
n.switchToLeader()
2025+
require_Equal(t, n.aflr, 1)
2026+
require_Equal(t, n.commit, 0)
2027+
2028+
// We simulate receiving the successful heartbeat response here. It should move the commit up.
2029+
n.processAppendEntryResponse(aeHeartbeatResponse)
2030+
require_Equal(t, n.commit, 1)
2031+
require_Equal(t, n.aflr, 1)
2032+
2033+
// Once the entry is applied, it should reset the applied floor.
2034+
n.Applied(1)
2035+
require_Equal(t, n.aflr, 0)
2036+
}
2037+
2038+
func TestNRGQuorumAccounting(t *testing.T) {
2039+
n, cleanup := initSingleMemRaftNode(t)
2040+
defer cleanup()
2041+
2042+
// Create a sample entry, the content doesn't matter, just that it's stored.
2043+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
2044+
entries := []*Entry{newEntry(EntryNormal, esm)}
2045+
2046+
nats1 := "yrzKKRBu" // "nats-1"
2047+
nats2 := "cnrtt3eg" // "nats-2"
2048+
2049+
// Timeline
2050+
aeHeartbeat1Response := &appendEntryResponse{term: 1, index: 1, peer: nats1, success: true}
2051+
aeHeartbeat2Response := &appendEntryResponse{term: 1, index: 1, peer: nats2, success: true}
2052+
2053+
// Adjust cluster size, so we need at least 2 responses from other servers to establish quorum.
2054+
require_NoError(t, n.AdjustBootClusterSize(5))
2055+
require_Equal(t, n.csz, 5)
2056+
require_Equal(t, n.qn, 3)
2057+
2058+
// Switch this node to leader, and send an entry.
2059+
n.switchToLeader()
2060+
require_Equal(t, n.pindex, 0)
2061+
n.sendAppendEntry(entries)
2062+
require_Equal(t, n.pindex, 1)
2063+
2064+
// The first response MUST NOT indicate quorum has been reached.
2065+
n.processAppendEntryResponse(aeHeartbeat1Response)
2066+
require_Equal(t, n.commit, 0)
2067+
2068+
// The second response means we have reached quorum and can move commit up.
2069+
n.processAppendEntryResponse(aeHeartbeat2Response)
2070+
require_Equal(t, n.commit, 1)
2071+
}
2072+
2073+
func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) {
2074+
n, cleanup := initSingleMemRaftNode(t)
2075+
defer cleanup()
2076+
2077+
// Create a sample entry, the content doesn't matter, just that it's stored.
2078+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
2079+
entries := []*Entry{newEntry(EntryNormal, esm)}
2080+
2081+
nats1 := "yrzKKRBu" // "nats-1"
2082+
nats2 := "cnrtt3eg" // "nats-2"
2083+
2084+
// Timeline
2085+
aeHeartbeat1Response := &appendEntryResponse{term: 1, index: 1, peer: nats1, success: true}
2086+
aeHeartbeat2Response := &appendEntryResponse{term: 1, index: 1, peer: nats2, success: true}
2087+
2088+
// Adjust cluster size, so we need at least 2 responses from other servers to establish quorum.
2089+
require_NoError(t, n.AdjustBootClusterSize(5))
2090+
require_Equal(t, n.csz, 5)
2091+
require_Equal(t, n.qn, 3)
2092+
2093+
// Switch this node to leader, and send an entry.
2094+
n.term++
2095+
n.switchToLeader()
2096+
require_Equal(t, n.term, 1)
2097+
require_Equal(t, n.pindex, 0)
2098+
n.sendAppendEntry(entries)
2099+
require_Equal(t, n.pindex, 1)
2100+
2101+
// We have one server that signals the message was stored. The leader will add 1 to the acks count.
2102+
n.processAppendEntryResponse(aeHeartbeat1Response)
2103+
require_Equal(t, n.commit, 0)
2104+
require_Len(t, len(n.acks), 1)
2105+
2106+
// We stepdown now and don't know if we will have quorum on the first entry.
2107+
n.stepdown(noLeader)
2108+
2109+
// Let's assume there are a bunch of leader elections now, data being added to the log, being truncated, etc.
2110+
// We don't know what happened, maybe we were partitioned, but we can't know for sure if the first entry has quorum.
2111+
2112+
// We now become leader again.
2113+
n.term = 6
2114+
n.switchToLeader()
2115+
require_Equal(t, n.term, 6)
2116+
2117+
// We now receive a successful response from another server saying they have stored it.
2118+
// Anything can have happened to the replica that said success before, we can't assume that's still valid.
2119+
// So our commit must stay the same and we restart counting for quorum.
2120+
n.processAppendEntryResponse(aeHeartbeat2Response)
2121+
require_Equal(t, n.commit, 0)
2122+
require_Len(t, len(n.acks), 1)
2123+
}
2124+
20032125
// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
20042126
// proposing the next one.
20052127
// The test may fail if:

0 commit comments

Comments
 (0)