From 9eb81972a66637c595a56e427909beb10c73cc1c Mon Sep 17 00:00:00 2001 From: Priyank Warkhede Date: Mon, 24 Feb 2025 21:55:46 -0800 Subject: [PATCH] Create unique id for new subscriptions Summary: In ServiceHandler, while registering new subscriptions use a unique id. In next diff we will use this unique ID to pull information from Subscription objects in SubscriptionManager into Fsdb Service handler. Reviewed By: wilsonwinhi Differential Revision: D67867191 fbshipit-source-id: b8fea88859e6576f9d3de343a5fdc63ad308997e --- fboss/fsdb/if/fsdb.thrift | 1 + fboss/fsdb/server/BUCK | 1 + fboss/fsdb/server/ServiceHandler.cpp | 152 ++++++++++++++++++--------- fboss/fsdb/server/ServiceHandler.h | 18 +++- 4 files changed, 117 insertions(+), 55 deletions(-) diff --git a/fboss/fsdb/if/fsdb.thrift b/fboss/fsdb/if/fsdb.thrift index c6d4fb4ed5e73..4e37603fc8f6c 100644 --- a/fboss/fsdb/if/fsdb.thrift +++ b/fboss/fsdb/if/fsdb.thrift @@ -42,6 +42,7 @@ struct OperSubscriberInfo { // Paths for Patch apis // TODO: replace path above 7: optional map paths; + 8: optional i64 subscriptionUid; } @cpp.Type{template = "folly::F14FastMap"} diff --git a/fboss/fsdb/server/BUCK b/fboss/fsdb/server/BUCK index 377b82cacbb4d..2413cf95b8588 100644 --- a/fboss/fsdb/server/BUCK +++ b/fboss/fsdb/server/BUCK @@ -40,6 +40,7 @@ cpp_library( "//fboss/fsdb/if:fsdb_model", "//fboss/fsdb/if:fsdb_oper-cpp2-types", "//fboss/fsdb/oper:path_helpers", + "//fboss/fsdb/oper:subscription_common", "//fboss/fsdb/oper/instantiations:fsdb_naive_periodic_subscribable_storage", "//fboss/lib:log_thrift_call", "//fboss/lib:thread_heartbeat", diff --git a/fboss/fsdb/server/ServiceHandler.cpp b/fboss/fsdb/server/ServiceHandler.cpp index f84b1b4b12f48..757428cbc76b5 100644 --- a/fboss/fsdb/server/ServiceHandler.cpp +++ b/fboss/fsdb/server/ServiceHandler.cpp @@ -10,6 +10,7 @@ #include "fboss/fsdb/common/Flags.h" #include "fboss/fsdb/if/gen-cpp2/fsdb_common_constants.h" #include "fboss/fsdb/oper/PathValidator.h" +#include "fboss/fsdb/oper/SubscriptionCommon.h" #include "folly/CancellationToken.h" #include @@ -593,37 +594,47 @@ ServiceHandler::co_publishStats(std::unique_ptr request) { namespace { -OperSubscriberInfo -makeSubscriberInfo(const OperSubRequest& req, PubSubType type, bool isStats) { +OperSubscriberInfo makeSubscriberInfo( + const OperSubRequest& req, + PubSubType type, + bool isStats, + uint64_t uid) { OperSubscriberInfo info; info.subscriberId() = *req.subscriberId(); info.type() = type; info.path() = *req.path(); info.isStats() = isStats; info.subscribedSince() = static_cast(std::time(nullptr)); + info.subscriptionUid() = uid; return info; } OperSubscriberInfo makeSubscriberInfo( const OperSubRequestExtended& req, PubSubType type, - bool isStats) { + bool isStats, + uint64_t uid) { OperSubscriberInfo info; info.subscriberId() = *req.subscriberId(); info.type() = type; info.extendedPaths() = *req.paths(); info.isStats() = isStats; info.subscribedSince() = static_cast(std::time(nullptr)); + info.subscriptionUid() = uid; return info; } -OperSubscriberInfo -makeSubscriberInfo(const SubRequest& req, PubSubType type, bool isStats) { +OperSubscriberInfo makeSubscriberInfo( + const SubRequest& req, + PubSubType type, + bool isStats, + uint64_t uid) { OperSubscriberInfo info; info.subscriberId() = *req.clientId()->instanceId(); info.type() = type; info.paths() = *req.paths(); info.isStats() = isStats; + info.subscriptionUid() = uid; return info; } @@ -639,6 +650,12 @@ void validatePaths( } // namespace +SubscriptionIdentifier ServiceHandler::makeSubscriptionIdentifier( + const OperSubscriberInfo& info) { + CHECK(info.subscriptionUid().has_value()); + return SubscriptionIdentifier(*info.subscriberId(), *info.subscriptionUid()); +} + void ServiceHandler::updateSubscriptionCounters( const OperSubscriberInfo& info, bool isConnected, @@ -754,7 +771,8 @@ void ServiceHandler::unregisterSubscription(const OperSubscriberInfo& info) { folly::coro::AsyncGenerator&&> ServiceHandler::makeStateStreamGenerator( std::unique_ptr request, - bool isStats) { + bool isStats, + SubscriptionIdentifier&& subId) { SubscriptionStorageParams subscriptionParams; if (request->heartbeatInterval().has_value()) { subscriptionParams.heartbeatInterval_ = @@ -762,13 +780,13 @@ ServiceHandler::makeStateStreamGenerator( } return isStats ? operStatsStorage_.subscribe_encoded( - *request->subscriberId(), + std::move(subId), request->path()->raw()->begin(), request->path()->raw()->end(), *request->protocol(), subscriptionParams) : operStorage_.subscribe_encoded( - *request->subscriberId(), + std::move(subId), request->path()->raw()->begin(), request->path()->raw()->end(), *request->protocol(), @@ -778,7 +796,8 @@ ServiceHandler::makeStateStreamGenerator( folly::coro::AsyncGenerator>&&> ServiceHandler::makeExtendedStateStreamGenerator( std::unique_ptr request, - bool isStats) { + bool isStats, + SubscriptionIdentifier&& subId) { SubscriptionStorageParams subscriptionParams; if (request->heartbeatInterval().has_value()) { subscriptionParams.heartbeatInterval_ = @@ -786,12 +805,12 @@ ServiceHandler::makeExtendedStateStreamGenerator( } return isStats ? operStatsStorage_.subscribe_encoded_extended( - *request->subscriberId(), + std::move(subId), std::move(*request->paths()), *request->protocol(), subscriptionParams) : operStorage_.subscribe_encoded_extended( - *request->subscriberId(), + std::move(subId), std::move(*request->paths()), *request->protocol(), subscriptionParams); @@ -800,7 +819,8 @@ ServiceHandler::makeExtendedStateStreamGenerator( folly::coro::AsyncGenerator ServiceHandler::makePatchStreamGenerator( std::unique_ptr request, - bool isStats) { + bool isStats, + SubscriptionIdentifier&& subId) { SubscriptionStorageParams subscriptionParams; if (request->heartbeatInterval().has_value()) { subscriptionParams.heartbeatInterval_ = @@ -808,13 +828,9 @@ ServiceHandler::makePatchStreamGenerator( } return isStats ? operStatsStorage_.subscribe_patch( - *request->clientId()->instanceId(), - *request->paths(), - subscriptionParams) + std::move(subId), *request->paths(), subscriptionParams) : operStorage_.subscribe_patch( - *request->clientId()->instanceId(), - *request->paths(), - subscriptionParams); + std::move(subId), *request->paths(), subscriptionParams); } folly::coro::Task< @@ -824,7 +840,9 @@ ServiceHandler::co_subscribeOperStatePath( auto log = LOG_THRIFT_CALL(INFO, getRequestDetails(*request)); PathValidator::validateStatePath(*request->path()->raw()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::PATH, false); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::PATH, false, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -836,10 +854,11 @@ ServiceHandler::co_subscribeOperStatePath( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = - makeStateStreamGenerator(std::move(request), false); + auto generator = makeStateStreamGenerator( + std::move(request), false, std::move(subId)); while (auto item = co_await generator.next()) { // got value auto&& delta = *item; @@ -862,7 +881,9 @@ ServiceHandler::co_subscribeOperStatsPath( auto log = LOG_THRIFT_CALL(INFO, getRequestDetails(*request)); PathValidator::validateStatsPath(*request->path()->raw()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::PATH, true); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::PATH, true, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -874,9 +895,11 @@ ServiceHandler::co_subscribeOperStatsPath( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = makeStateStreamGenerator(std::move(request), true); + auto generator = makeStateStreamGenerator( + std::move(request), true, std::move(subId)); while (auto item = co_await generator.next()) { // got value auto&& delta = *item; @@ -895,7 +918,8 @@ ServiceHandler::co_subscribeOperStatsPath( folly::coro::AsyncGenerator ServiceHandler::makeDeltaStreamGenerator( std::unique_ptr request, - bool isStats) { + bool isStats, + SubscriptionIdentifier&& subId) { SubscriptionStorageParams subscriptionParams; if (request->heartbeatInterval().has_value()) { subscriptionParams.heartbeatInterval_ = @@ -903,13 +927,13 @@ ServiceHandler::makeDeltaStreamGenerator( } return isStats ? operStatsStorage_.subscribe_delta( - *request->subscriberId(), + std::move(subId), request->path()->raw()->begin(), request->path()->raw()->end(), *request->protocol(), subscriptionParams) : operStorage_.subscribe_delta( - *request->subscriberId(), + std::move(subId), request->path()->raw()->begin(), request->path()->raw()->end(), *request->protocol(), @@ -919,7 +943,8 @@ ServiceHandler::makeDeltaStreamGenerator( folly::coro::AsyncGenerator&&> ServiceHandler::makeExtendedDeltaStreamGenerator( std::unique_ptr request, - bool isStats) { + bool isStats, + SubscriptionIdentifier&& subId) { SubscriptionStorageParams subscriptionParams; if (request->heartbeatInterval().has_value()) { subscriptionParams.heartbeatInterval_ = @@ -927,12 +952,12 @@ ServiceHandler::makeExtendedDeltaStreamGenerator( } return isStats ? operStatsStorage_.subscribe_delta_extended( - *request->subscriberId(), + std::move(subId), *request->paths(), *request->protocol(), subscriptionParams) : operStorage_.subscribe_delta_extended( - *request->subscriberId(), + std::move(subId), *request->paths(), *request->protocol(), subscriptionParams); @@ -945,7 +970,9 @@ ServiceHandler::co_subscribeOperStateDelta( auto log = LOG_THRIFT_CALL(INFO, getRequestDetails(*request)); PathValidator::validateStatePath(*request->path()->raw()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::DELTA, false); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::DELTA, false, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -957,10 +984,11 @@ ServiceHandler::co_subscribeOperStateDelta( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = - makeDeltaStreamGenerator(std::move(request), false); + auto generator = makeDeltaStreamGenerator( + std::move(request), false, std::move(subId)); while (auto item = co_await generator.next()) { // got value co_yield std::move(*item); @@ -977,7 +1005,9 @@ ServiceHandler::co_subscribeOperStatePathExtended( PathValidator::validateExtendedStatePaths(*request->paths()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::PATH, false); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::PATH, false, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -989,10 +1019,11 @@ ServiceHandler::co_subscribeOperStatePathExtended( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = - makeExtendedStateStreamGenerator(std::move(request), false); + auto generator = makeExtendedStateStreamGenerator( + std::move(request), false, std::move(subId)); while (auto item = co_await generator.next()) { // got item auto&& deltas = *item; @@ -1023,7 +1054,9 @@ ServiceHandler::co_subscribeOperStateDeltaExtended( PathValidator::validateExtendedStatePaths(*request->paths()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::DELTA, false); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::DELTA, false, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -1035,10 +1068,11 @@ ServiceHandler::co_subscribeOperStateDeltaExtended( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = - makeExtendedDeltaStreamGenerator(std::move(request), false); + auto generator = makeExtendedDeltaStreamGenerator( + std::move(request), false, std::move(subId)); while (auto item = co_await generator.next()) { // got item auto&& delta = *item; @@ -1059,7 +1093,9 @@ ServiceHandler::co_subscribeOperStatsDelta( auto log = LOG_THRIFT_CALL(INFO, getRequestDetails(*request)); PathValidator::validateStatsPath(*request->path()->raw()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::DELTA, true); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::DELTA, true, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -1071,9 +1107,11 @@ ServiceHandler::co_subscribeOperStatsDelta( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = makeDeltaStreamGenerator(std::move(request), true); + auto generator = makeDeltaStreamGenerator( + std::move(request), true, std::move(subId)); while (auto item = co_await generator.next()) { // got value co_yield std::move(*item); @@ -1090,7 +1128,9 @@ ServiceHandler::co_subscribeOperStatsPathExtended( PathValidator::validateExtendedStatsPaths(*request->paths()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::PATH, true); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::PATH, true, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -1102,10 +1142,11 @@ ServiceHandler::co_subscribeOperStatsPathExtended( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = - makeExtendedStateStreamGenerator(std::move(request), true); + auto generator = makeExtendedStateStreamGenerator( + std::move(request), true, std::move(subId)); while (auto item = co_await generator.next()) { // got item auto&& deltas = *item; @@ -1136,7 +1177,9 @@ ServiceHandler::co_subscribeOperStatsDeltaExtended( PathValidator::validateExtendedStatsPaths(*request->paths()); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::DELTA, true); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::DELTA, true, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -1148,10 +1191,11 @@ ServiceHandler::co_subscribeOperStatsDeltaExtended( folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - auto generator = - makeExtendedDeltaStreamGenerator(std::move(request), true); + auto generator = makeExtendedDeltaStreamGenerator( + std::move(request), true, std::move(subId)); while (auto item = co_await generator.next()) { // got item auto&& delta = *item; @@ -1171,7 +1215,9 @@ folly::coro::Task request) { auto log = LOG_THRIFT_CALL(INFO, getRequestDetails(*request)); validatePaths(*request->paths(), false); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::PATCH, false); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::PATCH, false, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -1180,9 +1226,11 @@ ServiceHandler::co_subscribeState(std::unique_ptr request) { auto stream = folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - return makePatchStreamGenerator(std::move(request), false); + return makePatchStreamGenerator( + std::move(request), false, std::move(subId)); }); co_return {{}, std::move(stream)}; } @@ -1193,7 +1241,9 @@ folly::coro::Task request) { auto log = LOG_THRIFT_CALL(INFO, getRequestDetails(*request)); validatePaths(*request->paths(), true); - auto subscriberInfo = makeSubscriberInfo(*request, PubSubType::PATCH, true); + auto subscriberInfo = makeSubscriberInfo( + *request, PubSubType::PATCH, true, lastSubscriptionUid_.fetch_add(1)); + auto subId = makeSubscriptionIdentifier(subscriberInfo); registerSubscription(subscriberInfo, *request->forceSubscribe()); auto cleanupSubscriber = folly::makeGuard([this, subscriberInfo = std::move(subscriberInfo)]() { @@ -1202,9 +1252,11 @@ ServiceHandler::co_subscribeStats(std::unique_ptr request) { auto stream = folly::coro::co_invoke( [this, request = std::move(request), + subId = std::move(subId), cleanupSubscriber = std::move(cleanupSubscriber)]() mutable -> folly::coro::AsyncGenerator { - return makePatchStreamGenerator(std::move(request), true); + return makePatchStreamGenerator( + std::move(request), true, std::move(subId)); }); co_return {{}, std::move(stream)}; } diff --git a/fboss/fsdb/server/ServiceHandler.h b/fboss/fsdb/server/ServiceHandler.h index fd903bc8a9f0a..e85ccdf6061ee 100644 --- a/fboss/fsdb/server/ServiceHandler.h +++ b/fboss/fsdb/server/ServiceHandler.h @@ -210,6 +210,8 @@ class ServiceHandler : public FsdbServiceSvIf, void preStart(const folly::SocketAddress* /*address*/) override; private: + SubscriptionIdentifier makeSubscriptionIdentifier( + const OperSubscriberInfo& info); void registerSubscription( const OperSubscriberInfo& info, bool forceSubscribe = false); @@ -230,25 +232,30 @@ class ServiceHandler : public FsdbServiceSvIf, folly::coro::AsyncGenerator&&> makeStateStreamGenerator( std::unique_ptr request, - bool isStats); + bool isStats, + SubscriptionIdentifier&& subId); folly::coro::AsyncGenerator makeDeltaStreamGenerator( std::unique_ptr request, - bool isStats); + bool isStats, + SubscriptionIdentifier&& subId); folly::coro::AsyncGenerator>&&> makeExtendedStateStreamGenerator( std::unique_ptr request, - bool isStats); + bool isStats, + SubscriptionIdentifier&& subId); folly::coro::AsyncGenerator makePatchStreamGenerator( std::unique_ptr request, - bool isStats); + bool isStats, + SubscriptionIdentifier&& subId); folly::coro::AsyncGenerator&&> makeExtendedDeltaStreamGenerator( std::unique_ptr request, - bool isStats); + bool isStats, + SubscriptionIdentifier&& subId); OperPublisherInfo makePublisherInfo( const RawPathT& path, @@ -311,6 +318,7 @@ class ServiceHandler : public FsdbServiceSvIf, folly::Synchronized activeSubscriptions_; folly::Synchronized activePublishers_; std::shared_ptr server_; + std::atomic lastSubscriptionUid_{0}; }; } // namespace facebook::fboss::fsdb