Skip to content

Commit 39db4d1

Browse files
committed
MB-69674: Yield/Cancel when over the backfill threshold
The CacheTransfer task can raise memory usage, so it now reacts when memory usage is over the backfill threshold. all_keys:true: yield (don't even visit) if we're over the backfill threshold. This means a value-eviction rebalance is delayed, but similarly any DCP rebalance which is backfilling under high memory would be delayed until the high-memory situation resolves. all_keys:false: cancel the transfer, allowing a rebalance to complete rather than waiting for the high-memory situation to resolve. This is in my view preferable: a rebalance may be occurring to resolve some problem, and waiting for memory to reduce just so we can populate the remote cache to the maximum is of lower priority. Change-Id: I838d93bd53e74a2f8d832f925c0900b5138214d5 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/238356 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
1 parent e9d3995 commit 39db4d1

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

engines/ep/src/dcp/cache_transfer_stream.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,27 @@ bool CacheTransferTask::run() {
252252
return false;
253253
}
254254

255+
// Yield an all_keys transfer when memory usage is over the backfill
256+
// threshold, or cancel when this is not an all_keys transfer.
257+
if (engine->getKVBucket()->isMemUsageAboveBackfillThreshold()) {
258+
if (isAllKeys) {
259+
snooze(maybeLogHighMemoryPressure(
260+
visitor.getStream(),
261+
"task yielded due to backfill threshold"));
262+
// run again
263+
return true;
264+
}
265+
auto& stream = visitor.getStream();
266+
OBJ_LOG_WARN_CTX(stream,
267+
"CacheTransferTask::run: cancelling transfer due "
268+
"to backfill "
269+
"threshold",
270+
{"vb", vbid});
271+
stream.cancelTransfer();
272+
// stop running the task
273+
return false;
274+
}
275+
255276
// Ensure we always teardown the hash table visit (drops stream reference)
256277
auto guard = folly::makeGuard([this] { visitor.tearDownHashTableVisit(); });
257278

engines/ep/tests/module_tests/dcp_cache_transfer_test.cc

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ class DcpCacheTransferTest : public STParameterizedBucketTest {
8888
"for vb:0");
8989
}
9090

91+
/**
92+
* Helper method to test backfill threshold behavior.
93+
* @param allKeys Whether to use all_keys mode
94+
*/
95+
void testBackfillThresholdBehavior(bool allKeys);
96+
9197
std::unordered_set<Item> expectedItems;
9298
};
9399

@@ -656,6 +662,89 @@ TEST_P(DcpCacheTransferTest, key_only_transfer) {
656662
EXPECT_EQ(cb::mcbp::DcpStreamEndStatus::Ok, producers.last_end_status);
657663
}
658664

665+
void DcpCacheTransferTest::testBackfillThresholdBehavior(bool allKeys) {
666+
// This test relies on precise memory tracking
667+
const auto& stats = engine->getEpStats();
668+
if (!stats.isMemoryTrackingEnabled()) {
669+
GTEST_SKIP();
670+
}
671+
672+
// Need static clock for this test so we can run the yielded task
673+
cb::time::steady_clock::use_chrono = false;
674+
auto scopeGuard = folly::makeGuard(
675+
[]() { cb::time::steady_clock::use_chrono = true; });
676+
677+
// Store items for testing
678+
expectedItems.insert(store_item(Vbid(0), makeStoredDocKey("k2"), "2"));
679+
680+
// Remember original max data size
681+
const auto originalMaxDataSize = stats.getMaxDataSize();
682+
683+
// Set bucket quota using current memory usage such that we exceed the
684+
// backfill threshold
685+
double backfillThreshold =
686+
engine->getConfiguration().getBackfillMemThreshold() / 100.0;
687+
engine->setMaxDataSize(stats.getPreciseTotalMemoryUsed() /
688+
backfillThreshold);
689+
690+
// Verify we're actually above the threshold
691+
ASSERT_TRUE(store->isMemUsageAboveBackfillThreshold());
692+
693+
// Create stream with or without all_keys based on parameter
694+
auto stream = createStream(
695+
*producer,
696+
1,
697+
Vbid(0),
698+
store->getVBucket(vbid)->getHighSeqno(),
699+
store->getVBucket(vbid)->getHighSeqno(),
700+
nlohmann::json{{"cts", {{"all_keys", allKeys}}}}.dump());
701+
702+
// Run the cache transfer task
703+
runCacheTransferTask();
704+
705+
if (allKeys) {
706+
// Yield case: stream should remain active, no items queued yet
707+
EXPECT_TRUE(stream->isActive());
708+
EXPECT_EQ(0, stream->getItemsRemaining())
709+
<< "Task should yield without queueing items or stream-end";
710+
711+
// Restore memory to normal
712+
engine->setMaxDataSize(originalMaxDataSize);
713+
ASSERT_FALSE(store->isMemUsageAboveBackfillThreshold());
714+
715+
// Advance time so the yielded task can run again
716+
cb::time::steady_clock::advance(std::chrono::seconds(1));
717+
718+
// Now the transfer should complete successfully
719+
runCacheTransferTask();
720+
ASSERT_EQ(3, stream->getItemsRemaining());
721+
EXPECT_TRUE(stream->validateNextResponse(expectedItems));
722+
EXPECT_TRUE(stream->validateNextResponse(expectedItems));
723+
EXPECT_TRUE(stream->validateNextResponseIsEnd());
724+
} else {
725+
// Cancel case: stream should be cancelled with stream-end queued
726+
EXPECT_FALSE(stream->isActive());
727+
ASSERT_EQ(1, stream->getItemsRemaining())
728+
<< "Only stream-end should be queued, no items transferred";
729+
EXPECT_TRUE(stream->validateNextResponseIsEnd());
730+
731+
// Restore original memory limit
732+
engine->setMaxDataSize(originalMaxDataSize);
733+
}
734+
}
735+
736+
// Test that CacheTransferTask yields when all_keys=true and memory usage is
737+
// above backfill threshold
738+
TEST_P(DcpCacheTransferTest, backfill_threshold_all_keys_yields) {
739+
testBackfillThresholdBehavior(true);
740+
}
741+
742+
// Test that CacheTransferTask cancels when all_keys=false and memory usage is
743+
// above backfill threshold
744+
TEST_P(DcpCacheTransferTest, backfill_threshold_cancels) {
745+
testBackfillThresholdBehavior(false);
746+
}
747+
659748
// We only test persistent buckets. Ephemeral doesn't apply nor will it work as
660749
// we cannot transfer the linked list or system events...
661750
INSTANTIATE_TEST_SUITE_P(

0 commit comments

Comments
 (0)