diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 895017222e1..204afb62eeb 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -609,7 +609,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( SHARDED_ROCKSDB_MAX_OPEN_FILES, 50000 ); // Should be smaller than OS's fd limit.
 	init( SHARDED_ROCKSDB_READ_ASYNC_IO, false ); if (isSimulated) SHARDED_ROCKSDB_READ_ASYNC_IO = deterministicRandom()->coinflip();
 	init( SHARDED_ROCKSDB_PREFIX_LEN, 0 ); if( randomize && BUGGIFY ) SHARDED_ROCKSDB_PREFIX_LEN = deterministicRandom()->randomInt(1, 20);
-
+	init( SHARDED_ROCKSDB_CLEAR_RANGE_COMPACTION_LIMIT, 10 ); if (isSimulated) SHARDED_ROCKSDB_CLEAR_RANGE_COMPACTION_LIMIT = deterministicRandom()->randomInt(0, 100);
 
 	// Leader election
 	bool longLeaderElection = randomize && BUGGIFY;
diff --git a/fdbclient/include/fdbclient/IKeyValueStore.actor.h b/fdbclient/include/fdbclient/IKeyValueStore.actor.h
index c821bae366d..8176224c892 100644
--- a/fdbclient/include/fdbclient/IKeyValueStore.actor.h
+++ b/fdbclient/include/fdbclient/IKeyValueStore.actor.h
@@ -85,6 +85,11 @@ class IKeyValueStore : public IClosable {
 	// Adds key range to a physical shard.
 	virtual Future<Void> addRange(KeyRangeRef range, std::string id, bool active = true) { return Void(); }
 
+	// Creates new physical shards for key ranges.
+	virtual Future<Void> addRanges(std::vector<std::pair<KeyRange, std::string>> ranges, bool active = true) {
+		return Void();
+	}
+
 	// Removes a key range from KVS and returns a list of empty physical shards after the removal.
 	virtual std::vector<std::string> removeRange(KeyRangeRef range) { return std::vector<std::string>(); }
diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index 7bd57d76854..53727e35340 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -573,6 +573,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl
+	int SHARDED_ROCKSDB_CLEAR_RANGE_COMPACTION_LIMIT;
diff --git a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
--- a/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
+++ b/fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
 		       numRangeDeletions > SERVER_KNOBS->ROCKSDB_CF_RANGE_DELETION_LIMIT;
 	}
 
+	bool shouldCompact() {
+		return SERVER_KNOBS->SHARDED_ROCKSDB_SUGGEST_COMPACT_CLEAR_RANGE ||
+		       numRangeDeletions > SERVER_KNOBS->SHARDED_ROCKSDB_CLEAR_RANGE_COMPACTION_LIMIT;
+	}
+
 	std::string toString() {
 		std::string ret = "[ID]: " + this->id + ", [CF]: ";
 		if (initialized()) {
@@ -936,16 +941,18 @@ struct PhysicalShard {
 		isInitialized.store(false);
 		readIterPool.reset();
 
+		/*
 		// Deleting default column family is not allowed.
 		if (deletePending && id != DEFAULT_CF_NAME) {
-			auto s = db->DropColumnFamily(cf);
-			if (!s.ok()) {
-				logRocksDBError(s, "DestroyShard");
-				logShardEvent(id, ShardOp::DESTROY, SevError, s.ToString());
-				return;
-			}
+		    auto s = db->DropColumnFamily(cf);
+		    if (!s.ok()) {
+		        logRocksDBError(s, "DestroyShard");
+		        logShardEvent(id, ShardOp::DESTROY, SevError, s.ToString());
+		        return;
+		    }
 		}
 		auto s = db->DestroyColumnFamilyHandle(cf);
+		*/
 		if (!s.ok()) {
 			logRocksDBError(s, "DestroyCFHandle");
 			logShardEvent(id, ShardOp::DESTROY, SevError, s.ToString());
@@ -1116,7 +1123,7 @@ class ShardManager {
 		return Void();
 	}
 
-	rocksdb::Status init() {
+	rocksdb::Status init(SharedData* sharedData) {
 		const double start = now();
 		// Open instance.
 		TraceEvent(SevInfo, "ShardedRocksDBInitBegin", this->logId).detail("DataPath", path);
@@ -1177,15 +1184,17 @@ class ShardManager {
 			    .detail("PhysicalShardCount", handles.size());
 
 			std::shared_ptr<PhysicalShard> metadataShard = nullptr;
+			std::unique_lock lock(sharedData->mu);
 			for (auto handle : handles) {
 				auto shard = std::make_shared<PhysicalShard>(db, handle->GetName(), handle);
 				if (shard->id == METADATA_SHARD_ID) {
 					metadataShard = shard;
 				}
 				physicalShards[shard->id] = shard;
-				columnFamilyMap[handle->GetID()] = handle;
+				sharedData->columnFamilyMap[handle->GetID()] = handle;
 				TraceEvent(SevVerbose, "ShardedRocksInitPhysicalShard", this->logId).detail("ShardId", shard->id);
 			}
+			lock.unlock();
 
 			std::set<std::string> unusedShards(columnFamilies.begin(), columnFamilies.end());
 			unusedShards.erase(METADATA_SHARD_ID);
@@ -1245,18 +1254,18 @@ class ShardManager {
 					}
 				}
 
-				for (const auto& name : unusedShards) {
-					auto it = physicalShards.find(name);
-					ASSERT(it != physicalShards.end());
-					auto shard = it->second;
-					if (shard->dataShards.size() == 0) {
-						shard->deleteTimeSec = now();
-						pendingDeletionShards.push_back(name);
-						TraceEvent(SevInfo, "UnusedPhysicalShard", logId).detail("ShardId", name);
-					}
-				}
 				if (unusedShards.size() > 0) {
 					TraceEvent("ShardedRocksDB", logId).detail("CleanUpUnusedShards", unusedShards.size());
+					for (const auto& name : unusedShards) {
+						auto it = physicalShards.find(name);
+						ASSERT(it != physicalShards.end());
+						auto shard = it->second;
+						if (shard->dataShards.size() == 0) {
+							shard->deleteTimeSec = now();
+							pendingDeletionShards.push_back(name);
+							TraceEvent(SevInfo, "UnusedPhysicalShard", logId).detail("ShardId", name);
+						}
+					}
 				}
 			} else {
 				// DB is opened with default shard.
@@ -1373,6 +1382,56 @@ class ShardManager {
 		return result;
 	}
 
+	PhysicalShard* addRangeInternal(KeyRange range, std::string id, bool active) {
+		auto ranges = dataShardMap.intersectingRanges(range);
+
+		for (auto it = ranges.begin(); it != ranges.end(); ++it) {
+			if (it.value()) {
+				if (it.value()->physicalShard->id == id) {
+					TraceEvent(SevError, "ShardedRocksDBAddRange")
+					    .detail("ErrorType", "RangeAlreadyExist")
+					    .detail("IntersectingRange", it->range())
+					    .detail("DataShardRange", it->value()->range)
+					    .detail("ExpectedShardId", id)
+					    .detail("PhysicalShardID", it->value()->physicalShard->toString());
+				} else {
+					TraceEvent(SevError, "ShardedRocksDBAddRange")
+					    .detail("ErrorType", "ConflictingRange")
+					    .detail("IntersectingRange", it->range())
+					    .detail("DataShardRange", it->value()->range)
+					    .detail("ExpectedShardId", id)
+					    .detail("PhysicalShardID", it->value()->physicalShard->toString());
+				}
+				return nullptr;
+			}
+		}
+
+		auto cfOptions = active ? getCFOptions() : getCFOptionsForInactiveShard();
+		auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, cfOptions));
+		std::shared_ptr<PhysicalShard>& shard = it->second;
+
+		activePhysicalShardIds.emplace(id);
+
+		auto dataShard = std::make_unique<DataShard>(range, shard.get());
+		dataShardMap.insert(range, dataShard.get());
+		shard->dataShards[range.begin.toString()] = std::move(dataShard);
+		return shard.get();
+	}
+
+	std::unordered_map<std::string, PhysicalShard*> addRanges(std::vector<std::pair<KeyRange, std::string>> ranges,
+	                                                          bool active) {
+		TraceEvent(SevVerbose, "ShardedRocksDBAddRanges").detail("Size", ranges.size());
+		std::unordered_map<std::string, PhysicalShard*> shards;
+		for (auto& [range, id] : ranges) {
+			auto shard = addRangeInternal(range, id, active);
+			if (shard != nullptr) {
+				shards[id] = shard;
+			}
+		}
+		validate();
+		return shards;
+	}
+
 	PhysicalShard* addRange(KeyRange range, std::string id, bool active) {
 		TraceEvent(SevVerbose, "ShardedRocksAddRangeBegin", this->logId).detail("Range", range).detail("ShardId", id);
@@ -1854,8 +1913,6 @@ class ShardManager {
 	rocksdb::DB* db = nullptr;
 	std::unordered_map<std::string, std::shared_ptr<PhysicalShard>> physicalShards;
 	std::unordered_set<std::string> activePhysicalShardIds;
-	// Stores mapping between cf id and cf handle, used during compaction.
-	std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*> columnFamilyMap;
 	std::unique_ptr<rocksdb::WriteBatch> writeBatch;
 	std::unique_ptr<std::set<PhysicalShard*>> dirtyShards;
 	KeyRangeMap<DataShard*> dataShardMap;
@@ -2524,14 +2581,22 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 		return Void();
 	}
 
-	struct CompactionWorker : IThreadPoolReceiver {
+	struct SharedData {
+		// Stores mapping between cf id and cf handle, used during compaction.
+		std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*> columnFamilyMap;
+		std::mutex mu;
+	};
+
+	struct LowPriorityWorker : IThreadPoolReceiver {
 		const UID logId;
-		explicit CompactionWorker(UID logId) : logId(logId) {}
+		SharedData* sharedData;
+
+		explicit LowPriorityWorker(UID logId, SharedData* sharedData) : logId(logId), sharedData(sharedData) {}
 		void init() override {}
-		~CompactionWorker() override {}
+		~LowPriorityWorker() override {}
 
-		struct CompactShardsAction : TypedAction<CompactionWorker, CompactShardsAction> {
+		struct CompactShardsAction : TypedAction<LowPriorityWorker, CompactShardsAction> {
 			std::vector<std::shared_ptr<PhysicalShard>> shards;
 			std::shared_ptr<PhysicalShard> metadataShard;
 			ThreadReturnPromise<Void> done;
@@ -2594,15 +2659,16 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	struct Writer : IThreadPoolReceiver {
 		const UID logId;
 		int threadIndex;
-		std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap;
+		SharedData* sharedData;
 		std::shared_ptr<RocksDBMetrics> rocksDBMetrics;
 		double sampleStartTime;
 
 		explicit Writer(UID logId,
 		                int threadIndex,
-		                std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap,
+		                SharedData* sharedData,
 		                std::shared_ptr<RocksDBMetrics> rocksDBMetrics)
-		  : logId(logId), threadIndex(threadIndex), columnFamilyMap(columnFamilyMap), rocksDBMetrics(rocksDBMetrics),
+		  : logId(logId), threadIndex(threadIndex), sharedData(sharedData), rocksDBMetrics(rocksDBMetrics),
 		    sampleStartTime(now()) {}
 
 		~Writer() override {}
@@ -2648,22 +2714,38 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 		}
 
 		struct AddShardAction : TypedAction<Writer, AddShardAction> {
-			PhysicalShard* shard;
+			std::unordered_map<std::string, PhysicalShard*> shards;
+			bool active;
 			ThreadReturnPromise<Void> done;
 
-			AddShardAction(PhysicalShard* shard) : shard(shard) { ASSERT(shard); }
+			AddShardAction(std::unordered_map<std::string, PhysicalShard*>& shards, bool active)
+			  : shards(shards), active(active) {}
 			double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
 		};
 
 		void action(AddShardAction& a) {
-			auto s = a.shard->init();
-			if (!s.ok()) {
-				TraceEvent(SevError, "AddShardError").detail("Status", s.ToString()).detail("ShardId", a.shard->id);
-				a.done.sendError(statusToError(s));
+			std::vector<rocksdb::ColumnFamilyHandle*> handles;
+			// Bulk creates column families to avoid extra overhead.
+			auto cfOptions = a.active ? getCFOptions() : getCFOptionsForInactiveShard();
+			std::vector<std::string> ids;
+			for (auto& [id, _] : a.shards) {
+				ids.push_back(id);
+			}
+			db->CreateColumnFamilies(cfOptions, ids, &handles);
+			if (handles.size() != ids.size()) {
+				// Reboot SS.
+				a.done.send(internal_error());
 				return;
 			}
-			ASSERT(a.shard->cf);
-			(*columnFamilyMap)[a.shard->cf->GetID()] = a.shard->cf;
+
+			{
+				std::unique_lock lock(sharedData->mu);
+				for (auto* handle : handles) {
+					ASSERT(a.shards.contains(handle->GetName()));
+					a.shards[handle->GetName()]->cf = handle;
+					sharedData->columnFamilyMap[handle->GetID()] = handle;
+				}
+			}
 			a.done.send(Void());
 		}
@@ -2679,13 +2761,34 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 		void action(RemoveShardAction& a) {
 			auto start = now();
-			for (auto& shard : a.shards) {
-				shard->deletePending = true;
-				columnFamilyMap->erase(shard->cf->GetID());
-				a.metadataShard->db->Delete(
-				    rocksdb::WriteOptions(), a.metadataShard->cf, compactionTimestampPrefix.toString() + shard->id);
+			std::vector<rocksdb::ColumnFamilyHandle*> cfHandles;
+			{
+				std::unique_lock lock(sharedData->mu);
+				for (auto& shard : a.shards) {
+					cfHandles.push_back(shard->cf);
+					// shard->deletePending = true;
+					sharedData->columnFamilyMap.erase(shard->cf->GetID());
+					a.metadataShard->db->Delete(
+					    rocksdb::WriteOptions(), a.metadataShard->cf, compactionTimestampPrefix.toString() + shard->id);
+				}
+			}
+
+			auto s = a.metadataShard->db->DropColumnFamilies(cfHandles);
+			if (!s.ok()) {
+				a.done.send(internal_error());
+				return;
 			}
 			TraceEvent("RemoveShardTime").detail("Duration", now() - start).detail("Size", a.shards.size());
+
+			for (auto& shard : a.shards) {
+				auto s = db->DestroyColumnFamilyHandle(shard->cf);
+				if (!s.ok()) {
+					logRocksDBError(s, "DestroyCFHandle");
+					logShardEvent(shard->id, ShardOp::DESTROY, SevError, s.ToString());
+					return;
+				}
+			}
+
 			a.shards.clear();
 			a.done.send(Void());
 		}
@@ -2694,7 +2797,6 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 			rocksdb::DB* db;
 			std::unique_ptr<rocksdb::WriteBatch> writeBatch;
 			std::unique_ptr<std::set<PhysicalShard*>> dirtyShards;
-			const std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap;
 			ThreadReturnPromise<Void> done;
 			double startTime;
 			bool getHistograms;
@@ -2702,10 +2804,8 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 			double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
 
 			CommitAction(rocksdb::DB* db,
 			             std::unique_ptr<rocksdb::WriteBatch> writeBatch,
-			             std::unique_ptr<std::set<PhysicalShard*>> dirtyShards,
-			             std::unordered_map<uint32_t, rocksdb::ColumnFamilyHandle*>* columnFamilyMap)
-			  : db(db), writeBatch(std::move(writeBatch)), dirtyShards(std::move(dirtyShards)),
-			    columnFamilyMap(columnFamilyMap) {
+			             std::unique_ptr<std::set<PhysicalShard*>> dirtyShards)
+			  : db(db), writeBatch(std::move(writeBatch)), dirtyShards(std::move(dirtyShards)) {
 				if (deterministicRandom()->random01() < SERVER_KNOBS->ROCKSDB_HISTOGRAMS_SAMPLE_RATE) {
 					getHistograms = true;
 					startTime = timer_monotonic();
@@ -2802,9 +2902,10 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 			}
 
 			if (SERVER_KNOBS->SHARDED_ROCKSDB_SUGGEST_COMPACT_CLEAR_RANGE) {
+				std::unique_lock lock(sharedData->mu);
 				for (const auto& [id, range] : deletes) {
-					auto cf = columnFamilyMap->find(id);
-					ASSERT(cf != columnFamilyMap->end());
+					auto cf = sharedData->columnFamilyMap.find(id);
+					ASSERT(cf != sharedData->columnFamilyMap.end());
 
 					auto begin = toSlice(range.begin);
 					auto end = toSlice(range.end);
@@ -3591,15 +3692,15 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 		if (g_network->isSimulated()) {
 			TraceEvent(SevDebug, "ShardedRocksDB").detail("Info", "Use Coro threads in simulation.");
 			writeThread = CoroThreadPool::createThreadPool();
-			compactionThread = CoroThreadPool::createThreadPool();
+			lowPriorityThread = CoroThreadPool::createThreadPool();
 			readThreads = CoroThreadPool::createThreadPool();
 		} else {
 			writeThread = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_WRITER_THREAD_PRIORITY);
-			compactionThread = createGenericThreadPool(0, SERVER_KNOBS->ROCKSDB_COMPACTION_THREAD_PRIORITY);
+			lowPriorityThread = createGenericThreadPool(0, SERVER_KNOBS->ROCKSDB_COMPACTION_THREAD_PRIORITY);
 			readThreads = createGenericThreadPool(/*stackSize=*/0, SERVER_KNOBS->ROCKSDB_READER_THREAD_PRIORITY);
 		}
-		writeThread->addThread(new Writer(id, 0, shardManager.getColumnFamilyMap(), rocksDBMetrics), "fdb-rocksdb-wr");
-		compactionThread->addThread(new CompactionWorker(id), "fdb-rocksdb-cw");
+		writeThread->addThread(new Writer(id, 0, &sharedData, rocksDBMetrics), "fdb-rocksdb-wr");
+		lowPriorityThread->addThread(new LowPriorityWorker(id, &sharedData), "fdb-rocksdb-cw");
 		TraceEvent("ShardedRocksDBReadThreads", id)
 		    .detail("KnobRocksDBReadParallelism", SERVER_KNOBS->ROCKSDB_READ_PARALLELISM);
 		for (unsigned i = 0; i < SERVER_KNOBS->ROCKSDB_READ_PARALLELISM; ++i) {
@@ -3636,7 +3737,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 		try {
 			wait(self->writeThread->stop());
-			wait(self->compactionThread->stop());
+			wait(self->lowPriorityThread->stop());
 		} catch (Error& e) {
 			TraceEvent(SevError, "ShardedRocksCloseWriteThreadError").errorUnsuppressed(e);
 		}
@@ -3674,7 +3775,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 			this->metrics = ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) &&
 			                rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager, this->path);
-			this->compactionJob = compactShards(this->rState, openFuture, &shardManager, compactionThread);
+			this->lowPriorityJob = compactShards(this->rState, openFuture, &shardManager, lowPriorityThread);
 			this->refreshHolder = refreshReadIteratorPools(this->rState, openFuture, shardManager.getAllShards());
 			this->refreshRocksDBBackgroundWorkHolder =
 			    refreshRocksDBBackgroundEventCounter(this->id, this->eventListener);
@@ -3686,14 +3787,25 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	}
 
 	Future<Void> addRange(KeyRangeRef range, std::string id, bool active) override {
+		/*
 		auto shard = shardManager.addRange(range, id, active);
 		if (shard->initialized()) {
-			return Void();
+		    return Void();
 		}
 		auto a = new Writer::AddShardAction(shard);
 		Future<Void> res = a->done.getFuture();
 		writeThread->post(a);
 		return res;
+		*/
+		return Void();
+	}
+
+	Future<Void> addRanges(std::vector<std::pair<KeyRange, std::string>> ranges, bool active) override {
+		auto shards = shardManager.addRanges(ranges, active);
+		auto a = new Writer::AddShardAction(shards, active);
+		Future<Void> res = a->done.getFuture();
+		writeThread->post(a);
+		return res;
 	}
 
 	void markRangeAsActive(KeyRangeRef range) override { shardManager.markRangeAsActive(range); }
@@ -3938,7 +4050,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 			}
 
 			if (shards.size() > 0) {
-				auto a = new CompactionWorker::CompactShardsAction(shards, shardManager->getMetaDataShard());
+				auto a = new LowPriorityWorker::CompactShardsAction(shards, shardManager->getMetaDataShard());
 				auto res = a->done.getFuture();
 				thread->post(a);
 				wait(res);
@@ -4035,8 +4147,9 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	std::string path;
 	UID id;
 	std::set<Key> keysSet;
+	SharedData sharedData;
 	Reference<IThreadPool> writeThread;
-	Reference<IThreadPool> compactionThread;
+	Reference<IThreadPool> lowPriorityThread;
 	Reference<IThreadPool> readThreads;
 	Future<Void> errorFuture;
 	Promise<Void> closePromise;
@@ -4047,7 +4160,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
 	FlowLock fetchSemaphore;
 	int numFetchWaiters;
 	Counters counters;
-	Future<Void> compactionJob;
+	Future<Void> lowPriorityJob;
 	Future<Void> refreshHolder;
 	Future<Void> refreshRocksDBBackgroundWorkHolder;
 	Future<Void> cleanUpJob;
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 1b6aba9fdee..edb9ab7cde2 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -578,6 +578,10 @@ struct StorageServerDisk {
 	void writeKeyValue(KeyValueRef kv);
 	void clearRange(KeyRangeRef keys);
 
+	Future<Void> addRanges(std::vector<std::pair<KeyRange, std::string>> ranges) {
+		return storage->addRanges(ranges, !SERVER_KNOBS->SHARDED_ROCKSDB_DELAY_COMPACTION_FOR_DATA_MOVE);
+	}
+
 	Future<Void> addRange(KeyRangeRef range, std::string id) {
 		return storage->addRange(range, id, !SERVER_KNOBS->SHARDED_ROCKSDB_DELAY_COMPACTION_FOR_DATA_MOVE);
 	}
@@ -1017,24 +1021,11 @@ struct StorageServer : public IStorageMetricsService {
 	WatchMap_t watchMap; // keep track of server watches
 
 public:
-	struct PendingNewShard {
-		PendingNewShard(uint64_t shardId, KeyRangeRef range) : shardId(format("%016llx", shardId)), range(range) {}
-
-		std::string toString() const {
-			return fmt::format("PendingNewShard: [ShardID]: {} [Range]: {}",
-			                   this->shardId,
-			                   Traceable<KeyRangeRef>::toString(this->range));
-		}
-
-		std::string shardId;
-		KeyRange range;
-	};
-
 	std::map<Version, std::vector<CheckpointMetaData>> pendingCheckpoints; // Pending checkpoint requests
 	std::unordered_map<UID, CheckpointMetaData> checkpoints; // Existing and deleting checkpoints
 	std::unordered_map<UID, ICheckpointReader*> liveCheckpointReaders; // Active checkpoint readers
 	VersionedMap<int64_t, TenantName> tenantMap;
-	std::map<Version, std::vector<PendingNewShard>>
+	std::map<Version, std::vector<std::pair<KeyRange, std::string>>>
 	    pendingAddRanges; // Pending requests to add ranges to physical shards
 	std::map<Version, std::vector<KeyRange>>
 	    pendingRemoveRanges; // Pending requests to remove ranges from physical shards
@@ -10285,7 +10276,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
 				    .detail("Version", cVer);
 				newEmptyRanges.push_back(range);
 				updatedShards.emplace_back(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite);
-				data->pendingAddRanges[cVer].emplace_back(desiredId, range);
+				data->pendingAddRanges[cVer].emplace_back(range, desiredId);
 			} else if (!nowAssigned) {
 				if (dataAvailable) {
 					ASSERT(data->newestAvailableVersion[range.begin] ==
@@ -10319,7 +10310,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
 				setAvailableStatus(data, range, true);
 				// Note: The initial range is available, however, the shard won't be created in the storage engine
 				// until version is committed.
-				data->pendingAddRanges[cVer].emplace_back(desiredId, range);
+				data->pendingAddRanges[cVer].emplace_back(range, desiredId);
 				TraceEvent(sevDm, "SSInitialShard", data->thisServerID)
 				    .detail("Range", range)
 				    .detail("NowAssigned", nowAssigned)
@@ -10337,7 +10328,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
 			} else {
 				updatedShards.push_back(
 				    StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding));
-				data->pendingAddRanges[cVer].emplace_back(desiredId, range);
+				data->pendingAddRanges[cVer].emplace_back(range, desiredId);
 			}
 			data->newestDirtyVersion.insert(range, cVer);
 			TraceEvent(sevDm, "SSAssignShard", data->thisServerID)
@@ -10370,7 +10361,7 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
 			if (context == CSK_FALL_BACK) {
 				updatedShards.push_back(
 				    StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding));
-				data->pendingAddRanges[cVer].emplace_back(desiredId, range);
+				data->pendingAddRanges[cVer].emplace_back(range, desiredId);
 				data->newestDirtyVersion.insert(range, cVer);
 				// TODO: removeDataRange if the moveInShard has written to the kvs.
 			}
@@ -11966,6 +11957,7 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 	state bool addedRanges = false;
 	if (!data->pendingAddRanges.empty()) {
 		const Version aVer = data->pendingAddRanges.begin()->first;
+		const auto& ranges = data->pendingAddRanges.begin()->second;
 		if (aVer <= desiredVersion) {
 			TraceEvent(SevDebug, "AddRangeVersionSatisfied", data->thisServerID)
 			    .detail("DesiredVersion", desiredVersion)
@@ -11976,15 +11968,10 @@ ACTOR Future<Void> updateStorage(StorageServer* data) {
 			TraceEvent(SevVerbose, "SSAddKVSRangeBegin", data->thisServerID)
 			    .detail("Version", data->pendingAddRanges.begin()->first)
 			    .detail("DurableVersion", data->durableVersion.get())
-			    .detail("NewRanges", describe(data->pendingAddRanges.begin()->second));
+			    .detail("NewRanges", data->pendingAddRanges.begin()->second.size());
 			state std::vector<Future<Void>> fAddRanges;
-			for (const auto& shard : data->pendingAddRanges.begin()->second) {
-				TraceEvent(SevInfo, "SSAddKVSRange", data->thisServerID)
-				    .detail("Range", shard.range)
-				    .detail("PhysicalShardID", shard.shardId);
-				fAddRanges.push_back(data->storage.addRange(shard.range, shard.shardId));
-			}
-			wait(waitForAll(fAddRanges));
+			Future<Void> complete = data->storage.addRanges(ranges);
+			wait(complete);
 			TraceEvent(SevVerbose, "SSAddKVSRangeEnd", data->thisServerID)
 			    .detail("Version", data->pendingAddRanges.begin()->first)
 			    .detail("DurableVersion", data->durableVersion.get());