Skip to content

Commit 9266be8

Browse files
MauriceVanVeen authored and neilalexander committed
NRG: Use correct sequence when truncating to previous pterm/pindex
Signed-off-by: Maurice van Veen <[email protected]>
1 parent a838553 commit 9266be8

File tree

2 files changed

+73
-160
lines changed

2 files changed

+73
-160
lines changed

server/raft.go

Lines changed: 8 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -3287,23 +3287,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
32873287
n.updateLeadChange(false)
32883288
}
32893289

3290+
RETRY:
32903291
if ae.pterm != n.pterm || ae.pindex != n.pindex {
32913292
// Check if this is a lower or equal index than what we were expecting.
32923293
if ae.pindex <= n.pindex {
32933294
n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
32943295
var ar *appendEntryResponse
32953296
var success bool
32963297

3297-
if n.commit > 0 && ae.pindex <= n.commit {
3298-
// Check if only our terms do not match here.
3299-
if ae.pindex == n.pindex {
3300-
// Make sure pterms match and we take on the leader's.
3301-
// This prevents constant spinning.
3302-
n.truncateWAL(ae.pterm, ae.pindex)
3303-
} else {
3304-
// If we have already committed this entry, just mark success.
3305-
success = true
3306-
}
3298+
if ae.pindex < n.commit {
3299+
// If we have already committed this entry, just mark success.
3300+
success = true
33073301
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
33083302
// If terms are equal, and we are not catching up, we have simply already processed this message.
33093303
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
@@ -3317,6 +3311,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
33173311
} else {
33183312
n.resetWAL()
33193313
}
3314+
} else if eae.term == ae.pterm {
3315+
// If terms match we can delete all entries past this one, and then continue storing the current entry.
3316+
n.truncateWAL(eae.term, eae.pindex+1)
3317+
goto RETRY
33203318
} else {
33213319
// If terms mismatched, delete that entry and all others past it.
33223320
// Make sure to cancel any catchups in progress.

server/raft_test.go

Lines changed: 65 additions & 150 deletions
Original file line number | Diff line number | Diff line change
@@ -461,6 +461,14 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) {
461461
defer nc.Close()
462462

463463
rg := c.createRaftGroup("TEST", 3, newStateAdder)
464+
465+
// Because the election timer is quite high, we want to kick a node into
466+
// campaigning before it naturally needs to, otherwise the test takes a
467+
// long time just to pick a leader.
468+
for _, n := range rg {
469+
n.node().Campaign()
470+
break
471+
}
464472
rg.waitOnLeader()
465473
leader := rg.leader().node().(*raft)
466474
follower := rg.nonLeader().node().(*raft)
@@ -739,52 +747,6 @@ func TestNRGCandidateDoesntRevertTermAfterOldAE(t *testing.T) {
739747
require_NotEqual(t, follower.term, 6)
740748
}
741749

742-
func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
743-
n, cleanup := initSingleMemRaftNode(t)
744-
defer cleanup()
745-
746-
// Create a sample entry, the content doesn't matter, just that it's stored.
747-
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
748-
entries := []*Entry{newEntry(EntryNormal, esm)}
749-
750-
nats0 := "S1Nunr6R" // "nats-0"
751-
752-
// Timeline.
753-
aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
754-
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
755-
756-
// Initial case is simple, just store the entry.
757-
n.processAppendEntry(aeMsg, n.aesub)
758-
require_Equal(t, n.wal.State().Msgs, 1)
759-
entry, err := n.loadEntry(1)
760-
require_NoError(t, err)
761-
require_Equal(t, entry.leader, nats0)
762-
763-
// Heartbeat, makes sure commit moves up.
764-
n.processAppendEntry(aeHeartbeat, n.aesub)
765-
require_Equal(t, n.commit, 1)
766-
require_Equal(t, n.pterm, 1)
767-
768-
// Simulate upper layer calling down to apply.
769-
n.Applied(1)
770-
771-
// Install snapshot and check it exists.
772-
err = n.InstallSnapshot(nil)
773-
require_NoError(t, err)
774-
775-
snapshots := path.Join(n.sd, snapshotsDir)
776-
files, err := os.ReadDir(snapshots)
777-
require_NoError(t, err)
778-
require_Equal(t, len(files), 1)
779-
780-
// Truncate and check snapshot is kept.
781-
n.truncateWAL(n.pterm, n.applied)
782-
783-
files, err = os.ReadDir(snapshots)
784-
require_NoError(t, err)
785-
require_Equal(t, len(files), 1)
786-
}
787-
788750
func TestNRGTermDoesntRollBackToPtermOnCatchup(t *testing.T) {
789751
c := createJetStreamClusterExplicit(t, "R3S", 3)
790752
defer c.shutdown()
@@ -1111,7 +1073,9 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) {
11111073
fn.processAppendEntry(ae, fn.aesub)
11121074
require_Equal(t, fn.term, 20) // Follower should reject and the term stays the same.
11131075

1076+
fn.Lock()
11141077
fn.resetWAL()
1078+
fn.Unlock()
11151079
fn.processAppendEntry(ae, fn.aesub)
11161080
require_Equal(t, fn.term, 20) // Follower should reject again, even after reset, term stays the same.
11171081
}
@@ -1310,8 +1274,8 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
13101274

13111275
// Timeline.
13121276
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
1313-
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
13141277
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
1278+
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil})
13151279
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries})
13161280
aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil})
13171281

