Fix WBM concurrency control, Add SetAllowStall(), Cleanup #11253

Closed · wants to merge 3 commits
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -1,5 +1,7 @@
# Rocksdb Change Log
## Unreleased
### New Features
* Allow setting `WriteBufferManager::allow_stall` at runtime via `SetAllowStall()`; previously it was fixed after being passed to `WriteBufferManager`'s constructor

## 8.1.0 (03/18/2023)
### Behavior changes
12 changes: 4 additions & 8 deletions db/db_impl/db_impl.h
@@ -1200,17 +1200,13 @@ class DBImpl : public DB {
// state_ is changed accordingly.
class WBMStallInterface : public StallInterface {
public:
enum State {
BLOCKED = 0,
RUNNING,
};

WBMStallInterface() : state_cv_(&state_mutex_) {
MutexLock lock(&state_mutex_);
state_ = State::RUNNING;
}

void SetState(State state) {
void SetState(State state) override {
MutexLock lock(&state_mutex_);
state_ = state;
}
@@ -1871,9 +1867,9 @@ class DBImpl : public DB {
Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
const WriteOptions& write_options);

// Begin stalling of writes when memory usage increases beyond a certain
// threshold.
void WriteBufferManagerStallWrites();
// If stall conditions are met, begin stalling of writes with help of
// `WriteBufferManager`
void MaybeWriteBufferManagerStallWrites();
Contributor Author comment:

Cleanup 7: renaming this function to reflect that we will still do the stall condition check again within this function.


Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
WriteBatch* my_batch);
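The Block()/Signal() contract that WBMStallInterface implements can be sketched with a plain condition variable. The following standalone toy (SimpleStall and its members are hypothetical names, not RocksDB code) shows the state machine the class above wraps: Block() parks the caller while the state is BLOCKED, and Signal() flips the state back to RUNNING and wakes any waiters.

```cpp
#include <condition_variable>
#include <mutex>

// Minimal sketch of a stall interface; illustrative only.
class SimpleStall {
 public:
  enum State { BLOCKED = 0, RUNNING };

  void SetState(State s) {
    std::lock_guard<std::mutex> lock(mu_);
    state_ = s;
  }

  // Park the calling thread until the state becomes RUNNING.
  void Block() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return state_ == RUNNING; });
  }

  // Set the state to RUNNING and wake any thread parked in Block().
  void Signal() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      state_ = RUNNING;
    }
    cv_.notify_all();
  }

  State state() {
    std::lock_guard<std::mutex> lock(mu_);
    return state_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  State state_ = RUNNING;
};
```

One design point visible in the real class above: the state is always read and written under the same mutex paired with the condition variable, so a Signal() can never be lost between the waiter's predicate check and its wait.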
12 changes: 3 additions & 9 deletions db/db_impl/db_impl_write.cc
@@ -1218,7 +1218,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
status = Status::Incomplete("Write stall");
} else {
InstrumentedMutexLock l(&mutex_);
WriteBufferManagerStallWrites();
MaybeWriteBufferManagerStallWrites();
}
}
InstrumentedMutexLock l(&log_write_mutex_);
@@ -1900,19 +1900,13 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
void DBImpl::WriteBufferManagerStallWrites() {
void DBImpl::MaybeWriteBufferManagerStallWrites() {
mutex_.AssertHeld();
// First block future writer threads who want to add themselves to the queue
// of WriteThread.
write_thread_.BeginWriteStall();
mutex_.Unlock();

// Change the state to State::Blocked.
static_cast<WBMStallInterface*>(wbm_stall_.get())
->SetState(WBMStallInterface::State::BLOCKED);
Comment on lines -1911 to -1912
Contributor Author comment:

Cleanup 8: make this part of the responsibility of MaybeBeginWriteStall(), as the cast static_cast<WBMStallInterface*> indicates we are dealing with something too low-level here

// Then WriteBufferManager will add DB instance to its queue
// and block this thread by calling WBMStallInterface::Block().
write_buffer_manager_->BeginWriteStall(wbm_stall_.get());
write_buffer_manager_->MaybeBeginWriteStall(wbm_stall_.get());
wbm_stall_->Block();

mutex_.Lock();
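The unlock-block-relock sequence above can be sketched as follows (all names are hypothetical stand-ins, not RocksDB's): the DB mutex is released before blocking on the write buffer manager, so background work that frees memory — and may itself need the DB mutex — can run and eventually signal the stalled writer.

```cpp
#include <mutex>

// Illustrative-only sketch of the lock-ordering pattern in
// MaybeWriteBufferManagerStallWrites().
struct StallFlowSketch {
  std::mutex db_mutex;  // stands in for DBImpl::mutex_

  template <typename BlockFn>
  void StallWrites(BlockFn block_on_wbm) {
    // REQUIRES (as in the real code): the caller holds db_mutex.
    db_mutex.unlock();  // let flushes/compactions make progress
    block_on_wbm();     // may park until stall conditions resolve
    db_mutex.lock();    // reacquire before returning to the caller
  }
};
```

Blocking while still holding the DB mutex would deadlock: the flush that frees memory (and would end the stall) needs that same mutex to finish.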
53 changes: 53 additions & 0 deletions db/db_write_buffer_manager_test.cc
@@ -846,6 +846,59 @@ TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
delete shared_wbm_db;
}

TEST_F(DBWriteBufferManagerTest, RuntimeChangeableThreadSafeParameters) {
for (std::string test_parameter : {"buffer_size", "allow_stall"}) {
Options options = CurrentOptions();
options.write_buffer_manager.reset(new WriteBufferManager(
10000, nullptr /* cache */, true /* allow_stall */));
DestroyAndReopen(options);

// Pause the flush thread so that the only way to exit the write stall
// below is to change the `test_parameter` at runtime successfully
std::unique_ptr<test::SleepingBackgroundTask> sleeping_task(
new test::SleepingBackgroundTask());
env_->SetBackgroundThreads(1, Env::HIGH);
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
sleeping_task.get(), Env::Priority::HIGH);
sleeping_task->WaitUntilSleeping();

// After completing this write, any future write will be stalled by
// WriteBufferManager
ASSERT_OK(Put(Key(0), DummyString(10000)));

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"WBMStallInterface::BlockDB",
"DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
"ChangeParameter"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

port::Thread thread1([&] { ASSERT_OK(Put(Key(0), DummyString(10000))); });

port::Thread thread2([&] {
TEST_SYNC_POINT(
"DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
"ChangeParameter");
if (test_parameter == "buffer_size") {
options.write_buffer_manager->SetBufferSize(
options.write_buffer_manager->buffer_size() * 200);
} else if (test_parameter == "allow_stall") {
options.write_buffer_manager->SetAllowStall(false);
}
});

// If `test_parameter` is successfully changed in thread2, the write stall
// encountered in thread1 will stop and the test will finish. Otherwise,
// thread1 will hang forever.
thread1.join();
thread2.join();

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

sleeping_task->WakeUp();
sleeping_task->WaitUntilDone();
}
}
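The pattern this test exercises — an atomically readable flag whose setter re-evaluates the stall condition — can be sketched in a few lines. MiniWbm and its members are hypothetical stand-ins, not the real WriteBufferManager:

```cpp
#include <atomic>
#include <cstddef>

// Illustrative-only sketch of a runtime-changeable allow_stall flag.
class MiniWbm {
 public:
  MiniWbm(size_t buffer_size, bool allow_stall)
      : buffer_size_(buffer_size), allow_stall_(allow_stall) {}

  void ReserveMem(size_t mem) { memory_used_ += mem; }

  // Mirrors the documented stall condition: stalling is allowed AND a
  // memory limit is set AND usage has reached the limit.
  bool ShouldStall() const {
    return allow_stall_.load(std::memory_order_relaxed) &&
           buffer_size_ > 0 &&
           memory_used_.load(std::memory_order_relaxed) >= buffer_size_;
  }

  // In the real WriteBufferManager, SetAllowStall() must also wake any
  // writers already parked in a stall; here we only flip the flag.
  void SetAllowStall(bool new_allow_stall) {
    allow_stall_.store(new_allow_stall, std::memory_order_relaxed);
  }

 private:
  const size_t buffer_size_;
  std::atomic<size_t> memory_used_{0};
  std::atomic<bool> allow_stall_;
};
```

The test above relies on exactly the omitted part: after `SetAllowStall(false)` the stalled writer in thread1 must be signaled, or it would hang forever even though ShouldStall() now returns false.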

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());
156 changes: 89 additions & 67 deletions include/rocksdb/write_buffer_manager.h
@@ -27,18 +27,27 @@ class CacheReservationManager;
// internal use only. Each DB instance contains ptr to StallInterface.
class StallInterface {
public:
enum State {
BLOCKED = 0,
RUNNING,
};

virtual ~StallInterface() {}

virtual void Block() = 0;

virtual void Signal() = 0;

virtual void SetState(State /* state */) {}
};

// This class is thread-safe
class WriteBufferManager final {
public:
// Parameters:
// _buffer_size: _buffer_size = 0 indicates no limit. Memory won't be capped.
// memory_usage() won't be valid and ShouldFlush() will always return true.
// _buffer_size: the memory limit. _buffer_size = 0 indicates no limit:
// memory won't be capped, memory_usage() won't be valid, and ShouldFlush()
// will always return false.
//
// cache_: if `cache` is provided, we'll put dummy entries in the cache and
// cost the memory allocated to the cache. It can be used even if _buffer_size
@@ -56,121 +65,134 @@ class WriteBufferManager final {

~WriteBufferManager();

// Returns true if buffer_limit is passed to limit the total memory usage and
// is greater than 0.
// Returns true if a memory limit exists (i.e., buffer size > 0).
//
// WARNING: If running without synchronization with `SetBufferSize()`, this
// function might not return the latest result changed by `SetBufferSize()`
// but an old result.
bool enabled() const { return buffer_size() > 0; }

// Returns true if pointer to cache is passed.
bool cost_to_cache() const { return cache_res_mgr_ != nullptr; }

// Returns the total memory used by memtables.
// Only valid if enabled()
// Returns the total memory used by memtables. Only valid if enabled()=true
//
// WARNING: If running without synchronization with `ReserveMem()/FreeMem()`,
// this function might not return the latest result changed by these functions
// but an old result.
size_t memory_usage() const {
return memory_used_.load(std::memory_order_relaxed);
}

// Returns the total memory used by active memtables.
//
// WARNING: If running without synchronization with
// `ReserveMem()/ScheduleFreeMem()`, this function might not return the latest
// result changed by these functions but an old result.
size_t mutable_memtable_memory_usage() const {
return memory_active_.load(std::memory_order_relaxed);
}

// Return the number of dummy entries put in the cache used to cost the memory
// accounted by this WriteBufferManager to the cache
//
// WARNING: If running without synchronization with `ReserveMem()/FreeMem()`,
// this function might not return the latest result changed by these functions
// but an old result.
size_t dummy_entries_in_cache_usage() const;

// Returns the buffer_size.
//
// WARNING: If running without synchronization with `SetBufferSize()`, this
// function might not return the latest result changed by `SetBufferSize()`
// but an old result.
size_t buffer_size() const {
return buffer_size_.load(std::memory_order_relaxed);
}

void SetBufferSize(size_t new_size) {
buffer_size_.store(new_size, std::memory_order_relaxed);
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
// Check if stall is active and can be ended.
MaybeEndWriteStall();
}
// REQUIRED: new_size != 0
void SetBufferSize(size_t new_size);

// Below functions should be called by RocksDB internally.

// Should only be called from write thread
bool ShouldFlush() const {
if (enabled()) {
if (mutable_memtable_memory_usage() >
mutable_limit_.load(std::memory_order_relaxed)) {
return true;
}
size_t local_size = buffer_size();
if (memory_usage() >= local_size &&
mutable_memtable_memory_usage() >= local_size / 2) {
// If the memory exceeds the buffer size, we trigger more aggressive
// flush. But if already more than half memory is being flushed,
// triggering more flush may not help. We will hold it instead.
return true;
}
}
return false;
}
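The two flush triggers encoded in the inline body above (which the PR moves out of the header without changing the logic) can be restated as a standalone predicate — a sketch, assuming the mutable limit is 7/8 of buffer_size as set in SetBufferSize above; ShouldFlushSketch is a hypothetical name:

```cpp
#include <cstddef>

// Sketch of the flush triggers:
//  1) active (mutable) memtable memory exceeds the mutable limit
//     (7/8 of buffer_size), or
//  2) total memory reached buffer_size while at least half of it is
//     still in active memtables, i.e., flushing more can still help.
bool ShouldFlushSketch(size_t mutable_used, size_t total_used,
                       size_t buffer_size) {
  if (buffer_size == 0) return false;  // manager disabled
  size_t mutable_limit = buffer_size * 7 / 8;
  if (mutable_used > mutable_limit) return true;
  if (total_used >= buffer_size && mutable_used >= buffer_size / 2) {
    // Memory exceeds the buffer size, so trigger a more aggressive
    // flush; but if more than half is already being flushed, more
    // flushes may not help, so hold off.
    return true;
  }
  return false;
}
```

The `mutable_used >= buffer_size / 2` guard is the interesting case: when most memory is already in immutable memtables being flushed, scheduling yet another flush would not reduce usage any faster.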
void SetAllowStall(bool new_allow_stall);

// Returns true if total memory usage exceeded buffer_size.
// We stall the writes until memory_usage drops below buffer_size. When the
// function returns true, all writer threads (including one checking this
// condition) across all DBs will be stalled. Stall is allowed only if user
// pass allow_stall = true during WriteBufferManager instance creation.
// WARNING: Should only be called from write thread
// WARNING: Should only be called by RocksDB internally
bool ShouldFlush() const;

// Returns true if stall conditions are met.
// Stall conditions: stall is allowed AND memory limit is set (i.e, buffer
// size > 0) AND total memory usage accounted by this WriteBufferManager
// exceeds the memory limit.
//
// WARNING: Should only be called by RocksDB internally.
//
// WARNING: If running without synchronization with any functions that could
// change the stall conditions above, this function might not return the
// latest result changed by these functions but an old result.
bool ShouldStall() const {
if (!allow_stall_ || !enabled()) {
if (allow_stall_.load(std::memory_order_relaxed) && enabled() &&
IsStallThresholdExceeded()) {
return true;
} else {
return false;
}

return IsStallActive() || IsStallThresholdExceeded();
}

// Returns true if stall is active.
bool IsStallActive() const {
return stall_active_.load(std::memory_order_relaxed);
}

// Returns true if stalling condition is met.
bool IsStallThresholdExceeded() const {
return memory_usage() >= buffer_size_;
}

// WARNING: Should only be called by RocksDB internally.
void ReserveMem(size_t mem);

// We are in the process of freeing `mem` bytes, so it is not considered
// when checking the soft limit.
//
// WARNING: Should only be called by RocksDB internally.
void ScheduleFreeMem(size_t mem);

// WARNING: Should only be called by RocksDB internally.
void FreeMem(size_t mem);

// Add the DB instance to the queue and block the DB.
// Should only be called by RocksDB internally.
void BeginWriteStall(StallInterface* wbm_stall);
// If stall conditions are met, WriteBufferManager
// will prepare for write stall (including changing `wbm_stall`'s state
// to be `State::Blocked`). Otherwise, this function does nothing.
//
// WARNING: Should only be called by RocksDB internally.
void MaybeBeginWriteStall(StallInterface* wbm_stall);

// WARNING: Should only be called by RocksDB internally.
void RemoveDBFromQueue(StallInterface* wbm_stall);

private:
Contributor Author (@hx235, Mar 17, 2023):

Cleanup 1: make functions that are marked as "should only be called by RocksDB internally" private

// If stall conditions have resolved, remove DB instances from queue and
// signal them to continue.
//
// Called when stall conditions (see `ShouldStall()` API) might have been
// changed
//
// REQUIRED: wbm_mutex_ held
void MaybeEndWriteStall();

void RemoveDBFromQueue(StallInterface* wbm_stall);
// REQUIRED: wbm_mutex_ held
bool IsStallThresholdExceeded() const {
return memory_usage() >= buffer_size_;
}

// WARNING: Should only be called from write thread
// REQUIRED: wbm_mutex_ held
void ReserveMemWithCache(size_t mem);

// REQUIRED: wbm_mutex_ held
void FreeMemWithCache(size_t mem);

// Mutex used to protect WriteBufferManager's data variables.
mutable std::mutex wbm_mutex_;
Contributor Author comment:

Cleanup 6: use one mutex instead of multiple to guard separate groups of WBM data, for simplicity.


private:
std::atomic<size_t> buffer_size_;
std::atomic<size_t> mutable_limit_;
std::atomic<size_t> memory_used_;
// Memory that hasn't been scheduled to free.
std::atomic<size_t> memory_active_;
std::shared_ptr<CacheReservationManager> cache_res_mgr_;
// Protects cache_res_mgr_
std::mutex cache_res_mgr_mu_;

std::list<StallInterface*> queue_;
// Protects the queue_ and stall_active_.
std::mutex mu_;
bool allow_stall_;
// Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
// while holding mu_, but it can be read without a lock.
std::atomic<bool> stall_active_;
Comment on lines -169 to -171
Contributor Author (@hx235, Mar 17, 2023):

Cleanup 2: I don't see how stall_active_ and its related functions are needed. It seems like as long as we have our stall condition check (ShouldStall()) implemented right, we will always have stall_active_ == ShouldStall(). But let me know if I'm overlooking anything :)

Contributor Author follow-up:

@akankshamahajan15 shared a perspective that stall_active_ might exist as a perf optimization to reduce lock contention in the case of multiple DBs using the same WBM. She will cite more of the previous discussion on this soon.

I am not entirely sure about keeping this yet, mainly because such a perf optimization makes the concurrency model of WBM harder to understand, as some data can be accessed without a lock while some can't.

[TODO for me] Understand the previous conversation on having stall_active_ and the need for it; reconsider the perf cost vs. model simplicity


void ReserveMemWithCache(size_t mem);
void FreeMemWithCache(size_t mem);
std::atomic<bool> allow_stall_;
};
} // namespace ROCKSDB_NAMESPACE