From 017d4caaaa96154697ba8afaca813389313dc50d Mon Sep 17 00:00:00 2001 From: Wei Li Date: Tue, 5 Mar 2024 01:26:31 +0000 Subject: [PATCH] don't treat pruned (epoch,mus) as diverged --- cloud/replication_test.cc | 60 ++++++++++++++++++++++++++++++++++-- db/db_impl/db_impl.cc | 35 ++++++++++++++++----- db/replication_epoch_edit.cc | 2 +- db/replication_epoch_edit.h | 4 +++ 4 files changed, 90 insertions(+), 11 deletions(-) diff --git a/cloud/replication_test.cc b/cloud/replication_test.cc index 5c945f0b76f..18219623678 100644 --- a/cloud/replication_test.cc +++ b/cloud/replication_test.cc @@ -1431,7 +1431,7 @@ TEST_F(ReplicationTest, ReplicationEpochs) { leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().empty()); ASSERT_OK(leader->Put(wo(), leaderCF(cf(0)), "k2", "v2")); - ASSERT_OK(leader->Flush(FlushOptions())); + ASSERT_OK(leader->Flush(FlushOptions(), leaderCF(cf(0)))); catchUpFollower(); @@ -1441,7 +1441,7 @@ TEST_F(ReplicationTest, ReplicationEpochs) { UpdateLeaderEpoch(3); ASSERT_OK(leader->Put(wo(), leaderCF(cf(0)), "k3", "v3")); - ASSERT_OK(leader->Flush(FlushOptions())); + ASSERT_OK(leader->Flush(FlushOptions(), leaderCF(cf(0)))); catchUpFollower(); ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size(), @@ -1455,6 +1455,62 @@ TEST_F(ReplicationTest, ReplicationEpochs) { 1); } +TEST_F(ReplicationTest, MaxNumReplicationEpochs) { + auto options = leaderOptions(); + options.disable_auto_compactions = true; + options.disable_auto_flush = true; + // maintain at most two replication epochs in the set + options.max_num_replication_epochs = 2; + auto leader = openLeader(options); + openFollower(options); + + auto cf = [](int i) { return "cf" + std::to_string(i); }; + + createColumnFamily(cf(0)); + + UpdateLeaderEpoch(2); + + ASSERT_OK(leader->Put(wo(), leaderCF(cf(0)), "k1", "v1")); + ASSERT_OK(leader->Flush(FlushOptions(), leaderCF(cf(0)))); + + ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size(), + 1); + + catchUpFollower(); + verifyReplicationEpochsEqual(); + + UpdateLeaderEpoch(3); + // generate some manifest writes without changing persisted replication + // sequence + ASSERT_OK(leader->SetOptions(leaderCF(cf(0)), {{"max_write_buffer_number", "3"}})); + + ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size(), + 2); + + UpdateLeaderEpoch(4); + // generate some manifest writes without changing persisted replication + // sequence + ASSERT_OK(leader->SetOptions(leaderCF(cf(0)), {{"max_write_buffer_number", "2"}})); + + ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size(), + 2); + ASSERT_EQ(leaderFull() + ->GetVersionSet() + ->TEST_GetReplicationEpochSet() + .GetSmallestEpoch(), + 3); + + catchUpFollower(); + verifyReplicationEpochsEqual(); + + closeLeader(); + leader = openLeader(options); + ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size(), + 2); + ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().GetSmallestEpoch(), + 3); +} + } // namespace ROCKSDB_NAMESPACE // A black-box test for the cloud wrapper around rocksdb diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 96d9f024e80..3083f6a16a4 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1418,16 +1418,35 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record, !versions_->IsReplicationEpochsEmpty()) { auto inferred_epoch_of_mus = versions_->GetReplicationEpochForMUS( latest_applied_update_sequence); - if (!inferred_epoch_of_mus || + // If mus is smaller than mus in the epoch set, the replication + // epoch should also be smaller than epoch in the epoch set. + if (!inferred_epoch_of_mus && + replication_epoch >= + versions_->replication_epochs_.GetSmallestEpoch()) { + info->diverged_manifest_writes = true; + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Diverged manifest found: replication seq: %s, " + "mus: %" PRIu64 ", smallest epoch: %" PRIu64 + ", actual epoch: %" PRIu64, + replication_sequence.c_str(), + latest_applied_update_sequence, + versions_->replication_epochs_.GetSmallestEpoch(), + replication_epoch); + break; + } + // If we can infer epoch, make sure the epoch actually matches + // with epoch in the `replication_sequence` + if (inferred_epoch_of_mus && (*inferred_epoch_of_mus != replication_epoch)) { - // - If inferred_epoch_of_mus is not set, either we are - // recovering manifest writes before persisted replication - // sequence, or there are too many manifest writes after the - // persisted replication sequence. For either case, we report - // divergence and clear local log - // - If inferred_epoch_of_mus doesn't match epoch in - // VersionEdit, the applied version edit is diverged info->diverged_manifest_writes = true; + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Diverged manifest found: replication seq: %s, mus: " + "%" PRIu64 ", inferred epoch: %" PRIu64 + ", actual epoch: %" PRIu64, + replication_sequence.c_str(), + latest_applied_update_sequence, *inferred_epoch_of_mus, + replication_epoch); break; } } // else Old manifest write which is not diverged diff --git a/db/replication_epoch_edit.cc b/db/replication_epoch_edit.cc index 81275f2c553..fea8fc9443d 100644 --- a/db/replication_epoch_edit.cc +++ b/db/replication_epoch_edit.cc @@ -51,7 +51,7 @@ Status ReplicationEpochSet::AddEpoch(const ReplicationEpochAddition& epoch, return Status::Corruption(ss.str()); } - while (epochs_.size() + 1 >= max_num_replication_epochs) { + while (epochs_.size() >= max_num_replication_epochs) { epochs_.pop_front(); } epochs_.push_back(epoch); diff --git a/db/replication_epoch_edit.h b/db/replication_epoch_edit.h index 6516a2a7093..e5dee28a7f2 100644 --- a/db/replication_epoch_edit.h +++ b/db/replication_epoch_edit.h @@ -101,10 +101,14 @@ class ReplicationEpochSet { std::optional GetEpochForMUS(uint64_t mus) const; const auto& GetEpochs() const { return epochs_; } + uint64_t GetSmallestEpoch() const { + return epochs_.front().GetEpoch(); + } bool empty() const { return epochs_.empty(); } auto size() const { return epochs_.size(); } + private: Status AddEpoch(const ReplicationEpochAddition& epoch, uint32_t max_num_replication_epochs);