@@ -1322,17 +1286,17 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
13221286
require_NoError(t, err)
13231287
require_Equal(t, entry.leader, nats0)
13241288

1325-
// Heartbeat, makes sure commit moves up.
1326-
n.processAppendEntry(aeHeartbeat1, n.aesub)
1327-
require_Equal(t, n.commit, 1)
1328-
13291289
// Deliver a message.
13301290
n.processAppendEntry(aeMsg2, n.aesub)
13311291
require_Equal(t, n.wal.State().Msgs, 2)
13321292
entry, err = n.loadEntry(2)
13331293
require_NoError(t, err)
13341294
require_Equal(t, entry.leader, nats0)
13351295

1296+
// Heartbeat, makes sure commit moves up.
1297+
n.processAppendEntry(aeHeartbeat1, n.aesub)
1298+
require_Equal(t, n.commit, 2)
1299+
13361300
// Deliver another message.
13371301
n.processAppendEntry(aeMsg3, n.aesub)
13381302
require_Equal(t, n.wal.State().Msgs, 3)
@@ -1342,49 +1306,13 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
13421306

13431307
// Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit.
13441308
n.processAppendEntry(aeMsg1, n.aesub)
1345-
require_Equal(t, n.commit, 1)
1309+
require_Equal(t, n.commit, 2)
13461310

13471311
// Heartbeat, makes sure we commit.
13481312
n.processAppendEntry(aeHeartbeat2, n.aesub)
13491313
require_Equal(t, n.commit, 3)
13501314
}
13511315

1352-
func TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
1353-
n, cleanup := initSingleMemRaftNode(t)
1354-
defer cleanup()
1355-
1356-
// Create a sample entry, the content doesn't matter, just that it's stored.
1357-
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
1358-
entries := []*Entry{newEntry(EntryNormal, esm)}
1359-
1360-
nats0 := "S1Nunr6R" // "nats-0"
1361-
nats1 := "yrzKKRBu" // "nats-1"
1362-
1363-
// Timeline, first leader.
1364-
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
1365-
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
1366-
1367-
// Timeline, leader changed, but pterm got set to term.
1368-
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: nil})
1369-
1370-
// Initial case is simple, just store the entry.
1371-
n.processAppendEntry(aeMsg1, n.aesub)
1372-
require_Equal(t, n.wal.State().Msgs, 1)
1373-
entry, err := n.loadEntry(1)
1374-
require_NoError(t, err)
1375-
require_Equal(t, entry.leader, nats0)
1376-
1377-
// Heartbeat, makes sure commit moves up.
1378-
n.processAppendEntry(aeHeartbeat1, n.aesub)
1379-
require_Equal(t, n.commit, 1)
1380-
require_Equal(t, n.pterm, 1)
1381-
1382-
// Heartbeat from another leader, pterm got set to term, make sure to only up our pterm.
1383-
n.processAppendEntry(aeHeartbeat2, n.aesub)
1384-
require_Equal(t, n.commit, 1)
1385-
require_Equal(t, n.pterm, 2)
1386-
}
1387-
13881316
func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
13891317
n, cleanup := initSingleMemRaftNode(t)
13901318
defer cleanup()
@@ -1431,68 +1359,6 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
14311359
require_Equal(t, n.commit, 1)
14321360
}
14331361

