diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index f44a3058875..97dffdf6f99 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -1608,6 +1608,28 @@ DatabaseContext::DatabaseContext(Reference( singleKeyRange("fault_tolerance_metrics_json"_sr) .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::METRICS).begin))); + registerSpecialKeysImpl( + SpecialKeySpace::MODULE::BULKLOADING, + SpecialKeySpace::IMPLTYPE::READONLY, + std::make_unique( + KeyRangeRef("status/"_sr, "status0"_sr) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin))); + registerSpecialKeysImpl( + SpecialKeySpace::MODULE::BULKLOADING, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + KeyRangeRef("task/"_sr, "task0"_sr) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin))); + registerSpecialKeysImpl( + SpecialKeySpace::MODULE::BULKLOADING, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique( + KeyRangeRef("cancel/"_sr, "cancel0"_sr) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin))); + registerSpecialKeysImpl(SpecialKeySpace::MODULE::BULKLOADING, + SpecialKeySpace::IMPLTYPE::READWRITE, + std::make_unique(singleKeyRange("mode"_sr).withPrefix( + SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin))); } if (apiVersion.version() >= 700) { diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index 513710c2539..de39be13dda 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -29,12 +29,16 @@ #include #include "fdbclient/ActorLineageProfiler.h" +#include "fdbclient/AuditUtils.actor.h" +#include "fdbclient/BulkLoading.h" #include "fdbclient/ClusterConnectionMemoryRecord.h" #include "fdbclient/FDBOptions.g.h" +#include "fdbclient/KeyRangeMap.h" #include "fdbclient/Knobs.h" #include "fdbclient/ProcessInterface.h" #include "fdbclient/GlobalConfig.actor.h" #include "fdbclient/SpecialKeySpace.actor.h" +#include "fdbclient/SystemData.h" #include "flow/Arena.h" #include "flow/UnitTest.h" #include "fdbclient/ManagementAPI.actor.h" @@ -74,6 +78,7 @@ std::unordered_map SpecialKeySpace::moduleToB { SpecialKeySpace::MODULE::ACTOR_PROFILER_CONF, KeyRangeRef("\xff\xff/actor_profiler_conf/"_sr, "\xff\xff/actor_profiler_conf0"_sr) }, { SpecialKeySpace::MODULE::CLUSTERID, singleKeyRange("\xff\xff/cluster_id"_sr) }, + { SpecialKeySpace::MODULE::BULKLOADING, KeyRangeRef("\xff\xff/bulk_loading/"_sr, "\xff\xff/bulk_loading0"_sr) }, }; std::unordered_map SpecialKeySpace::managementApiCommandToRange = { @@ -2717,6 +2722,346 @@ Future> DataDistributionImpl::commit(ReadYourWritesTransac return msg; } +bool existingBulkLoadUpdate(ReadYourWritesTransaction* ryw, KeyRange range) { + KeyRange rangeToCheck = + range.withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin); + auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(rangeToCheck); + for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) { + if (iter->value().first && iter->value().second.present()) { + return true; + } + } + return false; +} + +BulkLoadStatusImpl::BulkLoadStatusImpl(KeyRangeRef kr) : SpecialKeyRangeReadImpl(kr) {} + +ACTOR static Future BulkLoadStatusGetRangeActor(ReadYourWritesTransaction* ryw, + KeyRef prefix, + KeyRangeRef kr) { + state Key taskPrefixKey = + SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("status/"_sr); + state KeyRange range = kr.removePrefix(taskPrefixKey); + if (!normalKeys.contains(range)) { + TraceEvent(SevWarn, "BulkLoadCheckTaskStatusError") + .detail("Reason", "Input range to check is out of normal range") + .detail("InputRange", range); + throw bulkload_check_status_input_error(); + } + // Check if there are existing updates in the current transaction + if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("task/"_sr, "task0"_sr))) || + existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr)))) { + TraceEvent(SevWarn, "BulkLoadCheckTaskStatusError") + .detail("Reason", "Exist bulk loading update in the same transaction") + .detail("InputRange", range); + throw bulkload_check_status_input_error(); + } + RangeResult result = wait(krmGetRanges(&(ryw->getTransaction()), bulkLoadPrefix, range)); + RangeResult res; + res.more = result.more; + res.readThrough = result.readThrough; + res.readToBegin = result.readToBegin; + res.readThroughEnd = result.readThroughEnd; + for (int i = 0; i < result.size(); ++i) { + Key keyToCopy = result[i].key.withPrefix(taskPrefixKey); + res.push_back_deep(res.arena(), KeyValueRef(keyToCopy, result[i].value)); + } + return rywGetRange(ryw, kr, res); +} + +Future BulkLoadStatusImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const { + return BulkLoadStatusGetRangeActor(ryw, getKeyRange().begin, kr); +} + +BulkLoadTaskImpl::BulkLoadTaskImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +Future BulkLoadTaskImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const { + throw not_implemented(); +} + +void BulkLoadTaskImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) { + throw not_implemented(); +} + +void BulkLoadTaskImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { + throw not_implemented(); +} + +void BulkLoadTaskImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) { + Key taskPrefixKey = + SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("task/"_sr); + BulkLoadState bulkLoadState = decodeBulkLoadState(value); + if (key != taskPrefixKey) { + TraceEvent(SevWarn, "BulkLoadSetTaskError") + .detail("Reason", "Input key error") + .detail("CorrectKey", taskPrefixKey) + .detail("InputKey", key) + .detail("InputState", bulkLoadState.toString()); + throw bulkload_add_task_input_error(); + } + ASSERT(bulkLoadState.isValid()); + KeyRangeRef bulkLoadRange = bulkLoadState.range; + ASSERT(!bulkLoadRange.empty()); + if (bulkLoadRange.begin >= normalKeys.end || bulkLoadRange.end >= normalKeys.end) { + TraceEvent(SevWarn, "BulkLoadSetTaskError") + .detail("Reason", "Input range is out of normal key space") + .detail("InputState", bulkLoadState.toString()); + throw bulkload_add_task_input_error(); + } + auto ranges = ryw->getSpecialKeySpaceWriteMap().intersectingRanges(bulkLoadRange.withPrefix(taskPrefixKey)); + for (auto range : ranges) { + if (!range.value().first || !range.value().second.present()) { + continue; + } + BulkLoadState oldBulkLoadState = decodeBulkLoadState(range.value().second.get()); + ASSERT(oldBulkLoadState.isValid()); + TraceEvent(SevWarnAlways, "BulkLoadSetTaskError") + .detail("Reason", "Input task is trying to overwrite the existing task in the same transaction") + .detail("InputState", bulkLoadState.toString()) + .detail("ExistState", oldBulkLoadState.toString()); + throw bulkload_add_task_input_error(); + } + ryw->getSpecialKeySpaceWriteMap().insert(bulkLoadRange.withPrefix(taskPrefixKey), + std::make_pair(true, bulkLoadStateValue(bulkLoadState))); +} + +ACTOR static Future> BulkLoadingTaskCommitActor(ReadYourWritesTransaction* ryw, KeyRangeRef kr) { + // Check if there are existing bulk loading cancellation in the current transaction + if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr)))) { + TraceEvent(SevWarn, "BulkLoadTaskCommitError") + .detail("Reason", "Exist bulk loading cancel in the same transaction"); + throw bulkload_add_task_input_error(); + } + + // Check bulk loading mode + Optional mode = wait(ryw->getTransaction().get(bulkLoadModeKey)); + if (!mode.present()) { + throw bulkload_is_off_when_commit_task(); + } + int bulkLoadMode = BinaryReader::fromStringRef(mode.get(), Unversioned()); + if (bulkLoadMode == 0) { + throw bulkload_is_off_when_commit_task(); + } else if (bulkLoadMode != 1) { + throw bulkload_mode_not_found(); + } + + state KeyRange taskRange = + Standalone(KeyRangeRef("task/"_sr, "task0"_sr)) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin); + + // Validate current transaction bulk loading tasks + state std::vector updateTasks; + state std::vector updateRanges; + auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(taskRange); + for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) { + if (!iter->value().first || !iter->value().second.present()) { + continue; + } + BulkLoadState bulkLoadTask = decodeBulkLoadState(iter->value().second.get()); + ASSERT(iter->range() == bulkLoadTask.range.withPrefix(taskRange.begin)); + for (const auto& updateTask : updateTasks) { + if (updateTask.filePaths == bulkLoadTask.filePaths) { + TraceEvent(SevWarnAlways, "BulkLoadTaskCommitError") + .detail("Reason", "Different ranges are mapped to the same file path set"); + throw bulkload_add_task_input_error(); + } + } + updateTasks.push_back(bulkLoadTask); + updateRanges.push_back(bulkLoadTask.range); + } + updateRanges = coalesceRangeList(updateRanges); + + // Conflict check between local change and global + state int i = 0; + state Key readBeginKey; + state Key readEndKey; + for (; i < updateRanges.size(); i++) { + readBeginKey = updateRanges[i].begin; + readEndKey = updateRanges[i].end; + while (readBeginKey < readEndKey) { + KeyRange rangeToRead = Standalone(KeyRangeRef(readBeginKey, readEndKey)); + RangeResult result = wait(krmGetRanges(&(ryw->getTransaction()), bulkLoadPrefix, rangeToRead)); + for (int j = 0; j < result.size() - 1; j++) { + if (!result[j].value.empty()) { + KeyRange existRange = Standalone(KeyRangeRef(result[i].key, result[i + 1].key)); + BulkLoadState existBulkLoadTask = decodeBulkLoadState(result[i].value); + ASSERT(existBulkLoadTask.isValid()); + TraceEvent(SevWarnAlways, "BulkLoadTaskCommitError") + .detail("Reason", "New range conflicts to existing ones"); + throw bulkload_task_conflict(); + } + } + readBeginKey = result.back().key; + } + } + + // Update to global + i = 0; + for (; i < updateTasks.size(); i++) { + wait(krmSetRange( + &(ryw->getTransaction()), bulkLoadPrefix, updateTasks[i].range, bulkLoadStateValue(updateTasks[i]))); + TraceEvent("BulkLoadCommitEach").detail("Task", updateTasks[i].toString()).detail("KR", kr.toString()); + } + return Optional(); +} + +Future> BulkLoadTaskImpl::commit(ReadYourWritesTransaction* ryw) { + return BulkLoadingTaskCommitActor(ryw, getKeyRange()); +} + +BulkLoadCancelImpl::BulkLoadCancelImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +Future BulkLoadCancelImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const { + throw not_implemented(); +} + +void BulkLoadCancelImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) { + Key cancelPrefixKey = + SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("cancel/"_sr); + KeyRange rangeToCancel = range.removePrefix(cancelPrefixKey); + if (!normalKeys.contains(rangeToCancel)) { + TraceEvent(SevWarn, "BulkLoadCancelTaskError") + .detail("Reason", "Input range to check is out of normal range") + .detail("InputRange", rangeToCancel); + throw bulkload_cancel_task_input_error(); + } + ryw->getSpecialKeySpaceWriteMap().insert(rangeToCancel.withPrefix(cancelPrefixKey), + std::make_pair(true, Optional())); +} + +void BulkLoadCancelImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { + throw not_implemented(); +} + +void BulkLoadCancelImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) { + throw not_implemented(); +} + +ACTOR static Future> BulkLoadingCancelCommitActor(ReadYourWritesTransaction* ryw, + KeyRangeRef kr) { + // Check if there are existing bulk loading new task in the current transaction + if (existingBulkLoadUpdate(ryw, Standalone(KeyRangeRef("task/"_sr, "task0"_sr)))) { + TraceEvent(SevWarn, "BulkLoadTaskCommitError") + .detail("Reason", "Exist bulk loading cancel in the same transaction"); + throw bulkload_cancel_task_input_error(); + } + + // Check bulk loading mode + Optional mode = wait(ryw->getTransaction().get(bulkLoadModeKey)); + if (!mode.present()) { + throw bulkload_is_off_when_commit_task(); + } + int bulkLoadMode = BinaryReader::fromStringRef(mode.get(), Unversioned()); + if (bulkLoadMode == 0) { + throw bulkload_is_off_when_commit_task(); + } else if (bulkLoadMode != 1) { + throw bulkload_mode_not_found(); + } + + // Get all cancelled range in the current transaction + state KeyRange cancelRange = + Standalone(KeyRangeRef("cancel/"_sr, "cancel0"_sr)) + .withPrefix(SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin); + state std::vector cancelRanges; + auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(cancelRange); + for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) { + if (!iter->value().first) { + continue; + } + ASSERT(!iter->value().second.present()); + cancelRanges.push_back(iter->range().removePrefix(cancelRange.begin)); + } + cancelRanges = coalesceRangeList(cancelRanges); + + // Get all ranges to cancel intersecting the global task map + state int i = 0; + state Key readBeginKey; + state Key readEndKey; + state std::vector rangesToRemove; + for (; i < cancelRanges.size(); i++) { + readBeginKey = cancelRanges[i].begin; + readEndKey = cancelRanges[i].end; + while (readBeginKey < readEndKey) { + KeyRange rangeToRead = Standalone(KeyRangeRef(readBeginKey, readEndKey)); + RangeResult result = wait(krmGetRanges(&(ryw->getTransaction()), bulkLoadPrefix, rangeToRead)); + for (int j = 0; j < result.size() - 1; j++) { + if (!result[j].value.empty()) { + KeyRange existRange = Standalone(KeyRangeRef(result[j].key, result[j + 1].key)); + BulkLoadState existBulkLoadTask = decodeBulkLoadState(result[j].value); + ASSERT(existBulkLoadTask.isValid()); + rangesToRemove.push_back(existBulkLoadTask.range); + } + } + readBeginKey = result.back().key; + } + } + rangesToRemove = coalesceRangeList(rangesToRemove); + + // Remove task from the global task map + i = 0; + for (; i < rangesToRemove.size(); i++) { + wait(krmSetRange(&(ryw->getTransaction()), bulkLoadPrefix, rangesToRemove[i], Value())); + TraceEvent("BulkLoadCancelCommitEach") + .detail("Range", rangesToRemove[i].toString()) + .detail("KR", kr.toString()); + } + + return Optional(); +} + +Future> BulkLoadCancelImpl::commit(ReadYourWritesTransaction* ryw) { + return BulkLoadingCancelCommitActor(ryw, getKeyRange()); +} + +BulkLoadModeImpl::BulkLoadModeImpl(KeyRangeRef kr) : SpecialKeyRangeRWImpl(kr) {} + +Future BulkLoadModeImpl::getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const { + throw not_implemented(); +} + +void BulkLoadModeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) { + throw not_implemented(); +} + +void BulkLoadModeImpl::clear(ReadYourWritesTransaction* ryw, const KeyRef& key) { + throw not_implemented(); +} + +void BulkLoadModeImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) { + Key modeKey = SpecialKeySpace::getModuleRange(SpecialKeySpace::MODULE::BULKLOADING).begin.withSuffix("mode"_sr); + ryw->getSpecialKeySpaceWriteMap().insert(modeKey, std::make_pair(true, value)); + TraceEvent("BulkLoadSetMode").detail("Value", value.toString()); +} + +Future> BulkLoadModeImpl::commit(ReadYourWritesTransaction* ryw) { + KeyRangeRef kr = getKeyRange(); + auto ranges = ryw->getSpecialKeySpaceWriteMap().containedRanges(kr); + for (auto iter = ranges.begin(); iter != ranges.end(); ++iter) { + if (!iter->value().first) + continue; + if (iter->value().second.present()) { + ASSERT(iter->range() == kr); + int mode = BinaryReader::fromStringRef(iter->value().second.get(), Unversioned()); + if (mode == 0 || mode == 1) { + ryw->getTransaction().set(bulkLoadModeKey, BinaryWriter::toValue(mode, Unversioned())); + TraceEvent("BulkLoadSetModeCommit").detail("Value", mode); + } else { + throw bulkload_mode_not_found(); + } + } + } + TraceEvent("BulkLoadSetModeCommitDone").detail("KR", kr.toString()); + return Optional(); +} + // Clears the special management api keys excludeLocality and failedLocality. void includeLocalities(ReadYourWritesTransaction* ryw) { ryw->setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); diff --git a/fdbclient/SystemData.cpp b/fdbclient/SystemData.cpp index 0878bb0081c..886d66052fa 100644 --- a/fdbclient/SystemData.cpp +++ b/fdbclient/SystemData.cpp @@ -1215,6 +1215,21 @@ const KeyRef moveKeysLockWriteKey = "\xff/moveKeysLock/Write"_sr; const KeyRef dataDistributionModeKey = "\xff/dataDistributionMode"_sr; const UID dataDistributionModeLock = UID(6345, 3425); +// Bulk loading keys +const KeyRef bulkLoadModeKey = "\xff/bulkLoadMode"_sr; +const KeyRef bulkLoadPrefix = "\xff/bulkLoad/"_sr; + +const Value bulkLoadStateValue(const BulkLoadState& bulkLoadState) { + return ObjectWriter::toValue(bulkLoadState, IncludeVersion()); +} + +BulkLoadState decodeBulkLoadState(const ValueRef& value) { + BulkLoadState bulkLoadState; + ObjectReader reader(value.begin(), IncludeVersion()); + reader.deserialize(bulkLoadState); + return bulkLoadState; +} + // Keys to view and control tag throttling const KeyRangeRef tagThrottleKeys = KeyRangeRef("\xff\x02/throttledTags/tag/"_sr, "\xff\x02/throttledTags/tag0"_sr); const KeyRef tagThrottleKeysPrefix = tagThrottleKeys.begin; diff --git a/fdbclient/include/fdbclient/BulkLoading.h b/fdbclient/include/fdbclient/BulkLoading.h new file mode 100644 index 00000000000..3b050c54064 --- /dev/null +++ b/fdbclient/include/fdbclient/BulkLoading.h @@ -0,0 +1,69 @@ +/* + * BulkLoading.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBCLIENT_BULKLOADING_H +#define FDBCLIENT_BULKLOADING_H +#pragma once + +#include "fdbclient/FDBTypes.h" +#include "fdbrpc/fdbrpc.h" + +enum class BulkLoadPhase : uint8_t { + Invalid = 0, + Running = 1, + Complete = 2, + Error = 3, +}; + +enum class BulkLoadType : uint8_t { + Invalid = 0, + SQLite = 1, + RocksDB = 2, + ShardedRocksDB = 3, +}; + +struct BulkLoadState { + constexpr static FileIdentifier file_identifier = 1384499; + + BulkLoadState() = default; + + BulkLoadState(BulkLoadType loadType, std::set filePaths) : loadType(loadType), filePaths(filePaths) {} + + BulkLoadState(KeyRange range, BulkLoadType loadType, std::set filePaths) + : range(range), loadType(loadType), filePaths(filePaths) {} + + bool isValid() const { return loadType != BulkLoadType::Invalid; } + + std::string toString() const { + return "BulkLoadState: [Range]: " + Traceable::toString(range) + + ", [Type]: " + std::to_string(static_cast(loadType)) + ", [FilePath]: " + describe(filePaths); + } + + template + void serialize(Ar& ar) { + serializer(ar, range, loadType, filePaths); + } + + KeyRange range; + BulkLoadType loadType; + std::set filePaths; +}; + +#endif diff --git a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h index 0359c985cdd..bdc2ee4bfc8 100644 --- a/fdbclient/include/fdbclient/SpecialKeySpace.actor.h +++ b/fdbclient/include/fdbclient/SpecialKeySpace.actor.h @@ -182,6 +182,7 @@ class SpecialKeySpace { STATUSJSON, UNKNOWN, // default value for all unregistered range WORKERINTERFACE, + BULKLOADING, }; enum class IMPLTYPE { @@ -562,6 +563,50 @@ class DataDistributionImpl : public SpecialKeyRangeRWImpl { Future> commit(ReadYourWritesTransaction* ryw) override; }; +class BulkLoadStatusImpl : public SpecialKeyRangeReadImpl { +public: + explicit BulkLoadStatusImpl(KeyRangeRef kr); + Future getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const override; +}; + +class BulkLoadTaskImpl : public SpecialKeyRangeRWImpl { +public: + explicit BulkLoadTaskImpl(KeyRangeRef kr); + Future getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const override; + void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override; + Future> commit(ReadYourWritesTransaction* ryw) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; +}; + +class BulkLoadCancelImpl : public SpecialKeyRangeRWImpl { +public: + explicit BulkLoadCancelImpl(KeyRangeRef kr); + Future getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const override; + void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override; + Future> commit(ReadYourWritesTransaction* ryw) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; +}; + +class BulkLoadModeImpl : public SpecialKeyRangeRWImpl { +public: + explicit BulkLoadModeImpl(KeyRangeRef kr); + Future getRange(ReadYourWritesTransaction* ryw, + KeyRangeRef kr, + GetRangeLimits limitsHint) const override; + void set(ReadYourWritesTransaction* ryw, const KeyRef& key, const ValueRef& value) override; + Future> commit(ReadYourWritesTransaction* ryw) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRangeRef& range) override; + void clear(ReadYourWritesTransaction* ryw, const KeyRef& key) override; +}; + class WorkerInterfacesSpecialKeyImpl : public SpecialKeyRangeReadImpl { public: explicit WorkerInterfacesSpecialKeyImpl(KeyRangeRef kr); diff --git a/fdbclient/include/fdbclient/SystemData.h b/fdbclient/include/fdbclient/SystemData.h index 62b32649716..b465e9fe3f7 100644 --- a/fdbclient/include/fdbclient/SystemData.h +++ b/fdbclient/include/fdbclient/SystemData.h @@ -25,6 +25,7 @@ // Functions and constants documenting the organization of the reserved keyspace in the database beginning with "\xFF" #include "fdbclient/AccumulativeChecksum.h" +#include "fdbclient/BulkLoading.h" #include "fdbclient/FDBTypes.h" #include "fdbclient/BlobWorkerInterface.h" // TODO move the functions that depend on this out of here and into BlobWorkerInterface.h to remove this dependency #include "fdbclient/StorageServerInterface.h" @@ -518,6 +519,11 @@ extern const KeyRef moveKeysLockOwnerKey, moveKeysLockWriteKey; extern const KeyRef dataDistributionModeKey; extern const UID dataDistributionModeLock; +extern const KeyRef bulkLoadModeKey; +extern const KeyRef bulkLoadPrefix; +const Value bulkLoadStateValue(const BulkLoadState& bulkLoadState); +BulkLoadState decodeBulkLoadState(const ValueRef& value); + // Keys to view and control tag throttling extern const KeyRangeRef tagThrottleKeys; extern const KeyRef tagThrottleKeysPrefix; diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 0f093f97c45..77a22ecf4c4 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -994,6 +994,26 @@ ACTOR Future serveBlobMigratorRequests(Reference self, } } +ACTOR Future bulkLoadingCore(Reference self) { + loop { + try { + Database cx = self->txnProcessor->context(); + Transaction tr(cx); + Optional mode = wait(tr.get(bulkLoadModeKey)); + if (mode.present()) { + int bulkLoadMode = BinaryReader::fromStringRef(mode.get(), Unversioned()); + if (bulkLoadMode == 1) { + TraceEvent("BulkLoadingCore").detail("Status", "Mode On").log(); + return Void(); + } + } + } catch (Error& e) { + TraceEvent("BulkLoadingCore").detail("Status", "Error").errorUnsuppressed(e).log(); + wait(delay(5.0)); + } + } +} + // Runs the data distribution algorithm for FDB, including the DD Queue, DD tracker, and DD team collection ACTOR Future dataDistribution(Reference self, PromiseStream getShardMetricsList, @@ -1072,6 +1092,8 @@ ACTOR Future dataDistribution(Reference self, actors.push_back(self->pollMoveKeysLock()); + actors.push_back(bulkLoadingCore(self)); + self->context->tracker = makeReference( DataDistributionTrackerInitParams{ .db = self->txnProcessor, .distributorId = self->ddId, diff --git a/fdbserver/workloads/BulkLoading.actor.cpp b/fdbserver/workloads/BulkLoading.actor.cpp new file mode 100644 index 00000000000..99a28b5abe7 --- /dev/null +++ b/fdbserver/workloads/BulkLoading.actor.cpp @@ -0,0 +1,243 @@ +/* + * BulkLoading.actor.cpp + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "fdbclient/BulkLoading.h" +#include "fdbclient/NativeAPI.actor.h" +#include "fdbserver/workloads/workloads.actor.h" +#include "flow/actorcompiler.h" // This must be the last #include. + +struct BulkLoading : TestWorkload { + static constexpr auto NAME = "BulkLoadingWorkload"; + const bool enabled; + bool pass; + + BulkLoading(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(true), pass(true) {} + + Future setup(Database const& cx) override { return Void(); } + + Future start(Database const& cx) override { return _start(this, cx); } + + Future check(Database const& cx) override { return true; } + + void getMetrics(std::vector& m) override {} + + void addBulkLoadTask(ReadYourWritesTransaction* tr, KeyRange range, std::string path) { + std::set paths; + paths.insert(path); + BulkLoadState bulkLoadState(range, BulkLoadType::ShardedRocksDB, paths); + try { + tr->set("\xff\xff/bulk_loading/task/"_sr, bulkLoadStateValue(bulkLoadState)); + TraceEvent("BulkLoadWorkloadAddTask").detail("Task", bulkLoadState.toString()); + } catch (Error& e) { + TraceEvent("BulkLoadWorkloadAddTaskFailed").detail("Task", bulkLoadState.toString()); + throw e; + } + } + + std::string parseReadRangeResult(RangeResult input) { + std::string res; + for (int i = 0; i < input.size() - 1; i++) { + if (!input[i].value.empty()) { + BulkLoadState bulkLoadState = decodeBulkLoadState(input[i].value); + ASSERT(bulkLoadState.isValid()); + if (bulkLoadState.range != Standalone(KeyRangeRef(input[i].key, input[i + 1].key)) + .removePrefix("\xff\xff/bulk_loading/status/"_sr)) { + res = res + "[Not aligned] "; + } + res = res + bulkLoadState.toString() + "; "; + } + } + return res; + } + + ACTOR Future _start(BulkLoading* self, Database cx) { + if (self->clientId != 0) { + return Void(); + } + state ReadYourWritesTransaction tr(cx); + state Version version = wait(tr.getReadVersion()); + TraceEvent("BulkLoadWorkloadStart").detail("Version", version); + state KeyRange range1 = + Standalone(KeyRangeRef("\xff\xff/bulk_loading/status/a"_sr, "\xff\xff/bulk_loading/status/b"_sr)); + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + RangeResult res1 = wait(tr.getRange(range1, GetRangeLimits())); + TraceEvent("BulkLoadWorkloadReadRange").detail("Range", range1).detail("Res", self->parseReadRangeResult(res1)); + try { + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef(""_sr, "\xff/2"_sr)), "x"); + ASSERT(false); + } catch (Error& e) { + ASSERT(e.code() == error_code_bulkload_add_task_input_error); + } + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("1"_sr, "2"_sr)), "1"); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("2"_sr, "3"_sr)), "2"); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("3"_sr, "4"_sr)), "3"); + try { + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("2"_sr, "5"_sr)), "2"); + ASSERT(false); + } catch (Error& e) { + TraceEvent("BulkLoadWorkloadAddTaskFailed").errorUnsuppressed(e); + ASSERT(e.code() == error_code_bulkload_add_task_input_error); + } + wait(delay(1.0)); + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("1"_sr, "2"_sr)), "1"); + try { + wait(tr.commit()); + ASSERT(false); + } catch (Error& e) { + ASSERT(e.code() == error_code_bulkload_is_off_when_commit_task); + } + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + tr.set("\xff\xff/bulk_loading/mode"_sr, BinaryWriter::toValue("1"_sr, Unversioned())); + wait(tr.commit()); + TraceEvent("BulkLoadWorkloadEnableBulkLoad"); + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + ASSERT(!tr.getReadVersion().isReady()); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("1"_sr, "2"_sr)), "1"); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("2"_sr, "3"_sr)), "2"); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("3"_sr, "4"_sr)), "3"); + wait(tr.commit()); + TraceEvent("BulkLoadWorkloadTransactionCommitted") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("CommitVersion", tr.getCommittedVersion()); + + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("11"_sr, "2"_sr)), "4"); + try { + wait(tr.commit()); + ASSERT(false); + } catch (Error& e) { + TraceEvent("BulkLoadWorkloadAddTaskFailed").errorUnsuppressed(e); + ASSERT(e.code() == error_code_bulkload_task_conflict); + } + + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + + state KeyRange range3 = + Standalone(KeyRangeRef("\xff\xff/bulk_loading/status/"_sr, "\xff\xff/bulk_loading/status/\xff"_sr)); + RangeResult res3 = wait(tr.getRange(range3, GetRangeLimits())); + TraceEvent("BulkLoadWorkloadReadRange") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("Range", range3) + .detail("Res", self->parseReadRangeResult(res3)); + + state KeyRange range4 = + Standalone(KeyRangeRef("\xff\xff/bulk_loading/status/2"_sr, "\xff\xff/bulk_loading/status/3"_sr)); + RangeResult res4 = wait(tr.getRange(range4, GetRangeLimits())); + TraceEvent("BulkLoadWorkloadReadRange") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("Range", range4) + .detail("Res", self->parseReadRangeResult(res4)); + + state KeyRange range5 = + Standalone(KeyRangeRef("\xff\xff/bulk_loading/status/11"_sr, "\xff\xff/bulk_loading/status/12"_sr)); + RangeResult res5 = wait(tr.getRange(range5, GetRangeLimits())); + TraceEvent("BulkLoadWorkloadReadRange") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("Range", range5) + .detail("Res", self->parseReadRangeResult(res5)); + + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + + state KeyRange range6 = + Standalone(KeyRangeRef("\xff\xff/bulk_loading/cancel/11"_sr, "\xff\xff/bulk_loading/cancel/2"_sr)); + tr.clear(range6); + wait(tr.commit()); + TraceEvent("BulkLoadWorkloadClearRange") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("CommitVersion", tr.getCommittedVersion()) + .detail("Range", range6); + + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + RangeResult res7 = wait(tr.getRange(range3, GetRangeLimits())); + TraceEvent("BulkLoadWorkloadReadRange") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("Range", range3) + .detail("Res", self->parseReadRangeResult(res7)); + + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + self->addBulkLoadTask(&tr, Standalone(KeyRangeRef("11"_sr, "2"_sr)), "5"); + + try { + RangeResult res8 = wait(tr.getRange(range3, GetRangeLimits())); + ASSERT(false); + } catch (Error& e) { + TraceEvent("BulkLoadWorkloadReadRangeError").errorUnsuppressed(e); + ASSERT(e.code() == error_code_bulkload_check_status_input_error); + } + + wait(tr.commit()); + TraceEvent("BulkLoadWorkloadTransactionCommitted") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("CommitVersion", tr.getCommittedVersion()); + + tr.reset(); + TraceEvent("BulkLoadWorkloadTransactionReset"); + tr.setOption(FDBTransactionOptions::SPECIAL_KEY_SPACE_ENABLE_WRITES); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::READ_SYSTEM_KEYS); + RangeResult res9 = wait(tr.getRange(range3, GetRangeLimits())); + TraceEvent("BulkLoadWorkloadReadRange") + .detail("AtVersion", tr.getReadVersion().get()) + .detail("Range", range3) + .detail("Res", self->parseReadRangeResult(res9)); + + return Void(); + } +}; + +WorkloadFactory BulkLoadingFactory; diff --git a/flow/include/flow/error_definitions.h b/flow/include/flow/error_definitions.h index 5ec3bcfd0ad..5b641919b29 100755 --- a/flow/include/flow/error_definitions.h +++ b/flow/include/flow/error_definitions.h @@ -107,6 +107,12 @@ ERROR( unknown_storage_engine, 1082, "Storage engine type is not recognized." ) ERROR( duplicate_snapshot_request, 1083, "A duplicate snapshot request has been sent, the old request is discarded.") ERROR( dd_config_changed, 1084, "DataDistribution configuration changed." ) ERROR( consistency_check_urgent_task_failed, 1085, "Consistency check urgent task is failed") +ERROR( bulkload_add_task_input_error, 1086, "Bulk loading add task input is error" ) +ERROR( bulkload_cancel_task_input_error, 1087, "Bulk loading cancel task input is error" ) +ERROR( bulkload_check_status_input_error, 1088, "Bulk loading check task status input is error" ) +ERROR( bulkload_task_conflict, 1089, "Input bulk loading task is conflicting with existing task" ) +ERROR( bulkload_mode_not_found, 1090, "Cannot find bulk loading mode" ) +ERROR( bulkload_is_off_when_commit_task, 1091, "Bulk load mode is off when committing task" ) ERROR( broken_promise, 1100, "Broken promise" ) ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" ) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6dbb6350e1f..b960e79fce0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -152,6 +152,7 @@ if(WITH_PYTHON) add_fdb_test(TEST_FILES fast/BlobRestoreLarge.toml) add_fdb_test(TEST_FILES fast/BlobRestoreToVersion.toml) add_fdb_test(TEST_FILES fast/BlobRestoreTenantMode.toml) + add_fdb_test(TEST_FILES fast/BulkLoading.toml) add_fdb_test(TEST_FILES fast/CacheTest.toml) add_fdb_test(TEST_FILES fast/CloggedSideband.toml) add_fdb_test(TEST_FILES fast/CompressionUtilsUnit.toml IGNORE) diff --git a/tests/fast/BulkLoading.toml b/tests/fast/BulkLoading.toml new file mode 100644 index 00000000000..20c93d1ffee --- /dev/null +++ b/tests/fast/BulkLoading.toml @@ -0,0 +1,16 @@ +[configuration] +config = 'triple' +generateFearless = true +allowDefaultTenant = false +machineCount = 18 + +[[knobs]] +shard_encode_location_metadata = true + + +[[test]] +testTitle = 'BulkLoadingWorkload' +useDB = true + + [[test.workload]] + testName = 'BulkLoadingWorkload'