diff --git a/server/raft.go b/server/raft.go index 5d49b815df..4fbc99a705 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3562,8 +3562,13 @@ CONTINUE: if l > paeWarnThreshold && l%paeWarnModulo == 0 { n.warn("%d append entries pending", len(n.pae)) } - } else if l%paeWarnModulo == 0 { - n.debug("Not saving to append entries pending") + } else { + // Invalidate cache entry at this index, we might have + // stored it previously with a different value. + delete(n.pae, n.pindex) + if l%paeWarnModulo == 0 { + n.debug("Not saving to append entries pending") + } } } else { // This is a replay on startup so just take the appendEntry version. diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index 4807efee28..2a1c541227 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -320,7 +320,7 @@ func (a *stateAdder) snapshot(t *testing.T) { // Helper to wait for a certain state. func (rg smGroup) waitOnTotal(t *testing.T, expected int64) { t.Helper() - checkFor(t, 20*time.Second, 200*time.Millisecond, func() error { + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { for _, sm := range rg { asm := sm.(*stateAdder) if total := asm.total(); total != expected { diff --git a/server/raft_test.go b/server/raft_test.go index 066f34f8a1..1658a3411b 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1097,6 +1097,43 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { } } +func TestNRGPendingAppendEntryCacheInvalidation(t *testing.T) { + for _, test := range []struct { + title string + entries int + }{ + {title: "empty", entries: 1}, + {title: "at limit", entries: paeDropThreshold}, + {title: "full", entries: paeDropThreshold + 1}, + } { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + l := rg.leader() + + l.(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 1) + + // Fill up the cache with N entries. + // The contents don't matter as they should never be applied. + rg.lockAll() + for _, s := range rg { + n := s.node().(*raft) + for i := 0; i < test.entries; i++ { + n.pae[n.pindex+uint64(1+i)] = newAppendEntry("", 0, 0, 0, 0, nil) + } + } + rg.unlockAll() + + l.(*stateAdder).proposeDelta(1) + rg.waitOnTotal(t, 2) + }) + } +} + func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup()