Skip to content

Commit a530459

Browse files
committed
MB-69328: Add FBR/Fusion stat to interlock with cache transfer
Provide a new stat group and stat that report whether the cache transfer has completed and the vbucket move can continue, e.g. vb_1:snapshot_rebalance_continue true. The expectation is that ns_server only reads/uses this stat after a vbucket has been created from a snapshot, e.g. when a setvbucket used the "use_snapshot=fbr|fusion" option. In those cases the vbucket is expected to have an empty cache and to require cache population (network cache transfer). The flag remains false until KV concludes the transfer (CacheTransferEnd received). ns_server will wait on this stat in the fusion/fbr vbucket move path before proceeding to takeover or before concluding that a replica move is ready. If an fbr/fusion vbucket changes state we also flip this flag to true. For any vbucket created by warmup or setvbucket, this value defaults to true. Change-Id: I66354b4f44d49b543612989497613328ab8ccdfe Reviewed-on: https://review.couchbase.org/c/kv_engine/+/236359 Reviewed-by: Trond Norbye <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent a98aa3b commit a530459

File tree

17 files changed

+132
-11
lines changed

17 files changed

+132
-11
lines changed

daemon/buckets_test.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ TEST(BucketTest, Reset) {
2424
// check the main ones we run against.
2525
static constexpr size_t expectedBucketSize =
2626
#if defined(__linux) && defined(__x86_64__)
27-
5680;
27+
5696;
2828
#elif defined(__APPLE__)
29-
5776;
29+
5792;
3030
#else
3131
0;
3232
#endif

engines/ep/src/dcp/consumer.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,14 @@ cb::engine_errc DcpConsumer::addStream(uint32_t opaque,
365365
snap_end_seqno,
366366
high_seqno,
367367
vb_manifest_uid);
368+
369+
// This stream will never do a cache transfer, so we can flag that any
370+
// snapshot type (FBR/Fusion) rebalance can continue.
371+
if (!isFlagSet(stream->getFlags(),
372+
cb::mcbp::DcpAddStreamFlag::CacheTransfer)) {
373+
vb->setSnapshotRebalanceCanContinue();
374+
}
375+
368376
registerStream(stream);
369377
readyStreamsVBQueue.lock()->push_back(vbucket);
370378
opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);

engines/ep/src/dcp/passive_stream.cc

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1530,14 +1530,8 @@ cb::engine_errc PassiveStream::processCacheTransferEnd(
15301530
return cb::engine_errc::not_my_vbucket;
15311531
}
15321532

1533-
// Log this for now, will remove and switch to stats later.
1534-
OBJ_LOG_INFO_CTX(*this,
1535-
"PassiveStream::processCacheTransferEnd: Cache transfer "
1536-
"end received",
1537-
{"vb", vb_});
1533+
vb->setSnapshotRebalanceCanContinue();
15381534

1539-
// @todo: update some state (Vbucket?) to indicate that the cache transfer
1540-
// is complete.
15411535
return cb::engine_errc::success;
15421536
}
15431537

engines/ep/src/ep_bucket.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3312,6 +3312,42 @@ cb::engine_errc EPBucket::doSnapshotStatus(const StatCollector& collector,
33123312
return cb::engine_errc::success;
33133313
}
33143314

3315+
/**
 * Emit the "snapshot-move" stat group: one
 * "vb_<id>:snapshot_rebalance_continue" boolean for each vbucket that
 * getVBucket() can return.
 *
 * @param collector destination the stats are added to
 * @param input the complete stat key. Either "snapshot-move" (optionally
 *        with a trailing space) to report every vbucket, or
 *        "snapshot-move <vbid>" to report a single vbucket.
 * @return success when the key was understood (vbuckets which do not exist
 *         are silently skipped), invalid_arguments when the key is not of
 *         one of the forms above or <vbid> cannot be parsed.
 */
