Skip to content

Commit

Permalink
SubscribableStorage APIs with SubscriptionIdentifier
Browse files Browse the repository at this point in the history
Summary: Update SubscribableStorage subscription method impls to use SubscriptionIdentifier, and add corresponding flavors of subscribe_*() methods.

Reviewed By: wilsonwinhi

Differential Revision: D67867192

fbshipit-source-id: 2625679bc271ebabe1ca2556d23eeff120945393
  • Loading branch information
Priyank Warkhede authored and facebook-github-bot committed Feb 25, 2025
1 parent 814aff5 commit f788540
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 28 deletions.
1 change: 1 addition & 0 deletions fboss/fsdb/oper/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ cpp_library(
exported_deps = [
":delta_value",
":path_helpers",
":subscription_common",
":subscription_manager",
"//common/base:proc",
"//fb303:thread_cached_service_data",
Expand Down
24 changes: 12 additions & 12 deletions fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ NaivePeriodicSubscribableStorageBase::convertExtPaths(

folly::coro::AsyncGenerator<DeltaValue<OperState>&&>
NaivePeriodicSubscribableStorageBase::subscribe_encoded_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
PathIter begin,
PathIter end,
OperProtocol protocol,
Expand All @@ -298,7 +298,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_impl(
heartbeatInterval = subscriptionParams->heartbeatInterval_.value();
}
auto [gen, subscription] = PathSubscription::create(
SubscriptionIdentifier(std::move(subscriber)),
std::move(subscriber),
path.begin(),
path.end(),
protocol,
Expand All @@ -311,7 +311,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_impl(

folly::coro::AsyncGenerator<OperDelta&&>
NaivePeriodicSubscribableStorageBase::subscribe_delta_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
PathIter begin,
PathIter end,
OperProtocol protocol,
Expand All @@ -323,7 +323,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_impl(
heartbeatInterval = subscriptionParams->heartbeatInterval_.value();
}
auto [gen, subscription] = DeltaSubscription::create(
SubscriptionIdentifier(std::move(subscriber)),
std::move(subscriber),
path.begin(),
path.end(),
protocol,
Expand All @@ -336,7 +336,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_impl(

folly::coro::AsyncGenerator<std::vector<DeltaValue<TaggedOperState>>&&>
NaivePeriodicSubscribableStorageBase::subscribe_encoded_extended_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::vector<ExtendedOperPath> paths,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams) {
Expand All @@ -348,7 +348,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_extended_impl(
}
auto publisherRoot = getPublisherRoot(paths);
auto [gen, subscription] = ExtendedPathSubscription::create(
SubscriptionIdentifier(std::move(subscriber)),
std::move(subscriber),
std::move(paths),
std::move(publisherRoot),
protocol,
Expand All @@ -360,7 +360,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_encoded_extended_impl(

folly::coro::AsyncGenerator<std::vector<TaggedOperDelta>&&>
NaivePeriodicSubscribableStorageBase::subscribe_delta_extended_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::vector<ExtendedOperPath> paths,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams) {
Expand All @@ -372,7 +372,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_extended_impl(
}
auto publisherRoot = getPublisherRoot(paths);
auto [gen, subscription] = ExtendedDeltaSubscription::create(
SubscriptionIdentifier(std::move(subscriber)),
std::move(subscriber),
std::move(paths),
std::move(publisherRoot),
protocol,
Expand All @@ -384,7 +384,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_delta_extended_impl(

folly::coro::AsyncGenerator<SubscriberMessage&&>
NaivePeriodicSubscribableStorageBase::subscribe_patch_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::map<SubscriptionKey, RawOperPath> rawPaths,
std::optional<SubscriptionStorageParams> subscriptionParams) {
for (auto& [key, path] : rawPaths) {
Expand All @@ -398,7 +398,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_impl(
}
auto root = getPublisherRoot(rawPaths);
auto [gen, subscription] = ExtendedPatchSubscription::create(
SubscriptionIdentifier(std::move(subscriber)),
std::move(subscriber),
std::move(rawPaths),
patchOperProtocol_,
std::move(root),
Expand All @@ -410,7 +410,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_impl(

folly::coro::AsyncGenerator<SubscriberMessage&&>
NaivePeriodicSubscribableStorageBase::subscribe_patch_extended_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::map<SubscriptionKey, ExtendedOperPath> paths,
std::optional<SubscriptionStorageParams> subscriptionParams) {
for (auto& [key, path] : paths) {
Expand All @@ -424,7 +424,7 @@ NaivePeriodicSubscribableStorageBase::subscribe_patch_extended_impl(
}
auto root = getPublisherRoot(paths);
auto [gen, subscription] = ExtendedPatchSubscription::create(
SubscriptionIdentifier(std::move(subscriber)),
std::move(subscriber),
std::move(paths),
patchOperProtocol_,
std::move(root),
Expand Down
22 changes: 12 additions & 10 deletions fboss/fsdb/oper/NaivePeriodicSubscribableStorageBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ class NaivePeriodicSubscribableStorageBase {
FsdbErrorCode disconnectReason = FsdbErrorCode::ALL_PUBLISHERS_GONE);

template <typename T, typename TC>
folly::coro::AsyncGenerator<DeltaValue<T>&&>
subscribe_impl(SubscriberId subscriber, PathIter begin, PathIter end) {
auto sourceGen =
subscribe_encoded_impl(subscriber, begin, end, OperProtocol::BINARY);
folly::coro::AsyncGenerator<DeltaValue<T>&&> subscribe_impl(
SubscriptionIdentifier&& subscriber,
PathIter begin,
PathIter end) {
auto sourceGen = subscribe_encoded_impl(
std::move(subscriber), begin, end, OperProtocol::BINARY);
return folly::coro::co_invoke(
[&, gen = std::move(sourceGen)]() mutable
-> folly::coro::AsyncGenerator<DeltaValue<T>&&> {
Expand All @@ -110,15 +112,15 @@ class NaivePeriodicSubscribableStorageBase {
}

folly::coro::AsyncGenerator<DeltaValue<OperState>&&> subscribe_encoded_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
PathIter begin,
PathIter end,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt);

folly::coro::AsyncGenerator<OperDelta&&> subscribe_delta_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
PathIter begin,
PathIter end,
OperProtocol protocol,
Expand All @@ -127,29 +129,29 @@ class NaivePeriodicSubscribableStorageBase {

folly::coro::AsyncGenerator<std::vector<DeltaValue<TaggedOperState>>&&>
subscribe_encoded_extended_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::vector<ExtendedOperPath> paths,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt);

folly::coro::AsyncGenerator<std::vector<TaggedOperDelta>&&>
subscribe_delta_extended_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::vector<ExtendedOperPath> paths,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt);

folly::coro::AsyncGenerator<SubscriberMessage&&> subscribe_patch_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::map<SubscriptionKey, RawOperPath> rawPaths,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt);

folly::coro::AsyncGenerator<SubscriberMessage&&>
subscribe_patch_extended_impl(
SubscriberId subscriber,
SubscriptionIdentifier&& subscriber,
std::map<SubscriptionKey, ExtendedOperPath> paths,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt);
Expand Down
83 changes: 77 additions & 6 deletions fboss/fsdb/oper/SubscribableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <atomic>

#include <fboss/fsdb/oper/DeltaValue.h>
#include <fboss/fsdb/oper/SubscriptionCommon.h>
#include <fboss/thrift_cow/storage/Storage.h>
#include <folly/Expected.h>
#include <folly/coro/AsyncGenerator.h>
Expand Down Expand Up @@ -77,7 +78,7 @@ class SubscribableStorage {
folly::coro::AsyncGenerator<DeltaValue<T>&&>
subscribe(SubscriberId subscriber, PathIter begin, PathIter end) {
return static_cast<Impl*>(this)->template subscribe_impl<T, TC>(
subscriber, begin, end);
SubscriptionIdentifier(std::move(subscriber)), begin, end);
}

template <typename Path>
Expand Down Expand Up @@ -117,7 +118,33 @@ class SubscribableStorage {
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_encoded_impl(
subscriber, begin, end, protocol, std::move(subscriptionParams));
SubscriptionIdentifier(std::move(subscriber)),
begin,
end,
protocol,
subscriptionParams);
}

folly::coro::AsyncGenerator<DeltaValue<OperState>&&> subscribe_encoded(
SubscriptionIdentifier&& subscriber,
PathIter begin,
PathIter end,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_encoded_impl(
std::move(subscriber), begin, end, protocol, subscriptionParams);
}

folly::coro::AsyncGenerator<std::vector<DeltaValue<TaggedOperState>>&&>
subscribe_encoded_extended(
SubscriptionIdentifier&& subscriber,
std::vector<ExtendedOperPath> paths,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_encoded_extended_impl(
std::move(subscriber), std::move(paths), protocol, subscriptionParams);
}

folly::coro::AsyncGenerator<std::vector<DeltaValue<TaggedOperState>>&&>
Expand All @@ -128,7 +155,10 @@ class SubscribableStorage {
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_encoded_extended_impl(
subscriber, std::move(paths), protocol, std::move(subscriptionParams));
SubscriptionIdentifier(std::move(subscriber)),
std::move(paths),
protocol,
subscriptionParams);
}

template <typename Path>
Expand All @@ -147,6 +177,7 @@ class SubscribableStorage {
protocol,
std::move(subscriptionParams));
}

folly::coro::AsyncGenerator<OperDelta&&> subscribe_delta(
SubscriberId subscriber,
const ConcretePath& path,
Expand All @@ -160,6 +191,7 @@ class SubscribableStorage {
protocol,
std::move(subscriptionParams));
}

folly::coro::AsyncGenerator<OperDelta&&> subscribe_delta(
SubscriberId subscriber,
PathIter begin,
Expand All @@ -168,7 +200,22 @@ class SubscribableStorage {
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_delta_impl(
subscriber, begin, end, protocol, std::move(subscriptionParams));
SubscriptionIdentifier(std::move(subscriber)),
begin,
end,
protocol,
subscriptionParams);
}

folly::coro::AsyncGenerator<OperDelta&&> subscribe_delta(
SubscriptionIdentifier&& subscriber,
PathIter begin,
PathIter end,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_delta_impl(
std::move(subscriber), begin, end, protocol, subscriptionParams);
}

folly::coro::AsyncGenerator<std::vector<TaggedOperDelta>&&>
Expand All @@ -179,7 +226,21 @@ class SubscribableStorage {
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_delta_extended_impl(
subscriber, std::move(paths), protocol, std::move(subscriptionParams));
SubscriptionIdentifier(std::move(subscriber)),
std::move(paths),
protocol,
subscriptionParams);
}

folly::coro::AsyncGenerator<std::vector<TaggedOperDelta>&&>
subscribe_delta_extended(
SubscriptionIdentifier&& subscriber,
std::vector<ExtendedOperPath> paths,
OperProtocol protocol,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_delta_extended_impl(
std::move(subscriber), std::move(paths), protocol, subscriptionParams);
}

template <typename Path>
Expand Down Expand Up @@ -227,6 +288,16 @@ class SubscribableStorage {
std::map<SubscriptionKey, RawOperPath> rawPaths,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_patch_impl(
SubscriptionIdentifier(std::move(subscriber)),
std::move(rawPaths),
subscriptionParams);
}
folly::coro::AsyncGenerator<SubscriberMessage&&> subscribe_patch(
SubscriptionIdentifier&& subscriber,
std::map<SubscriptionKey, RawOperPath> rawPaths,
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_patch_impl(
std::move(subscriber),
std::move(rawPaths),
Expand All @@ -238,7 +309,7 @@ class SubscribableStorage {
std::optional<SubscriptionStorageParams> subscriptionParams =
std::nullopt) {
return static_cast<Impl*>(this)->subscribe_patch_extended_impl(
std::move(subscriber),
SubscriptionIdentifier(std::move(subscriber)),
std::move(rawPaths),
std::move(subscriptionParams));
}
Expand Down

0 comments on commit f788540

Please sign in to comment.