Skip to content

Commit fec8a68

Browse files
NRG: Invalidate pending append entries cache
Signed-off-by: Maurice van Veen <[email protected]>
1 parent f148459 commit fec8a68

File tree

3 files changed

+45
-3
lines changed

3 files changed

+45
-3
lines changed

server/raft.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3562,8 +3562,13 @@ CONTINUE:
35623562
if l > paeWarnThreshold && l%paeWarnModulo == 0 {
35633563
n.warn("%d append entries pending", len(n.pae))
35643564
}
3565-
} else if l%paeWarnModulo == 0 {
3566-
n.debug("Not saving to append entries pending")
3565+
} else {
3566+
// Invalidate cache entry at this index, we might have
3567+
// stored it previously with a different value.
3568+
delete(n.pae, n.pindex)
3569+
if l%paeWarnModulo == 0 {
3570+
n.debug("Not saving to append entries pending")
3571+
}
35673572
}
35683573
} else {
35693574
// This is a replay on startup so just take the appendEntry version.

server/raft_helpers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func (a *stateAdder) snapshot(t *testing.T) {
320320
// Helper to wait for a certain state.
321321
func (rg smGroup) waitOnTotal(t *testing.T, expected int64) {
322322
t.Helper()
323-
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
323+
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
324324
for _, sm := range rg {
325325
asm := sm.(*stateAdder)
326326
if total := asm.total(); total != expected {

server/raft_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,43 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) {
10971097
}
10981098
}
10991099

1100+
func TestNRGPendingAppendEntryCacheInvalidation(t *testing.T) {
1101+
for _, test := range []struct {
1102+
title string
1103+
entries int
1104+
}{
1105+
{title: "empty", entries: 1},
1106+
{title: "at limit", entries: paeDropThreshold},
1107+
{title: "full", entries: paeDropThreshold + 1},
1108+
} {
1109+
t.Run(test.title, func(t *testing.T) {
1110+
c := createJetStreamClusterExplicit(t, "R3S", 3)
1111+
defer c.shutdown()
1112+
1113+
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
1114+
rg.waitOnLeader()
1115+
l := rg.leader()
1116+
1117+
l.(*stateAdder).proposeDelta(1)
1118+
rg.waitOnTotal(t, 1)
1119+
1120+
// Fill up the cache with N entries.
1121+
// The contents don't matter as they should never be applied.
1122+
rg.lockAll()
1123+
for _, s := range rg {
1124+
n := s.node().(*raft)
1125+
for i := 0; i < test.entries; i++ {
1126+
n.pae[n.pindex+uint64(1+i)] = newAppendEntry("", 0, 0, 0, 0, nil)
1127+
}
1128+
}
1129+
rg.unlockAll()
1130+
1131+
l.(*stateAdder).proposeDelta(1)
1132+
rg.waitOnTotal(t, 2)
1133+
})
1134+
}
1135+
}
1136+
11001137
func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) {
11011138
n, cleanup := initSingleMemRaftNode(t)
11021139
defer cleanup()

0 commit comments

Comments
 (0)