Skip to content

Commit

Permalink
[sys-6995] maintain epochs of first manifest update after persist rep…
Browse files Browse the repository at this point in the history
…lication sequence
  • Loading branch information
seckcoder committed Feb 12, 2024
1 parent 40f265a commit 8dbe107
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 13 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ set(SOURCES
db/write_batch_base.cc
db/write_controller.cc
db/write_thread.cc
db/replication_epoch_edit.cc
env/composite_env.cc
env/env.cc
env/env_chroot.cc
Expand Down
7 changes: 7 additions & 0 deletions cloud/replication_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "rocksdb/table.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/random.h"
#include "util/string_util.h"

Expand All @@ -36,6 +37,12 @@ class Listener : public ReplicationLogListener {

// Replaces the state currently tracked by this listener.
void setState(State state) {
  state_ = state;
}

// Test implementation of the epoch lookup: the epoch is the 32-bit value that
// GetFixed32 decodes from `replication_seq`.
uint64_t EpochOfReplicationSequence(Slice replication_seq) override {
  // Perform the decode outside of assert(). assert() is compiled out when
  // NDEBUG is defined, which would skip the decode entirely and return an
  // uninitialized value.
  uint32_t seq = 0;
  const bool decoded = GetFixed32(&replication_seq, &seq);
  assert(decoded);
  (void)decoded;  // avoid an unused-variable warning in NDEBUG builds
  return seq;
}

std::string OnReplicationLogRecord(ReplicationLogRecord record) override {
// We shouldn't be producing replication log records during open
assert(state_ != OPEN);
Expand Down
2 changes: 2 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1774,10 +1774,12 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {

MemTableSwitchRecord mem_switch_record;
std::string replication_sequence;
uint64_t next_mus{0};
if (immutable_db_options_.replication_log_listener && !cfds.empty()) {
mem_switch_record.next_log_num = versions_->NewFileNumber();
replication_sequence = RecordMemTableSwitch(
immutable_db_options_.replication_log_listener, mem_switch_record);
next_mus = versions_->GetManifestUpdateSequence() + 1;
}

WriteThread::Writer nonmem_w;
Expand Down
4 changes: 2 additions & 2 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,8 @@ Env* DBTestBase::CreateNewAwsEnv(const std::string& prefix, Env* parent) {
coptions.TEST_Initialize("dbtest.", prefix, region);
// Delete cloud files immediately
coptions.cloud_file_deletion_delay = std::nullopt;
Status st = CloudFileSystem::NewAwsFileSystem(parent->GetFileSystem(),
coptions, info_log_, &cfs);
Status st = CloudFileSystemEnv::NewAwsFileSystem(parent->GetFileSystem(),
coptions, info_log_, &cfs);
auto* cimpl = dynamic_cast<CloudFileSystemImpl*>(cfs);
assert(cimpl);
cimpl->TEST_DisableCloudManifest();
Expand Down
85 changes: 85 additions & 0 deletions db/replication_epoch_edit.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#include "db/replication_epoch_edit.h"
#include <algorithm>
#include <optional>
#include <sstream>

#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/coding.h"

namespace ROCKSDB_NAMESPACE {

ReplicationEpochAddition::ReplicationEpochAddition(uint64_t epoch,
uint64_t first_mus)
: epoch_(epoch), first_mus_(first_mus) {}

void ReplicationEpochAddition::EncodeTo(std::string* output) const {
PutVarint64(output, epoch_);
PutVarint64(output, first_mus_);
}

// Reads the epoch and then the first manifest update sequence from `*input`
// using GetVarint64, in the same order EncodeTo writes them.
// Returns Status::Corruption naming the field that failed to decode, or
// Status::OK() when both fields were read.
Status ReplicationEpochAddition::DecodeFrom(Slice* input) {
  const char* failure = nullptr;
  if (!GetVarint64(input, &epoch_)) {
    failure = "Error decoding epoch";
  } else if (!GetVarint64(input, &first_mus_)) {
    failure = "Error decoding manifest update sequence";
  }

  if (failure != nullptr) {
    return Status::Corruption("ReplicationEpochAddition", failure);
  }
  return Status::OK();
}

// Returns the same text that operator<< streams for this addition.
std::string ReplicationEpochAddition::DebugString() const {
  std::ostringstream rendered;
  rendered << *this;
  return rendered.str();
}

// Streams `ea` as "epoch: <epoch>, mus: <first MUS>" and returns `os` so that
// insertions can be chained.
std::ostream& operator<<(std::ostream& os, const ReplicationEpochAddition& ea) {
  return os << "epoch: " << ea.GetEpoch() << ", mus: " << ea.GetFirstMUS();
}

// Appends `epoch` to the back of the set.
// The new entry must compare greater (per operator<) than the current last
// entry; otherwise nothing is added and Status::Corruption is returned with
// both entries in the message.
Status ReplicationEpochSet::AddEpoch(const ReplicationEpochAddition& epoch) {
  const bool in_order = epochs_.empty() || epochs_.back() < epoch;
  if (in_order) {
    epochs_.push_back(epoch);
    return Status::OK();
  }

  std::stringstream message;
  message << "Misordered replication epoch. prev: " << epochs_.back()
          << ", next: " << epoch;
  return Status::Corruption(message.str());
}

// Adds every entry of `epochs` in order via AddEpoch.
// Stops at the first failure and returns that status; entries added before the
// failure stay in the set.
Status ReplicationEpochSet::AddEpochs(const ReplicationEpochAdditions& epochs) {
  for (const auto& addition : epochs) {
    Status status = AddEpoch(addition);
    if (!status.ok()) {
      return status;
    }
  }
  return Status::OK();
}

// Drops entries from the front of the set for as long as the front entry's
// epoch is smaller than `epoch`, or the set holds more than
// `max_num_replication_epochs` entries.
void ReplicationEpochSet::DeleteEpochsBefore(
    uint64_t epoch, uint32_t max_num_replication_epochs) {
  while (!epochs_.empty()) {
    const bool stale = epochs_.front().GetEpoch() < epoch;
    const bool over_limit = epochs_.size() > max_num_replication_epochs;
    if (!stale && !over_limit) {
      break;
    }
    epochs_.pop_front();
  }
}

std::optional<uint64_t> ReplicationEpochSet::GetEpochForMUS(uint64_t mus) {
if (empty() || epochs_.front().GetFirstMUS() > mus || epochs_.back().GetFirstMUS() < mus) {
return std::nullopt;
}

auto it = std::upper_bound(epochs_.begin(), epochs_.end(), mus,
[&](auto mus, const auto& epoch_addition) {
return mus < epoch_addition.GetFirstMUS();
});
assert(it != epochs_.begin());
--it;
return it->GetEpoch();
}

}
75 changes: 75 additions & 0 deletions db/replication_epoch_edit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once

#include <cstdint>
#include <deque>
#include <iosfwd>
#include <optional>
#include <string>
#include <vector>

#include "rocksdb/rocksdb_namespace.h"

namespace ROCKSDB_NAMESPACE {

class Slice;
class Status;

// Pairs a replication epoch with the first manifest update sequence (MUS)
// recorded in that epoch.
class ReplicationEpochAddition {
 public:
  // A default-constructed addition has epoch 0 and first MUS 0.
  ReplicationEpochAddition() = default;
  ReplicationEpochAddition(uint64_t epoch, uint64_t first_mus);

  // The replication epoch.
  uint64_t GetEpoch() const {
    return epoch_;
  }
  // First manifest update sequence of the replication epoch.
  uint64_t GetFirstMUS() const {
    return first_mus_;
  }

  // EncodeTo appends the serialized form to `*output`; DecodeFrom reads the
  // fields from `*input` and reports failure through the returned Status.
  void EncodeTo(std::string* output) const;
  Status DecodeFrom(Slice* input);

  // Human-readable rendering of this addition.
  std::string DebugString() const;

 private:
  uint64_t epoch_ = 0;
  uint64_t first_mus_ = 0;
};

// Two additions are equal when both the epoch and the first MUS match.
inline bool operator==(const ReplicationEpochAddition& ea1,
                       const ReplicationEpochAddition& ea2) {
  if (ea1.GetEpoch() != ea2.GetEpoch()) {
    return false;
  }
  return ea1.GetFirstMUS() == ea2.GetFirstMUS();
}
// `ea1` orders before `ea2` when its epoch is not greater than `ea2`'s and its
// first MUS is strictly smaller.
// This is the condensed form of
//   (e1 < e2 && m1 < m2) || (e1 == e2 && m1 < m2).
inline bool operator<(const ReplicationEpochAddition& ea1,
                      const ReplicationEpochAddition& ea2) {
  const bool epoch_not_after = ea1.GetEpoch() <= ea2.GetEpoch();
  const bool mus_before = ea1.GetFirstMUS() < ea2.GetFirstMUS();
  return epoch_not_after && mus_before;
}
inline bool operator>=(const ReplicationEpochAddition& ea1,
const ReplicationEpochAddition ea2) {
return !(ea1 < ea2);
}
// Streams `ea` as "epoch: <epoch>, mus: <first MUS>"; defined in
// replication_epoch_edit.cc.
std::ostream& operator<<(std::ostream& os, const ReplicationEpochAddition& ea);

// A sequence of epoch additions, accepted as a batch by
// ReplicationEpochSet::AddEpochs.
using ReplicationEpochAdditions = std::vector<ReplicationEpochAddition>;

// Ordered collection of ReplicationEpochAddition entries. New entries are
// appended at the back and old entries are dropped from the front.
class ReplicationEpochSet {
 public:
  // Appends `epoch`; returns Status::Corruption when it does not order after
  // the current last entry.
  Status AddEpoch(const ReplicationEpochAddition& epoch);
  // Appends each entry of `epochs` in order, stopping at the first failure.
  Status AddEpochs(const ReplicationEpochAdditions& epochs);

  // Drops entries from the front while the front entry's epoch is smaller than
  // `epoch` or the set holds more than `max_num_replication_epochs` entries.
  void DeleteEpochsBefore(uint64_t epoch, uint32_t max_num_replication_epochs);

  // Find corresponding epoch for manifest update sequence.
  // Return std::nullopt if `mus` is not covered by the
  // `ReplicationEpochSet`.
  std::optional<uint64_t> GetEpochForMUS(uint64_t mus);

  // Read-only view of the stored entries, front (oldest) to back (newest).
  const std::deque<ReplicationEpochAddition>& GetEpochs() const {
    return epochs_;
  }

  // True when no entries are stored.
  bool empty() const {
    return epochs_.empty();
  }

 private:
  std::deque<ReplicationEpochAddition> epochs_;
};

}
21 changes: 21 additions & 0 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "db/version_edit.h"

#include "db/blob/blob_index.h"
#include "db/replication_epoch_edit.h"
#include "db/version_set.h"
#include "logging/event_logger.h"
#include "rocksdb/slice.h"
Expand Down Expand Up @@ -117,6 +118,12 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32Varint64(dst, kManifestUpdateSequence,
manifest_update_sequence_);
}
for (const auto &replication_epoch_addition: replication_epoch_additions_) {
PutVarint32(dst, kReplicationEpochAdd);
std::string encoded;
replication_epoch_addition.EncodeTo(&encoded);
PutLengthPrefixedSlice(dst, encoded);
}
if (has_prev_log_number_) {
PutVarint32Varint64(dst, kPrevLogNumber, prev_log_number_);
}
Expand Down Expand Up @@ -509,7 +516,17 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
msg = "manifest update sequence";
}
break;
case kReplicationEpochAdd: {
ReplicationEpochAddition replication_epoch_addition;
auto s = replication_epoch_addition.DecodeFrom(&input);
if (!s.ok()) {
return s;
}

replication_epoch_additions_.emplace_back(
std::move(replication_epoch_addition));
break;
}
case kPrevLogNumber:
if (GetVarint64(&input, &prev_log_number_)) {
has_prev_log_number_ = true;
Expand Down Expand Up @@ -816,6 +833,10 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append("\n ManifestUpdateSequence: ");
AppendNumberTo(&r, manifest_update_sequence_);
}
for (const auto& epoch_addition: replication_epoch_additions_) {
r.append("\n ReplicationEpochAddition: ");
r.append(epoch_addition.DebugString());
}
if (has_prev_log_number_) {
r.append("\n PrevLogNumber: ");
AppendNumberTo(&r, prev_log_number_);
Expand Down
11 changes: 11 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_garbage.h"
#include "db/dbformat.h"
#include "db/replication_epoch_edit.h"
#include "db/wal_edit.h"
#include "memory/arena.h"
#include "port/malloc.h"
Expand Down Expand Up @@ -61,6 +62,7 @@ enum Tag : uint32_t {
// RocksDB-Cloud additions to the manifest
kReplicationSequence = 720,
kManifestUpdateSequence = 721,
kReplicationEpochAdd = 722,

// Mask for an unidentified tag from the future which can be safely ignored.
kTagSafeIgnoreMask = 1 << 13,
Expand Down Expand Up @@ -421,6 +423,13 @@ class VersionEdit {
// Returns the flag recording whether this edit has a manifest update sequence.
bool HasManifestUpdateSequence() const {
  return has_manifest_update_sequence_;
}
// Returns the manifest update sequence stored in this edit.
uint64_t GetManifestUpdateSequence() const {
  return manifest_update_sequence_;
}

// Appends `epochAddition` to the replication epoch additions carried by this
// edit. The argument is taken by value and moved into the list.
void addReplicationEpoch(ReplicationEpochAddition epochAddition) {
  replication_epoch_additions_.push_back(std::move(epochAddition));
}
// Read-only access to the replication epoch additions carried by this edit,
// in the order they were added.
const ReplicationEpochAdditions& GetReplicationEpochAdditions() const {
  const ReplicationEpochAdditions& additions = replication_epoch_additions_;
  return additions;
}

void SetPrevLogNumber(uint64_t num) {
has_prev_log_number_ = true;
prev_log_number_ = num;
Expand Down Expand Up @@ -690,6 +699,7 @@ class VersionEdit {
uint64_t log_number_ = 0;
std::string replication_sequence_ = "";
uint64_t manifest_update_sequence_ = 0;
ReplicationEpochAdditions replication_epoch_additions_;
uint64_t prev_log_number_ = 0;
uint64_t next_file_number_ = 0;
uint32_t max_column_family_ = 0;
Expand All @@ -701,6 +711,7 @@ class VersionEdit {
bool has_log_number_ = false;
bool has_replication_sequence_ = false;
bool has_manifest_update_sequence_ = false;
bool has_manifest_update_epoch_ = false;
bool has_prev_log_number_ = false;
bool has_next_file_number_ = false;
bool has_max_column_family_ = false;
Expand Down
1 change: 1 addition & 0 deletions db/version_edit_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
version_set_->replication_sequence_ =
version_edit_params_.GetReplicationSequence();
}
// TODO(wei): apply replication epoch changes as well???
}
}

Expand Down
Loading

0 comments on commit 8dbe107

Please sign in to comment.