cb::engine_errc EPBucket::doSnapshotMoveStats(const StatCollector& collector,
                                              std::string_view input) {
    constexpr std::string_view groupName = "snapshot-move";

    // The stat dispatcher only checks that the key *starts with* the group
    // name, so keys such as "snapshot-moves" or "snapshot-moveX5" reach this
    // function. Validate the whole key rather than blindly stripping a fixed
    // number of characters.
    if (!input.starts_with(groupName)) {
        return cb::engine_errc::invalid_arguments;
    }
    input.remove_prefix(groupName.size());

    // Adds the stat for one vbucket; a vbucket which does not exist produces
    // no output.
    const auto addForVbucket = [this, &collector](Vbid id) {
        auto vb = getVBucket(id);
        if (!vb) {
            return;
        }
        fmt::memory_buffer key;
        fmt::format_to(std::back_inserter(key),
                       "vb_{}:snapshot_rebalance_continue",
                       id.get());
        collector.addStat(std::string_view(key.data(), key.size()),
                          vb->canSnapshotRebalanceContinue());
    };

    if (input.empty() || input == " ") {
        // No vbid given: report for all possible vbuckets.
        const size_t numVbuckets = vbMap.getSize();
        for (size_t i = 0; i < numVbuckets; i++) {
            // Explicit narrowing; the loop index stays size_t so the loop
            // terminates whatever the map size is.
            addForVbucket(Vbid(static_cast<Vbid::id_type>(i)));
        }
        return cb::engine_errc::success;
    }

    // Anything else must be " <vbid>".
    if (input.front() != ' ') {
        return cb::engine_errc::invalid_arguments;
    }
    input.remove_prefix(1);

    uint16_t vbucket_id(0);
    if (!safe_strtous(input, vbucket_id)) {
        return cb::engine_errc::invalid_arguments;
    }
    addForVbucket(Vbid(vbucket_id));

    return cb::engine_errc::success;
}
3350+
33153351
cb::engine_errc EPBucket::doSnapshotDeks(const StatCollector& collector) {
33163352
// Populate for all possible vbuckets.
33173353
for (Vbid::id_type i = 0; i < vbMap.getSize(); i++) {

engines/ep/src/ep_bucket.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,13 @@ class EPBucket : public KVBucket {
490490
cb::engine_errc doSnapshotStatus(const StatCollector&,
491491
std::string_view) override;
492492

493+
/**
494+
* Handle the brief snapshot-move stat which is used by ns_server to
495+
* monitor moves.
496+
*/
497+
cb::engine_errc doSnapshotMoveStats(const StatCollector&,
498+
std::string_view) override;
499+
493500
/**
494501
* Handle the brief snapshot-deks stat which is used by ns_server to
495502
* determine which deks are in use in snapshots. Deks are returned

engines/ep/src/ep_engine.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5340,6 +5340,9 @@ cb::engine_errc EventuallyPersistentEngine::getStats(
53405340
if (key == "snapshot-deks"sv) {
53415341
return getKVBucket()->doSnapshotDeks(bucketCollector);
53425342
}
5343+
if (key.starts_with("snapshot-move")) {
5344+
return getKVBucket()->doSnapshotMoveStats(bucketCollector, key);
5345+
}
53435346

53445347
// Unknown stat requested
53455348
return cb::engine_errc::no_such_key;

engines/ep/src/ep_vb.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,19 @@
4747
#include <statistics/collector.h>
4848
#include <utilities/logtags.h>
4949

50+
/**
 * Determine the value the "snapshot rebalance can continue" flag starts
 * with, based on how the vbucket was created.
 *
 * @param creationMethod how the vbucket is being created
 * @return false for FBR and Fusion (snapshot based) creation, true for
 *         Warmup and SetVbucket
 */
static bool getInitialSnapshotRebalanceCanContinue(
        CreateVbucketMethod creationMethod) {
    // Deliberately no default label so every enumerator is listed here.
    switch (creationMethod) {
    case CreateVbucketMethod::SetVbucket:
    case CreateVbucketMethod::Warmup:
        // Not created from a snapshot: nothing to wait for.
        return true;
    case CreateVbucketMethod::Fusion:
    case CreateVbucketMethod::FBR:
        // Created from a snapshot: start out blocked.
        return false;
    }
    folly::assume_unreachable();
}
62+
5063
EPVBucket::EPVBucket(Vbid i,
5164
vbucket_state_t newState,
5265
EPStats& st,
@@ -103,7 +116,9 @@ EPVBucket::EPVBucket(Vbid i,
103116
maxPrepareSeqno),
104117
shard(kvshard),
105118
rangeScans(static_cast<EPBucket*>(bucket), *this),
106-
canReceiveCacheTransfer(newState == vbucket_state_replica) {
119+
canReceiveCacheTransfer(newState == vbucket_state_replica),
120+
snapshotRebalanceCanContinue(
121+
getInitialSnapshotRebalanceCanContinue(creationMethod)) {
107122
if (config.isBfilterEnabled()) {
108123
bFilterData.lock()->kvStoreBfilterEnabled = true;
109124
}

engines/ep/src/ep_vb.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,14 @@ class EPVBucket : public VBucket {
387387
canReceiveCacheTransfer = false;
388388
}
389389

390+
/// Set the flag which indicates that a snapshot based (FBR/Fusion)
/// rebalance of this vbucket can proceed. Nothing in this class clears the
/// flag again.
void setSnapshotRebalanceCanContinue() override {
    snapshotRebalanceCanContinue.store(true);
}
393+
394+
bool canSnapshotRebalanceContinue() const override {
395+
return snapshotRebalanceCanContinue.load();
396+
}
397+
390398
protected:
391399
/**
392400
* queue a background fetch of the specified item.
@@ -564,6 +572,10 @@ class EPVBucket : public VBucket {
564572
/// Flag to indicate if the vbucket can receive a DCP cache transfer.
565573
std::atomic<bool> canReceiveCacheTransfer{true};
566574

575+
/// Flag to indicate if the vbucket can proceed in a snapshot based
576+
/// rebalance, i.e. FBR/Fusion
577+
std::atomic<bool> snapshotRebalanceCanContinue{false};
578+
567579
friend class EPVBucketTest;
568580
};
569581

engines/ep/src/kv_bucket_iface.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -985,6 +985,11 @@ class KVBucketIface {
985985
return cb::engine_errc::not_supported;
986986
}
987987

988+
/**
 * Generate the "snapshot-move" stat group.
 *
 * This default implementation ignores both arguments and reports that the
 * stat group is not supported; bucket types which support it override this
 * method.
 *
 * @return cb::engine_errc::not_supported
 */
[[nodiscard]] virtual cb::engine_errc doSnapshotMoveStats(
        const StatCollector& /* collector */, std::string_view /* key */) {
    return cb::engine_errc::not_supported;
}
992+
988993
[[nodiscard]] virtual cb::engine_errc doSnapshotDeks(const StatCollector&) {
989994
return cb::engine_errc::not_supported;
990995
}

engines/ep/src/vbucket.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,6 @@ void VBucket::setState_UNLOCKED(
699699
const nlohmann::json* meta,
700700
const std::unique_lock<folly::SharedMutex>& vbStateLock) {
701701
vbucket_state_t oldstate = state;
702-
703702
const bool changingState = to != oldstate;
704703

705704
// Validate (optional) meta content.
@@ -747,6 +746,10 @@ void VBucket::setState_UNLOCKED(
747746
// to replica and do a cache transfer in situations we've not
748747
// considered.
749748
disableCacheTransfer();
749+
750+
// Any state change and this vbucket should no longer block a future
751+
// rebalance
752+
setSnapshotRebalanceCanContinue();
750753
}
751754
}
752755

@@ -3471,6 +3474,10 @@ void VBucket::_addStats(VBucketStatsDetailLevel detail,
34713474
addStat("persistence_seqno", getPersistenceSeqno(), add_stat, c);
34723475
hlc.addStats(statPrefix, add_stat, c);
34733476
addStat("creation_method", to_string(creationMethod), add_stat, c);
3477+
addStat("snapshot_rebalance_continue",
3478+
canSnapshotRebalanceContinue(),
3479+
add_stat,
3480+
c);
34743481
}
34753482
// fallthrough
34763483
case VBucketStatsDetailLevel::Durability:

0 commit comments

Comments
 (0)