Skip to content

Commit 6665d1a

Browse files
mohammadzaeempaolococchi
authored andcommitted
MB-70281: Add fusion max pending upload bytes to memcached config
This patch adds the following two fusion configuration parameters to memcached settings along with a stat to expose each one: * fusion_max_pending_upload_bytes * fusion_max_pending_upload_bytes_lwm_ratio Also, expose fusion_pending_upload_bytes as a memcached global stat, which is retrieved from magma's GetFusionPendingUploadBytes() API. Change-Id: If55c029c6f81b4d16f410cde7983343b24e6e628 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/239483 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
1 parent cb8de71 commit 6665d1a

File tree

9 files changed

+234
-32
lines changed

9 files changed

+234
-32
lines changed

daemon/memcached.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,20 @@ static void settings_init() {
373373
magma::Magma::SetNumThreads(magma::Magma::FusionMigrator,
374374
s.getFusionNumMigratorThreads());
375375
});
376+
settings.addChangeListener(
377+
"fusion_max_pending_upload_bytes",
378+
[&settings](const auto&, auto& s) {
379+
magma::Magma::SetFusionPendingUploadThresholds(
380+
s.getFusionMaxPendingUploadBytes(),
381+
settings.getFusionMaxPendingUploadBytesLwmRatio());
382+
});
383+
settings.addChangeListener(
384+
"fusion_max_pending_upload_bytes_lwm_ratio",
385+
[&settings](const auto&, auto& s) {
386+
magma::Magma::SetFusionPendingUploadThresholds(
387+
settings.getFusionMaxPendingUploadBytes(),
388+
s.getFusionMaxPendingUploadBytesLwmRatio());
389+
});
376390
}
377391

