Skip to content

Commit

Permalink
don't treat pruned (epoch,mus) as diverged
Browse files Browse the repository at this point in the history
  • Loading branch information
seckcoder committed Mar 5, 2024
1 parent d119ead commit 017d4ca
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 11 deletions.
60 changes: 58 additions & 2 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1431,7 +1431,7 @@ TEST_F(ReplicationTest, ReplicationEpochs) {
leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().empty());

ASSERT_OK(leader->Put(wo(), leaderCF(cf(0)), "k2", "v2"));
ASSERT_OK(leader->Flush(FlushOptions()));
ASSERT_OK(leader->Flush(FlushOptions(), leaderCF(cf(0))));

catchUpFollower();

Expand All @@ -1441,7 +1441,7 @@ TEST_F(ReplicationTest, ReplicationEpochs) {

UpdateLeaderEpoch(3);
ASSERT_OK(leader->Put(wo(), leaderCF(cf(0)), "k3", "v3"));
ASSERT_OK(leader->Flush(FlushOptions()));
ASSERT_OK(leader->Flush(FlushOptions(), leaderCF(cf(0))));

catchUpFollower();
ASSERT_EQ(leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size(),
Expand All @@ -1455,6 +1455,62 @@ TEST_F(ReplicationTest, ReplicationEpochs) {
1);
}

// Exercises the `max_num_replication_epochs` cap: with the limit set to 2,
// the leader's replication epoch set must never hold more than two entries,
// the entry left at the front after the third epoch bump must be epoch 3,
// the follower must agree with the leader after catching up, and the same
// state must be observed after the leader is closed and reopened.
TEST_F(ReplicationTest, MaxNumReplicationEpochs) {
  auto options = leaderOptions();
  // keep at most two replication epochs in the set
  options.max_num_replication_epochs = 2;
  options.disable_auto_flush = true;
  options.disable_auto_compactions = true;
  auto leader = openLeader(options);
  openFollower(options);

  auto cfName = [](int idx) { return "cf" + std::to_string(idx); };

  // Both helpers re-resolve the leader on every call, so they remain usable
  // after the leader is closed and reopened further down.
  const auto numEpochs = [&] {
    return leaderFull()->GetVersionSet()->TEST_GetReplicationEpochSet().size();
  };
  const auto smallestEpoch = [&] {
    return leaderFull()
        ->GetVersionSet()
        ->TEST_GetReplicationEpochSet()
        .GetSmallestEpoch();
  };

  createColumnFamily(cfName(0));

  // Epoch 2: a write followed by a flush leaves a single epoch in the set.
  UpdateLeaderEpoch(2);
  ASSERT_OK(leader->Put(wo(), leaderCF(cfName(0)), "k1", "v1"));
  ASSERT_OK(leader->Flush(FlushOptions(), leaderCF(cfName(0))));
  ASSERT_EQ(numEpochs(), 1);

  catchUpFollower();
  verifyReplicationEpochsEqual();

  // Epoch 3: SetOptions produces a manifest write without changing the
  // persisted replication sequence.
  UpdateLeaderEpoch(3);
  ASSERT_OK(leader->SetOptions(leaderCF(cfName(0)),
                               {{"max_write_buffer_number", "3"}}));
  ASSERT_EQ(numEpochs(), 2);

  // Epoch 4: another manifest-only write. The set stays at two entries and
  // epoch 3 is now the one at the front.
  UpdateLeaderEpoch(4);
  ASSERT_OK(leader->SetOptions(leaderCF(cfName(0)),
                               {{"max_write_buffer_number", "2"}}));
  ASSERT_EQ(numEpochs(), 2);
  ASSERT_EQ(smallestEpoch(), 3);

  catchUpFollower();
  verifyReplicationEpochsEqual();

  // Reopen the leader and check the epoch set again.
  closeLeader();
  leader = openLeader(options);
  ASSERT_EQ(numEpochs(), 2);
  ASSERT_EQ(smallestEpoch(), 3);
}

} // namespace ROCKSDB_NAMESPACE

// A black-box test for the cloud wrapper around rocksdb
Expand Down
35 changes: 27 additions & 8 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1418,16 +1418,35 @@ Status DBImpl::ApplyReplicationLogRecord(ReplicationLogRecord record,
!versions_->IsReplicationEpochsEmpty()) {
auto inferred_epoch_of_mus = versions_->GetReplicationEpochForMUS(
latest_applied_update_sequence);
if (!inferred_epoch_of_mus ||
// If mus is smaller than mus in the epoch set, the replication
// epoch should also be smaller than epoch in the epoch set.
if (!inferred_epoch_of_mus &&
replication_epoch >=
versions_->replication_epochs_.GetSmallestEpoch()) {
info->diverged_manifest_writes = true;
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Diverged manifest found: replication seq: %s, "
"mus: %" PRIu64 ", smallest epoch: %" PRIu64
", actual epoch: %" PRIu64,
replication_sequence.c_str(),
latest_applied_update_sequence,
versions_->replication_epochs_.GetSmallestEpoch(),
replication_epoch);
break;
}
// If we can infer epoch, make sure the epoch actually matches
// with epoch in the `replication_sequence`
if (inferred_epoch_of_mus &&
(*inferred_epoch_of_mus != replication_epoch)) {
// - If inferred_epoch_of_mus is not set, either we are
// recovering manifest writes before persisted replication
// sequence, or there are too many manifest writes after the
// persisted replication sequence. For either case, we report
// divergence and clear local log
// - If inferred_epoch_of_mus doesn't match epoch in
// VersionEdit, the applied version edit is diverged
info->diverged_manifest_writes = true;
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Diverged manifest found: replication seq: %s, mus: "
"%" PRIu64 ", inferred epoch: %" PRIu64
", actual epoch: %" PRIu64,
replication_sequence.c_str(),
latest_applied_update_sequence, *inferred_epoch_of_mus,
replication_epoch);
break;
}
} // else Old manifest write which is not diverged
Expand Down
2 changes: 1 addition & 1 deletion db/replication_epoch_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Status ReplicationEpochSet::AddEpoch(const ReplicationEpochAddition& epoch,
return Status::Corruption(ss.str());
}

while (epochs_.size() + 1 >= max_num_replication_epochs) {
while (epochs_.size() >= max_num_replication_epochs) {
epochs_.pop_front();
}
epochs_.push_back(epoch);
Expand Down
4 changes: 4 additions & 0 deletions db/replication_epoch_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,14 @@ class ReplicationEpochSet {
std::optional<uint64_t> GetEpochForMUS(uint64_t mus) const;

const auto& GetEpochs() const { return epochs_; }
// Returns the epoch number of the entry at the front of `epochs_`.
// NOTE(review): reads `epochs_.front()` without an emptiness check, so the
// set is presumably required to be non-empty here -- callers should check
// `empty()` first; confirm against the container type of `epochs_`.
uint64_t GetSmallestEpoch() const {
  const auto& front_entry = epochs_.front();
  return front_entry.GetEpoch();
}

bool empty() const { return epochs_.empty(); }
auto size() const { return epochs_.size(); }


private:
Status AddEpoch(const ReplicationEpochAddition& epoch,
uint32_t max_num_replication_epochs);
Expand Down

0 comments on commit 017d4ca

Please sign in to comment.