Skip to content

Commit 927b544

Browse files
committed
kvserver: use StateEngine durability for log truncations
Loosely coupled log truncations happen when the applied index is durable on the state machine engine. This commit makes all subscriptions to durability depend on the StateEngine, for that matter. Epic: none Release note: none
1 parent 7c9dd4b commit 927b544

File tree

6 files changed

+12
-11
lines changed

6 files changed

+12
-11
lines changed

pkg/kv/kvserver/client_merge_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -756,9 +756,9 @@ func mergeCheckingTimestampCaches(
756756
// the result to apply on the majority quorum.
757757
testutils.SucceedsSoon(t, func() error {
758758
for _, r := range lhsRepls[1:] {
759-
// Loosely-coupled truncation requires an engine flush to advance
760-
// guaranteed durability.
761-
require.NoError(t, r.Store().TODOEngine().Flush())
759+
// Loosely-coupled truncation requires the state engine flush to
760+
// advance guaranteed durability.
761+
require.NoError(t, r.Store().StateEngine().Flush())
762762
if firstIndex := r.GetCompactedIndex() + 1; firstIndex < truncIndex {
763763
return errors.Errorf("truncate not applied, %d < %d", firstIndex, truncIndex)
764764
}

pkg/kv/kvserver/client_raft_log_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func TestRaftLogQueue(t *testing.T) {
114114
tc.GetFirstStoreFromServer(t, i).MustForceRaftLogScanAndProcess()
115115
}
116116
// Flush the engine to advance durability, which triggers truncation.
117-
require.NoError(t, raftLeaderRepl.Store().TODOEngine().Flush())
117+
require.NoError(t, raftLeaderRepl.Store().StateEngine().Flush())
118118
// Ensure that compacted index has increased indicating that the log
119119
// truncation has occurred.
120120
afterTruncationIndex = raftLeaderRepl.GetCompactedIndex()

pkg/kv/kvserver/client_raft_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,7 @@ func TestSnapshotAfterTruncation(t *testing.T) {
840840
func waitForTruncationForTesting(t *testing.T, r *kvserver.Replica, compacted kvpb.RaftIndex) {
841841
testutils.SucceedsSoon(t, func() error {
842842
// Flush the engine to advance durability, which triggers truncation.
843-
require.NoError(t, r.Store().TODOEngine().Flush())
843+
require.NoError(t, r.Store().StateEngine().Flush())
844844
if index := r.GetCompactedIndex(); index != compacted {
845845
return errors.Errorf("expected compacted index == %d, got %d", compacted, index)
846846
}

pkg/kv/kvserver/raft_log_queue_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
587587
testutils.SucceedsSoon(t, func() error {
588588
if looselyCoupled {
589589
// Flush the engine to advance durability, which triggers truncation.
590-
require.NoError(t, store.TODOEngine().Flush())
590+
require.NoError(t, store.StateEngine().Flush())
591591
}
592592
if newCompIndex := r.GetCompactedIndex(); newCompIndex <= oldCompIndex {
593593
return errors.Errorf("log was not correctly truncated, old compacted index:%d, current:%d",
@@ -884,8 +884,9 @@ func waitForTruncationForTesting(
884884
) {
885885
testutils.SucceedsSoon(t, func() error {
886886
if looselyCoupled {
887-
// Flush the engine to advance durability, which triggers truncation.
888-
require.NoError(t, r.store.TODOEngine().Flush())
887+
// Flush the state machine engine to advance durability, which triggers
888+
// truncation.
889+
require.NoError(t, r.store.StateEngine().Flush())
889890
}
890891
// First index should have changed.
891892
if firstIndex := r.GetCompactedIndex() + 1; firstIndex != newFirstIndex {

pkg/kv/kvserver/replica_application_state_machine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) {
280280
looselyCoupledTruncationEnabled.Override(ctx, &st.SV, true)
281281
// Remove the flush completed callback since we don't want a
282282
// non-deterministic flush to cause the test to fail.
283-
tc.store.TODOEngine().RegisterFlushCompletedCallback(func() {})
283+
tc.store.StateEngine().RegisterFlushCompletedCallback(func() {})
284284
r := tc.repl
285285

286286
{
@@ -375,7 +375,7 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) {
375375
require.True(t, trunc.isDeltaTrusted)
376376
return raftLogSize, truncatedIndex
377377
}()
378-
require.NoError(t, tc.store.TODOEngine().Flush())
378+
require.NoError(t, tc.store.StateEngine().Flush())
379379
// Asynchronous call to advance durability.
380380
tc.store.raftTruncator.durabilityAdvancedCallback()
381381
expectedSize := raftLogSize - 1

pkg/kv/kvserver/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2236,7 +2236,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
22362236
{
22372237
truncator := s.raftTruncator
22382238
// When state machine has persisted new RaftAppliedIndex, fire callback.
2239-
s.TODOEngine().RegisterFlushCompletedCallback(func() {
2239+
s.StateEngine().RegisterFlushCompletedCallback(func() {
22402240
truncator.durabilityAdvancedCallback()
22412241
})
22422242
}

0 commit comments

Comments
 (0)