1434-
func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
1435-
n, cleanup := initSingleMemRaftNode(t)
1436-
defer cleanup()
1437-
1438-
// Create a sample entry, the content doesn't matter, just that it's stored.
1439-
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
1440-
entries := []*Entry{newEntry(EntryNormal, esm)}
1441-
1442-
nats0 := "S1Nunr6R" // "nats-0"
1443-
nats1 := "yrzKKRBu" // "nats-1"
1444-
1445-
// Timeline, first leader.
1446-
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
1447-
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
1448-
1449-
// Timeline, leader changed, but pterm got set to term.
1450-
aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: entries})
1451-
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})
1452-
1453-
// Initial case is simple, just store the entry.
1454-
n.processAppendEntry(aeMsg1, n.aesub)
1455-
require_Equal(t, n.wal.State().Msgs, 1)
1456-
entry, err := n.loadEntry(1)
1457-
require_NoError(t, err)
1458-
require_Equal(t, entry.leader, nats0)
1459-
1460-
// Heartbeat, makes sure commit moves up.
1461-
n.processAppendEntry(aeHeartbeat1, n.aesub)
1462-
require_Equal(t, n.commit, 1)
1463-
require_Equal(t, n.pterm, 1)
1464-
1465-
// Heartbeat from another leader, we missed a message so we need catchup.
1466-
n.processAppendEntry(aeHeartbeat2, n.aesub)
1467-
require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item.
1468-
require_True(t, n.catchup != nil)
1469-
require_Equal(t, n.catchup.pterm, 1) // n.pterm
1470-
require_Equal(t, n.catchup.pindex, 1) // n.pindex
1471-
1472-
// We get a message with an incorrect pterm, can only correct pterm and requires re-trigger of catchup.
1473-
n.processAppendEntry(aeMsg2, n.catchup.sub)
1474-
require_True(t, n.catchup == nil)
1475-
require_Equal(t, n.pterm, 2)
1476-
require_Equal(t, n.pindex, 1)
1477-
1478-
// Heartbeat re-triggers catchup.
1479-
n.processAppendEntry(aeHeartbeat2, n.aesub)
1480-
require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item.
1481-
require_True(t, n.catchup != nil)
1482-
require_Equal(t, n.catchup.pterm, 2) // n.pterm
1483-
require_Equal(t, n.catchup.pindex, 1) // n.pindex
1484-
1485-
// Now we get the message again and can continue to store it.
1486-
n.processAppendEntry(aeMsg2, n.catchup.sub)
1487-
require_Equal(t, n.wal.State().Msgs, 2)
1488-
require_True(t, n.catchup != nil)
1489-
1490-
// Heartbeat can now cancel catchup and move up our commit.
1491-
n.processAppendEntry(aeHeartbeat2, n.aesub)
1492-
require_Equal(t, n.commit, 2)
1493-
require_True(t, n.catchup == nil)
1494-
}
1495-
14961362
func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
14971363
n, cleanup := initSingleMemRaftNode(t)
14981364
defer cleanup()
@@ -1703,3 +1569,52 @@ func TestNRGMultipleStopsDontPanic(t *testing.T) {
17031569
n.Stop()
17041570
}
17051571
}
1572+
1573+
func TestNRGTruncateDownToCommitted(t *testing.T) {
1574+
n, cleanup := initSingleMemRaftNode(t)
1575+
defer cleanup()
1576+
1577+
// Create a sample entry, the content doesn't matter, just that it's stored.
1578+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
1579+
entries := []*Entry{newEntry(EntryNormal, esm)}
1580+
1581+
nats0 := "S1Nunr6R" // "nats-0"
1582+
nats1 := "yrzKKRBu" // "nats-1"
1583+
1584+
// Timeline, we are leader
1585+
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
1586+
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
1587+
1588+
// Timeline, after leader change
1589+
aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries})
1590+
aeHeartbeat := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})
1591+
1592+
// Simply receive first message.
1593+
n.processAppendEntry(aeMsg1, n.aesub)
1594+
require_Equal(t, n.commit, 0)
1595+
require_Equal(t, n.wal.State().Msgs, 1)
1596+
entry, err := n.loadEntry(1)
1597+
require_NoError(t, err)
1598+
require_Equal(t, entry.leader, nats0)
1599+
1600+
// Receive second message, which commits the first message.
1601+
n.processAppendEntry(aeMsg2, n.aesub)
1602+
require_Equal(t, n.commit, 1)
1603+
require_Equal(t, n.wal.State().Msgs, 2)
1604+
entry, err = n.loadEntry(2)
1605+
require_NoError(t, err)
1606+
require_Equal(t, entry.leader, nats0)
1607+
1608+
// We receive an entry from another leader, should truncate down to commit / remove the second message.
1609+
// After doing so, we should also be able to immediately store the message after.
1610+
n.processAppendEntry(aeMsg3, n.aesub)
1611+
require_Equal(t, n.commit, 1)
1612+
require_Equal(t, n.wal.State().Msgs, 2)
1613+
entry, err = n.loadEntry(2)
1614+
require_NoError(t, err)
1615+
require_Equal(t, entry.leader, nats1)
1616+
1617+
// Heartbeat moves commit up.
1618+
n.processAppendEntry(aeHeartbeat, n.aesub)
1619+
require_Equal(t, n.commit, 2)
1620+
}

0 commit comments

Comments
 (0)