378392
/**

daemon/settings.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,23 @@ void Settings::reconfigure(const nlohmann::json& json) {
420420
{"value", value.dump()},
421421
{"reason", "Fusion support is not enabled"});
422422
}
423+
} else if (key == "fusion_max_pending_upload_bytes") {
424+
if (isFusionSupportEnabled()) {
425+
setFusionMaxPendingUploadBytes(value.get<size_t>());
426+
} else {
427+
LOG_WARNING_CTX("Ignore fusion_max_pending_upload_bytes",
428+
{"value", value.dump()},
429+
{"reason", "Fusion support is not enabled"});
430+
}
431+
} else if (key == "fusion_max_pending_upload_bytes_lwm_ratio") {
432+
if (isFusionSupportEnabled()) {
433+
setFusionMaxPendingUploadBytesLwmRatio(value.get<double>());
434+
} else {
435+
LOG_WARNING_CTX(
436+
"Ignore fusion_max_pending_upload_bytes_lwm_ratio",
437+
{"value", value.dump()},
438+
{"reason", "Fusion support is not enabled"});
439+
}
423440
} else if (key == "phosphor_config"sv) {
424441
auto config = value.get<std::string>();
425442
// throw an exception if the config is invalid
@@ -1117,6 +1134,29 @@ void Settings::updateSettings(const Settings& other, bool apply) {
11171134
}
11181135
}
11191136

1137+
if (other.has.fusion_max_pending_upload_bytes) {
1138+
if (other.getFusionMaxPendingUploadBytes() !=
1139+
getFusionMaxPendingUploadBytes()) {
1140+
LOG_INFO_CTX("Change fusion max pending upload bytes",
1141+
{"from", getFusionMaxPendingUploadBytes()},
1142+
{"to", other.getFusionMaxPendingUploadBytes()});
1143+
setFusionMaxPendingUploadBytes(
1144+
other.getFusionMaxPendingUploadBytes());
1145+
}
1146+
}
1147+
1148+
if (other.has.fusion_max_pending_upload_bytes_lwm_ratio) {
1149+
if (other.getFusionMaxPendingUploadBytesLwmRatio() !=
1150+
getFusionMaxPendingUploadBytesLwmRatio()) {
1151+
LOG_INFO_CTX(
1152+
"Change fusion max pending upload bytes LWM ratio",
1153+
{"from", getFusionMaxPendingUploadBytesLwmRatio()},
1154+
{"to", other.getFusionMaxPendingUploadBytesLwmRatio()});
1155+
setFusionMaxPendingUploadBytesLwmRatio(
1156+
other.getFusionMaxPendingUploadBytesLwmRatio());
1157+
}
1158+
}
1159+
11201160
if (other.has.num_reader_threads &&
11211161
other.getNumReaderThreads() != getNumReaderThreads()) {
11221162
LOG_INFO_CTX("Change number of reader threads",

daemon/settings.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,28 @@ class Settings {
363363
notify_changed("fusion_num_migrator_threads");
364364
}
365365

366+
size_t getFusionMaxPendingUploadBytes() const {
367+
return fusion_max_pending_upload_bytes.load(std::memory_order_acquire);
368+
}
369+
370+
void setFusionMaxPendingUploadBytes(size_t value) {
371+
fusion_max_pending_upload_bytes.store(value, std::memory_order_release);
372+
has.fusion_max_pending_upload_bytes = true;
373+
notify_changed("fusion_max_pending_upload_bytes");
374+
}
375+
376+
double getFusionMaxPendingUploadBytesLwmRatio() const {
377+
return fusion_max_pending_upload_bytes_lwm_ratio.load(
378+
std::memory_order_acquire);
379+
}
380+
381+
void setFusionMaxPendingUploadBytesLwmRatio(double value) {
382+
fusion_max_pending_upload_bytes_lwm_ratio.store(
383+
value, std::memory_order_release);
384+
has.fusion_max_pending_upload_bytes_lwm_ratio = true;
385+
notify_changed("fusion_max_pending_upload_bytes_lwm_ratio");
386+
}
387+
366388
size_t getMaxUserConnections() const {
367389
return getMaxConnections() - getSystemConnections();
368390
}
@@ -1435,6 +1457,14 @@ class Settings {
14351457
// The number of Fusion Migrator threads
14361458
std::atomic<size_t> fusion_num_migrator_threads{4};
14371459

1460+
// The maximum number of pending upload bytes to be synced across all
1461+
// volumes
1462+
std::atomic<size_t> fusion_max_pending_upload_bytes{0};
1463+
1464+
// The proportion of max_pending_upload_bytes beyond which syncs for volumes
1465+
// with the highest pending bytes are only allowed.
1466+
std::atomic<double> fusion_max_pending_upload_bytes_lwm_ratio{0.6};
1467+
14381468
/**
14391469
* Note that it is not safe to add new listeners after we've spun up
14401470
* new threads as we don't try to lock the object.
@@ -1704,6 +1734,8 @@ class Settings {
17041734
bool fusion_sync_rate_limit = false;
17051735
bool fusion_num_uploader_threads = false;
17061736
bool fusion_num_migrator_threads = false;
1737+
bool fusion_max_pending_upload_bytes = false;
1738+
bool fusion_max_pending_upload_bytes_lwm_ratio = false;
17071739
bool num_reader_threads = false;
17081740
bool num_writer_threads = false;
17091741
bool num_auxio_threads = false;

daemon/settings_test.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,32 @@ TEST(SettingsUpdateTest, FusionNumMigratorThreads) {
10141014
EXPECT_EQ(4, settings.getFusionNumMigratorThreads());
10151015
}
10161016

1017+
TEST(SettingsUpdateTest, FusionMaxPendingUploadBytes) {
1018+
Settings updated;
1019+
Settings settings;
1020+
settings.setFusionMaxPendingUploadBytes(10_MiB);
1021+
// Setting to the same value succeeds
1022+
updated.setFusionMaxPendingUploadBytes(10_MiB);
1023+
settings.updateSettings(updated, false);
1024+
// Setting to a different value succeeds
1025+
updated.setFusionMaxPendingUploadBytes(50_MiB);
1026+
settings.updateSettings(updated, true);
1027+
EXPECT_EQ(50_MiB, settings.getFusionMaxPendingUploadBytes());
1028+
}
1029+
1030+
TEST(SettingsUpdateTest, FusionMaxPendingUploadBytesLwmRatio) {
1031+
Settings updated;
1032+
Settings settings;
1033+
settings.setFusionMaxPendingUploadBytesLwmRatio(0.7);
1034+
// Setting to the same value succeeds
1035+
updated.setFusionMaxPendingUploadBytesLwmRatio(0.7);
1036+
settings.updateSettings(updated, false);
1037+
// Setting to a different value succeeds
1038+
updated.setFusionMaxPendingUploadBytesLwmRatio(0.5);
1039+
settings.updateSettings(updated, true);
1040+
EXPECT_EQ(0.5, settings.getFusionMaxPendingUploadBytesLwmRatio());
1041+
}
1042+
10171043
TEST(SettingsUpdateTest, DefaultReqIsDynamic) {
10181044
Settings updated;
10191045
Settings settings;

daemon/stats.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,17 @@ static void server_global_stats(const StatCollector& collector) {
8181
collector.addStat(
8282
Key::fusion_num_migrator_threads,
8383
magma::Magma::GetNumThreads(magma::Magma::FusionMigrator));
84+
const auto [maxPendingUploadBytes, lwmRatio] =
85+
magma::Magma::GetFusionPendingUploadThresholds();
86+
collector.addStat(Key::fusion_max_pending_upload_bytes,
87+
maxPendingUploadBytes);
88+
89+
// The lwm ratio will return 0 if maxPendingUploadBytes is 0,
90+
// else the ratio configured
91+
collector.addStat(Key::fusion_max_pending_upload_bytes_lwm_ratio,
92+
lwmRatio);
93+
collector.addStat(Key::fusion_pending_upload_bytes,
94+
magma::Magma::GetFusionPendingUploadBytes());
8495
}
8596

8697
auto sdks = SdkConnectionManager::instance().getConnectedSdks();

statistics/stat_definitions.json

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2095,6 +2095,24 @@
20952095
"added": "8.0.0"
20962096

20972097
},
2098+
{
2099+
"key": "fusion_max_pending_upload_bytes",
2100+
"unit": "bytes",
2101+
"description": "The maximum number of pending upload bytes to be synced across all volumes.",
2102+
"added": "8.1.0"
2103+
},
2104+
{
2105+
"key": "fusion_max_pending_upload_bytes_lwm_ratio",
2106+
"unit": "none",
2107+
"description": "The proportion of max_pending_upload_bytes beyond which syncs for volumes with highest pending bytes are only allowed.",
2108+
"added": "8.1.0"
2109+
},
2110+
{
2111+
"key": "fusion_pending_upload_bytes",
2112+
"unit": "bytes",
2113+
"description": "The total pending upload bytes across all fusion volumes.",
2114+
"added": "8.1.0"
2115+
},
20982116
/* EPBucket::getFileStats */
20992117
{
21002118
"key": "ep_db_data_size",

tests/testapp/testapp.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,8 @@ nlohmann::json TestappTest::generate_config() {
560560
ret["fusion_sync_rate_limit"] = 75_MiB;
561561
ret["fusion_num_uploader_threads"] = 1;
562562
ret["fusion_num_migrator_threads"] = 1;
563+
ret["fusion_max_pending_upload_bytes"] = 0;
564+
ret["fusion_max_pending_upload_bytes_lwm_ratio"] = 0.6;
563565
}
564566

565567
return ret;

0 commit comments

Comments
 (0)