From f7885403d957b7cf2e6617b55f96e03f240f5b8a Mon Sep 17 00:00:00 2001 From: Priyank Warkhede Date: Mon, 24 Feb 2025 21:55:46 -0800 Subject: [PATCH] SubscribableStorage APIs with SubscriptionIdentifier Summary: Update SubscribableStorage subscription method impls to use SubscriptionIdentifier, and add corresponding flavors of subscribe_*() methods. Reviewed By: wilsonwinhi Differential Revision: D67867192 fbshipit-source-id: 2625679bc271ebabe1ca2556d23eeff120945393 --- fboss/fsdb/oper/BUCK | 1 + .../NaivePeriodicSubscribableStorageBase.cpp | 24 +++--- .../NaivePeriodicSubscribableStorageBase.h | 22 ++--- fboss/fsdb/oper/SubscribableStorage.h | 83 +++++++++++++++++-- 4 files changed, 102 insertions(+), 28 deletions(-) diff --git a/fboss/fsdb/oper/BUCK b/fboss/fsdb/oper/BUCK index 300799d9860ed..5df65f5a9e942 100644 --- a/fboss/fsdb/oper/BUCK +++ b/fboss/fsdb/oper/BUCK @@ -88,6 +88,7 @@ cpp_library( exported_deps = [ ":delta_value", ":path_helpers", + ":subscription_common", ":subscription_manager", "//common/base:proc", "//fb303:thread_cached_service_data", diff --git a/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.cpp b/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.cpp index 285adf9ebe4fc..95eec1724d9d4 100644 --- a/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.cpp +++ b/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.cpp @@ -286,7 +286,7 @@ NaivePeriodicSubscribableStorageBase::convertExtPaths( folly::coro::AsyncGenerator&&> NaivePeriodicSubscribableStorageBase::subscribe_encoded_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, PathIter begin, PathIter end, OperProtocol protocol, @@ -298,7 +298,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_impl( heartbeatInterval = subscriptionParams->heartbeatInterval_.value(); } auto [gen, subscription] = PathSubscription::create( - SubscriptionIdentifier(std::move(subscriber)), + std::move(subscriber), path.begin(), path.end(), protocol, @@ -311,7 +311,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_impl( folly::coro::AsyncGenerator NaivePeriodicSubscribableStorageBase::subscribe_delta_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, PathIter begin, PathIter end, OperProtocol protocol, @@ -323,7 +323,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_impl( heartbeatInterval = subscriptionParams->heartbeatInterval_.value(); } auto [gen, subscription] = DeltaSubscription::create( - SubscriptionIdentifier(std::move(subscriber)), + std::move(subscriber), path.begin(), path.end(), protocol, @@ -336,7 +336,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_impl( folly::coro::AsyncGenerator>&&> NaivePeriodicSubscribableStorageBase::subscribe_encoded_extended_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::vector paths, OperProtocol protocol, std::optional subscriptionParams) { @@ -348,7 +348,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_extended_impl( } auto publisherRoot = getPublisherRoot(paths); auto [gen, subscription] = ExtendedPathSubscription::create( - SubscriptionIdentifier(std::move(subscriber)), + std::move(subscriber), std::move(paths), std::move(publisherRoot), protocol, @@ -360,7 +360,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_extended_impl( folly::coro::AsyncGenerator&&> NaivePeriodicSubscribableStorageBase::subscribe_delta_extended_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::vector paths, OperProtocol protocol, std::optional subscriptionParams) { @@ -372,7 +372,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_extended_impl( } auto publisherRoot = getPublisherRoot(paths); auto [gen, subscription] = ExtendedDeltaSubscription::create( - SubscriptionIdentifier(std::move(subscriber)), + std::move(subscriber), std::move(paths), std::move(publisherRoot), protocol, @@ -384,7 +384,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_extended_impl( folly::coro::AsyncGenerator NaivePeriodicSubscribableStorageBase::subscribe_patch_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::map rawPaths, std::optional subscriptionParams) { for (auto& [key, path] : rawPaths) { @@ -398,7 +398,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_impl( } auto root = getPublisherRoot(rawPaths); auto [gen, subscription] = ExtendedPatchSubscription::create( - SubscriptionIdentifier(std::move(subscriber)), + std::move(subscriber), std::move(rawPaths), patchOperProtocol_, std::move(root), @@ -410,7 +410,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_impl( folly::coro::AsyncGenerator NaivePeriodicSubscribableStorageBase::subscribe_patch_extended_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::map paths, std::optional subscriptionParams) { for (auto& [key, path] : paths) { @@ -424,7 +424,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_extended_impl( } auto root = getPublisherRoot(paths); auto [gen, subscription] = ExtendedPatchSubscription::create( - SubscriptionIdentifier(std::move(subscriber)), + std::move(subscriber), std::move(paths), patchOperProtocol_, std::move(root), diff --git a/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.h b/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.h index 8b6994c21dbd8..15f4579bc3fa2 100644 --- a/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.h +++ b/fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.h @@ -82,10 +82,12 @@ class NaivePeriodicSubscribableStorageBase { FsdbErrorCode disconnectReason = FsdbErrorCode::ALL_PUBLISHERS_GONE); template - folly::coro::AsyncGenerator&&> - subscribe_impl(SubscriberId subscriber, PathIter begin, PathIter end) { - auto sourceGen = - subscribe_encoded_impl(subscriber, begin, end, OperProtocol::BINARY); + folly::coro::AsyncGenerator&&> subscribe_impl( + SubscriptionIdentifier&& subscriber, + PathIter begin, + PathIter end) { + auto sourceGen = subscribe_encoded_impl( + std::move(subscriber), begin, end, OperProtocol::BINARY); return folly::coro::co_invoke( [&, gen = std::move(sourceGen)]() mutable -> folly::coro::AsyncGenerator&&> { @@ -110,7 +112,7 @@ class NaivePeriodicSubscribableStorageBase { } folly::coro::AsyncGenerator&&> subscribe_encoded_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, PathIter begin, PathIter end, OperProtocol protocol, @@ -118,7 +120,7 @@ class NaivePeriodicSubscribableStorageBase { std::nullopt); folly::coro::AsyncGenerator subscribe_delta_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, PathIter begin, PathIter end, OperProtocol protocol, @@ -127,7 +129,7 @@ class NaivePeriodicSubscribableStorageBase { folly::coro::AsyncGenerator>&&> subscribe_encoded_extended_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::vector paths, OperProtocol protocol, std::optional subscriptionParams = @@ -135,21 +137,21 @@ class NaivePeriodicSubscribableStorageBase { folly::coro::AsyncGenerator&&> subscribe_delta_extended_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::vector paths, OperProtocol protocol, std::optional subscriptionParams = std::nullopt); folly::coro::AsyncGenerator subscribe_patch_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::map rawPaths, std::optional subscriptionParams = std::nullopt); folly::coro::AsyncGenerator subscribe_patch_extended_impl( - SubscriberId subscriber, + SubscriptionIdentifier&& subscriber, std::map paths, std::optional subscriptionParams = std::nullopt); diff --git a/fboss/fsdb/oper/SubscribableStorage.h b/fboss/fsdb/oper/SubscribableStorage.h index f8f1d706f94f1..8a561f7a1f48c 100644 --- a/fboss/fsdb/oper/SubscribableStorage.h +++ b/fboss/fsdb/oper/SubscribableStorage.h @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -77,7 +78,7 @@ class SubscribableStorage { folly::coro::AsyncGenerator&&> subscribe(SubscriberId subscriber, PathIter begin, PathIter end) { return static_cast(this)->template subscribe_impl( - subscriber, begin, end); + SubscriptionIdentifier(std::move(subscriber)), begin, end); } template @@ -117,7 +118,33 @@ class SubscribableStorage { std::optional subscriptionParams = std::nullopt) { return static_cast(this)->subscribe_encoded_impl( - subscriber, begin, end, protocol, std::move(subscriptionParams)); + SubscriptionIdentifier(std::move(subscriber)), + begin, + end, + protocol, + subscriptionParams); + } + + folly::coro::AsyncGenerator&&> subscribe_encoded( + SubscriptionIdentifier&& subscriber, + PathIter begin, + PathIter end, + OperProtocol protocol, + std::optional subscriptionParams = + std::nullopt) { + return static_cast(this)->subscribe_encoded_impl( + std::move(subscriber), begin, end, protocol, subscriptionParams); + } + + folly::coro::AsyncGenerator>&&> + subscribe_encoded_extended( + SubscriptionIdentifier&& subscriber, + std::vector paths, + OperProtocol protocol, + std::optional subscriptionParams = + std::nullopt) { + return static_cast(this)->subscribe_encoded_extended_impl( + std::move(subscriber), std::move(paths), protocol, subscriptionParams); } folly::coro::AsyncGenerator>&&> @@ -128,7 +155,10 @@ class SubscribableStorage { std::optional subscriptionParams = std::nullopt) { return static_cast(this)->subscribe_encoded_extended_impl( - subscriber, std::move(paths), protocol, std::move(subscriptionParams)); + SubscriptionIdentifier(std::move(subscriber)), + std::move(paths), + protocol, + subscriptionParams); } template @@ -147,6 +177,7 @@ class SubscribableStorage { protocol, std::move(subscriptionParams)); } + folly::coro::AsyncGenerator subscribe_delta( SubscriberId subscriber, const ConcretePath& path, @@ -160,6 +191,7 @@ class SubscribableStorage { protocol, std::move(subscriptionParams)); } + folly::coro::AsyncGenerator subscribe_delta( SubscriberId subscriber, PathIter begin, @@ -168,7 +200,22 @@ class SubscribableStorage { std::optional subscriptionParams = std::nullopt) { return static_cast(this)->subscribe_delta_impl( - subscriber, begin, end, protocol, std::move(subscriptionParams)); + SubscriptionIdentifier(std::move(subscriber)), + begin, + end, + protocol, + subscriptionParams); + } + + folly::coro::AsyncGenerator subscribe_delta( + SubscriptionIdentifier&& subscriber, + PathIter begin, + PathIter end, + OperProtocol protocol, + std::optional subscriptionParams = + std::nullopt) { + return static_cast(this)->subscribe_delta_impl( + std::move(subscriber), begin, end, protocol, subscriptionParams); } folly::coro::AsyncGenerator&&> @@ -179,7 +226,21 @@ class SubscribableStorage { std::optional subscriptionParams = std::nullopt) { return static_cast(this)->subscribe_delta_extended_impl( - subscriber, std::move(paths), protocol, std::move(subscriptionParams)); + SubscriptionIdentifier(std::move(subscriber)), + std::move(paths), + protocol, + subscriptionParams); + } + + folly::coro::AsyncGenerator&&> + subscribe_delta_extended( + SubscriptionIdentifier&& subscriber, + std::vector paths, + OperProtocol protocol, + std::optional subscriptionParams = + std::nullopt) { + return static_cast(this)->subscribe_delta_extended_impl( + std::move(subscriber), std::move(paths), protocol, subscriptionParams); } template @@ -227,6 +288,16 @@ class SubscribableStorage { std::map rawPaths, std::optional subscriptionParams = std::nullopt) { + return static_cast(this)->subscribe_patch_impl( + SubscriptionIdentifier(std::move(subscriber)), + std::move(rawPaths), + subscriptionParams); + } + folly::coro::AsyncGenerator subscribe_patch( + SubscriptionIdentifier&& subscriber, + std::map rawPaths, + std::optional subscriptionParams = + std::nullopt) { return static_cast(this)->subscribe_patch_impl( std::move(subscriber), std::move(rawPaths), @@ -238,7 +309,7 @@ class SubscribableStorage { std::optional subscriptionParams = std::nullopt) { return static_cast(this)->subscribe_patch_extended_impl( - std::move(subscriber), + SubscriptionIdentifier(std::move(subscriber)), std::move(rawPaths), std::move(subscriptionParams)); }