Skip to content

Commit 3c575bf

Browse files
NRG: Invalidate pending append entries cache (#6513)
The `n.pae` map is an in-memory cache of pending but not yet applied entries. When applying commits we can pull entries from this cache instead of, for example, reading them from disk. However, the cache has a bounded size. If the cache was full and we received a different entry for an index that was already cached, the new entry would not be stored and we'd later apply the wrong (stale, cached) entry. When we get an entry that we can't cache because the cache is full, we can simply drop any existing cache entry at that index. If no entry exists at this index the delete is a no-op; if one did exist, removing it also frees up room in the cache for subsequent entries. Signed-off-by: Maurice van Veen <[email protected]>
2 parents 9e3464c + fec8a68 commit 3c575bf

File tree

3 files changed

+45
-3
lines changed

3 files changed

+45
-3
lines changed

server/raft.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3567,8 +3567,13 @@ CONTINUE:
35673567
if l > paeWarnThreshold && l%paeWarnModulo == 0 {
35683568
n.warn("%d append entries pending", len(n.pae))
35693569
}
3570-
} else if l%paeWarnModulo == 0 {
3571-
n.debug("Not saving to append entries pending")
3570+
} else {
3571+
// Invalidate cache entry at this index, we might have
3572+
// stored it previously with a different value.
3573+
delete(n.pae, n.pindex)
3574+
if l%paeWarnModulo == 0 {
3575+
n.debug("Not saving to append entries pending")
3576+
}
35723577
}
35733578
} else {
35743579
// This is a replay on startup so just take the appendEntry version.

server/raft_helpers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func (a *stateAdder) snapshot(t *testing.T) {
320320
// Helper to wait for a certain state.
321321
func (rg smGroup) waitOnTotal(t *testing.T, expected int64) {
322322
t.Helper()
323-
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
323+
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
324324
for _, sm := range rg {
325325
asm := sm.(*stateAdder)
326326
if total := asm.total(); total != expected {

server/raft_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,43 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) {
11511151
}
11521152
}
11531153

1154+
func TestNRGPendingAppendEntryCacheInvalidation(t *testing.T) {
1155+
for _, test := range []struct {
1156+
title string
1157+
entries int
1158+
}{
1159+
{title: "empty", entries: 1},
1160+
{title: "at limit", entries: paeDropThreshold},
1161+
{title: "full", entries: paeDropThreshold + 1},
1162+
} {
1163+
t.Run(test.title, func(t *testing.T) {
1164+
c := createJetStreamClusterExplicit(t, "R3S", 3)
1165+
defer c.shutdown()
1166+
1167+
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
1168+
rg.waitOnLeader()
1169+
l := rg.leader()
1170+
1171+
l.(*stateAdder).proposeDelta(1)
1172+
rg.waitOnTotal(t, 1)
1173+
1174+
// Fill up the cache with N entries.
1175+
// The contents don't matter as they should never be applied.
1176+
rg.lockAll()
1177+
for _, s := range rg {
1178+
n := s.node().(*raft)
1179+
for i := 0; i < test.entries; i++ {
1180+
n.pae[n.pindex+uint64(1+i)] = newAppendEntry("", 0, 0, 0, 0, nil)
1181+
}
1182+
}
1183+
rg.unlockAll()
1184+
1185+
l.(*stateAdder).proposeDelta(1)
1186+
rg.waitOnTotal(t, 2)
1187+
})
1188+
}
1189+
}
1190+
11541191
func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) {
11551192
n, cleanup := initSingleMemRaftNode(t)
11561193
defer cleanup()

0 commit comments

Comments
 (0)