Skip to content

Commit

Permalink
Allow subscribing using Patches
Browse files Browse the repository at this point in the history
Summary:
Adding an option to subscribe using Patches instead. Utilizing FsdbSubManager which handles the data parsing for us. Once I have confidence of the functionality here, I will benchmark the difference and clean up the old Path subscription

The LoadBalancer change was needed to work with some of these visitors. Cannot have a dependant type named Fields which overshadowes the actual Fields struct

Differential Revision: D58571195

fbshipit-source-id: 3bb8eaa05fce9e85e85963ebead78c152e80c8e6
  • Loading branch information
Peyman Gardideh authored and facebook-github-bot committed Aug 22, 2024
1 parent ebd90b6 commit e5805ea
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 39 deletions.
1 change: 1 addition & 0 deletions cmake/Agent.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ add_library(core
fboss/agent/FabricConnectivityManager.cpp
fboss/agent/EncapIndexAllocator.cpp
fboss/agent/FibHelpers.cpp
fboss/agent/FsdbAdaptedSubManager.cpp
fboss/agent/HwAsicTable.cpp
fboss/agent/HwSwitch.cpp
fboss/agent/HwSwitchConnectionStatusTable.cpp
Expand Down
4 changes: 4 additions & 0 deletions fboss/agent/AgentFeatures.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ DEFINE_uint32(
dsf_gr_hold_time,
0,
"GR hold time for FSDB DsfSubscription in sec");
DEFINE_bool(
dsf_subscribe_patch,
false,
"Subscribe to remote FSDB using Patch apis");
// Remote neighbor entries are always flushed to avoid blackholing the traffic.
// However, by default, remote{systemPorts, Rifs} are not flushed but marked
// STALE in the software. This is to avoid hardware programmign churn.
Expand Down
1 change: 1 addition & 0 deletions fboss/agent/AgentFeatures.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ DECLARE_bool(dsf_flush_remote_sysports_and_rifs_on_gr);
DECLARE_uint32(dsf_num_parallel_sessions_per_remote_interface_node);
DECLARE_int32(dsf_num_fsdb_connect_threads);
DECLARE_int32(dsf_num_fsdb_stream_threads);
DECLARE_bool(dsf_subscribe_patch);

DECLARE_bool(set_classid_for_my_subnet_and_ip_routes);
DECLARE_int32(stat_publish_interval_ms);
Expand Down
3 changes: 3 additions & 0 deletions fboss/agent/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ cpp_library(
"EncapIndexAllocator.cpp",
"FabricConnectivityManager.cpp",
"FibHelpers.cpp",
"FsdbAdaptedSubManager.cpp",
"FsdbSyncer.cpp",
"HwAsicTable.cpp",
"HwSwitchConnectionStatusTable.cpp",
Expand Down Expand Up @@ -774,6 +775,7 @@ cpp_library(
"//fboss/facebook/bitsflow:bitsflow_helper",
"//fboss/fsdb/client:fsdb_pub_sub",
"//fboss/fsdb/client:fsdb_stream_client",
"//fboss/fsdb/client:fsdb_sub_manager",
"//fboss/fsdb/common:flags",
"//fboss/fsdb/if:fsdb_common-cpp2-types",
"//fboss/fsdb/if:fsdb_model",
Expand All @@ -792,6 +794,7 @@ cpp_library(
"//fboss/lib/platforms:product-info",
"//fboss/thrift_cow/nodes:nodes",
"//fboss/thrift_cow/nodes:serializer",
"//fboss/thrift_cow/storage:cow_storage",
"//folly:conv",
"//folly:demangle",
"//folly:exception_string",
Expand Down
126 changes: 96 additions & 30 deletions fboss/agent/DsfSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ auto constexpr kDsfCtrlLogPrefix = "DSF_CTRL_EVENT: ";

namespace facebook::fboss {

using k_fsdb_model = fsdb::fsdb_model_tags::strings;

DsfSubscription::DsfSubscription(
fsdb::SubscriptionOptions options,
folly::EventBase* reconnectEvb,
Expand All @@ -75,6 +77,11 @@ DsfSubscription::DsfSubscription(
opts_.clientId_,
reconnectEvb,
subscriberEvb)),
subMgr_(new FsdbAdaptedSubManager(
fsdb::SubscriptionOptions(opts_),
getServerOptions(localIp.str(), remoteIp.str()),
reconnectEvb,
subscriberEvb)),
localNodeName_(std::move(localNodeName)),
remoteNodeName_(std::move(remoteNodeName)),
remoteNodeSwitchIds_(std::move(remoteNodeSwitchIds)),
Expand Down Expand Up @@ -112,25 +119,69 @@ void DsfSubscription::stop() {
stopped_ = true;
}
void DsfSubscription::setupSubscription() {
fsdbPubSubMgr_->addStatePathSubscription(
fsdb::SubscriptionOptions(opts_),
getAllSubscribePaths(localNodeName_, localIp_),
[this](
fsdb::SubscriptionState oldState, fsdb::SubscriptionState newState) {
handleFsdbSubscriptionStateUpdate(oldState, newState);
},
[this](fsdb::OperSubPathUnit&& operStateUnit) {
handleFsdbUpdate(std::move(operStateUnit));
},
getServerOptions(localIp_.str(), remoteIp_.str()));
XLOG(DBG2) << kDsfCtrlLogPrefix
<< "added subscription for : " << remoteEndpointStr();
auto subscriptionStateCb = [this](
fsdb::SubscriptionState oldState,
fsdb::SubscriptionState newState) {
handleFsdbSubscriptionStateUpdate(oldState, newState);
};
if (FLAGS_dsf_subscribe_patch) {
auto remoteEndpoint = makeRemoteEndpoint(localNodeName_, localIp_);
auto sysPortPathKey = subMgr_->addPath(getSystemPortsPath());
auto inftMapKey = subMgr_->addPath(getInterfacesPath());
auto dsfSubscriptionsKey = subMgr_->addPath(
getDsfSubscriptionsPath(makeRemoteEndpoint(localNodeName_, localIp_)));
subMgr_->subscribe(
[this, remoteEndpoint, sysPortPathKey, inftMapKey, dsfSubscriptionsKey](
auto update) {
auto agentState = update.data->template ref<k_fsdb_model::agent>();
bool portsOrIntfsChanged = false;
for (const auto& key : update.updatedKeys) {
if (key == sysPortPathKey || key == inftMapKey) {
portsOrIntfsChanged = true;
} else if (key == dsfSubscriptionsKey) {
auto newRemoteState =
agentState
->template safe_cref<k_fsdb_model::fsdbSubscriptions>()
->safe_cref(remoteEndpoint)
->toThrift();
session_.remoteSubStateChanged(newRemoteState);
}
}
if (portsOrIntfsChanged) {
auto switchState =
agentState->template ref<k_fsdb_model::switchState>();
queueRemoteStateChanged(
*switchState->getSystemPorts(), *switchState->getInterfaces());
}
},
std::move(subscriptionStateCb));
} else {
fsdbPubSubMgr_->addStatePathSubscription(
fsdb::SubscriptionOptions(opts_),
getAllSubscribePaths(localNodeName_, localIp_),
[this](
fsdb::SubscriptionState oldState,
fsdb::SubscriptionState newState) {
handleFsdbSubscriptionStateUpdate(oldState, newState);
},
[this](fsdb::OperSubPathUnit&& operStateUnit) {
handleFsdbUpdate(std::move(operStateUnit));
},
getServerOptions(localIp_.str(), remoteIp_.str()));
XLOG(DBG2) << kDsfCtrlLogPrefix
<< "added subscription for : " << remoteEndpointStr();
}
}

void DsfSubscription::tearDownSubscription() {
fsdbPubSubMgr_->removeStatePathSubscription(
getAllSubscribePaths(localNodeName_, localIp_), remoteIp_.str());
XLOG(DBG2) << kDsfCtrlLogPrefix
<< "removed subscription for : " << remoteEndpointStr();
if (FLAGS_dsf_subscribe_patch) {
subMgr_->stop();
} else {
fsdbPubSubMgr_->removeStatePathSubscription(
getAllSubscribePaths(localNodeName_, localIp_), remoteIp_.str());
XLOG(DBG2) << kDsfCtrlLogPrefix
<< "removed subscription for : " << remoteEndpointStr();
}
}
std::string DsfSubscription::remoteEndpointStr() const {
static const std::string kRemoteEndpoint =
Expand All @@ -143,9 +194,13 @@ fsdb::FsdbStreamClient::State DsfSubscription::getStreamState() const {
}

const fsdb::SubscriptionInfo DsfSubscription::getSubscriptionInfo() const {
// Since we own our own pub sub mgr, there should always be exactly one
// subscription
return fsdbPubSubMgr_->getSubscriptionInfo()[0];
if (FLAGS_dsf_subscribe_patch) {
return *subMgr_->getInfo();
} else {
// Since we own our own pub sub mgr, there should always be exactly one
// subscription
return fsdbPubSubMgr_->getSubscriptionInfo().at(0);
}
}

std::string DsfSubscription::makeRemoteEndpoint(
Expand Down Expand Up @@ -189,29 +244,21 @@ void DsfSubscription::handleFsdbSubscriptionStateUpdate(

void DsfSubscription::handleFsdbUpdate(fsdb::OperSubPathUnit&& operStateUnit) {
DsfUpdate dsfUpdate;
MultiSwitchSystemPortMap mswitchSysPorts;
MultiSwitchInterfaceMap mswitchIntfs;
for (const auto& change : *operStateUnit.changes()) {
if (getSystemPortsPath().matchesPath(*change.path()->path())) {
XLOG(DBG2) << "Got sys port update from : " << remoteNodeName_;
MultiSwitchSystemPortMap mswitchSysPorts;
mswitchSysPorts.fromThrift(thrift_cow::deserialize<
MultiSwitchSystemPortMapTypeClass,
MultiSwitchSystemPortMapThriftType>(
fsdb::OperProtocol::BINARY, *change.state()->contents()));
for (const auto& [id, sysPortMap] : mswitchSysPorts) {
auto matcher = HwSwitchMatcher(id);
dsfUpdate.switchId2SystemPorts[matcher.switchId()] = sysPortMap;
}
} else if (getInterfacesPath().matchesPath(*change.path()->path())) {
XLOG(DBG2) << "Got rif update from : " << remoteNodeName_;
MultiSwitchInterfaceMap mswitchIntfs;
mswitchIntfs.fromThrift(thrift_cow::deserialize<
MultiSwitchInterfaceMapTypeClass,
MultiSwitchInterfaceMapThriftType>(
fsdb::OperProtocol::BINARY, *change.state()->contents()));
for (const auto& [id, intfMap] : mswitchIntfs) {
auto matcher = HwSwitchMatcher(id);
dsfUpdate.switchId2Intfs[matcher.switchId()] = intfMap;
}
} else if (getDsfSubscriptionsPath(
makeRemoteEndpoint(localNodeName_, localIp_))
.matchesPath(*change.path()->path())) {
Expand All @@ -232,6 +279,25 @@ void DsfSubscription::handleFsdbUpdate(fsdb::OperSubPathUnit&& operStateUnit) {
remoteNodeName_);
}
}
queueRemoteStateChanged(mswitchSysPorts, mswitchIntfs);
}

void DsfSubscription::queueRemoteStateChanged(
MultiSwitchSystemPortMap& newPortMap,
MultiSwitchInterfaceMap& newInterfaceMap) {
DsfUpdate dsfUpdate;
for (const auto& [id, sysPortMap] : newPortMap) {
auto matcher = HwSwitchMatcher(id);
dsfUpdate.switchId2SystemPorts[matcher.switchId()] = sysPortMap;
}
for (const auto& [id, intfMap] : newInterfaceMap) {
auto matcher = HwSwitchMatcher(id);
dsfUpdate.switchId2Intfs[matcher.switchId()] = intfMap;
}
queueDsfUpdate(std::move(dsfUpdate));
}

void DsfSubscription::queueDsfUpdate(DsfUpdate&& dsfUpdate) {
bool needsScheduling = false;
{
auto nextDsfUpdateWlock = nextDsfUpdate_.wlock();
Expand Down
10 changes: 10 additions & 0 deletions fboss/agent/DsfSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

#pragma once

#include <fboss/thrift_cow/storage/CowStorage.h>
#include "fboss/agent/DsfSession.h"
#include "fboss/agent/FsdbAdaptedSubManager.h"
#include "fboss/agent/state/SwitchState.h"
#include "fboss/fsdb/client/FsdbPubSubManager.h"
#include "fboss/fsdb/client/FsdbSubManager.h"
#include "fboss/fsdb/if/FsdbModel.h"

#include <string>

Expand Down Expand Up @@ -71,11 +76,16 @@ class DsfSubscription {
fsdb::SubscriptionState oldState,
fsdb::SubscriptionState newState);
void handleFsdbUpdate(fsdb::OperSubPathUnit&& operStateUnit);
void queueRemoteStateChanged(
MultiSwitchSystemPortMap& newPortMap,
MultiSwitchInterfaceMap& newInterfaceMap);
void queueDsfUpdate(DsfUpdate&& dsfUpdate);
fsdb::FsdbStreamClient::State getStreamState() const;

fsdb::SubscriptionOptions opts_;
folly::EventBase* hwUpdateEvb_;
std::unique_ptr<fsdb::FsdbPubSubManager> fsdbPubSubMgr_;
std::unique_ptr<FsdbAdaptedSubManager> subMgr_;
std::string localNodeName_;
std::string remoteNodeName_;
std::set<SwitchID> remoteNodeSwitchIds_;
Expand Down
11 changes: 11 additions & 0 deletions fboss/agent/FsdbAdaptedSubManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.

#include "fboss/agent/FsdbAdaptedSubManager.h"

namespace facebook::fboss {

template class fsdb::FsdbSubManager<
fsdb::CowStorage<fsdb::FsdbOperStateRoot, FsdbOperStateRoot>,
true /* IsCow */>;

} // namespace facebook::fboss
49 changes: 49 additions & 0 deletions fboss/agent/FsdbAdaptedSubManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.

#pragma once

#include "fboss/agent/state/SwitchState.h"
#include "fboss/fsdb/client/FsdbSubManager.h"
#include "fboss/fsdb/if/FsdbModel.h"
#include "fboss/thrift_cow/storage/CowStorage.h"

namespace facebook::fboss {

/*
Creating an instantiation of FsdbSubManager that returns fully adapted
SwitchState types. To do this, first we have to adapt all the way to fsdb root
*/

using k_fsdb_model = fsdb::fsdb_model_tags::strings;

class AgentData;
RESOLVE_STRUCT_MEMBER(AgentData, k_fsdb_model::switchState, SwitchState);
class AgentData : public ThriftStructNode<AgentData, fsdb::AgentData> {
public:
using BaseT = ThriftStructNode<AgentData, fsdb::AgentData>;
using BaseT::modify;
// Inherit the constructors required for clone()
using BaseT::BaseT;
friend class CloneAllocator;
};

class FsdbOperStateRoot;
RESOLVE_STRUCT_MEMBER(FsdbOperStateRoot, k_fsdb_model::agent, AgentData);
class FsdbOperStateRoot
: public ThriftStructNode<FsdbOperStateRoot, fsdb::FsdbOperStateRoot> {
public:
using BaseT = ThriftStructNode<FsdbOperStateRoot, fsdb::FsdbOperStateRoot>;
using BaseT::modify;
// Inherit the constructors required for clone()
using BaseT::BaseT;
friend class CloneAllocator;
};

extern template class fsdb::FsdbSubManager<
fsdb::CowStorage<fsdb::FsdbOperStateRoot, FsdbOperStateRoot>,
true /* IsCow */>;
using FsdbAdaptedSubManager = fsdb::FsdbSubManager<
fsdb::CowStorage<fsdb::FsdbOperStateRoot, FsdbOperStateRoot>,
true /* IsCow */>;

} // namespace facebook::fboss
4 changes: 2 additions & 2 deletions fboss/agent/state/LoadBalancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class LoadBalancer
: public ThriftStructNode<LoadBalancer, state::LoadBalancerFields> {
public:
template <typename T>
struct Fields {
struct FieldsT {
using ElementType = T;
using ElemnentTC = apache::thrift::type_class::enumeration;
using FieldsTC = apache::thrift::type_class::set<ElemnentTC>;
Expand All @@ -79,7 +79,7 @@ class LoadBalancer
};

template <typename T>
using FieldsConstIter = typename Fields<T>::const_iterator;
using FieldsConstIter = typename FieldsT<T>::const_iterator;

using BaseT = ThriftStructNode<LoadBalancer, state::LoadBalancerFields>;
using IPv4Field = LoadBalancerFields::IPv4Field;
Expand Down
20 changes: 13 additions & 7 deletions fboss/agent/test/DsfSubscriptionTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,26 @@ std::shared_ptr<InterfaceMap> makeRifs(const SystemPortMap* sysPorts) {
}
} // namespace

template <uint16_t N>
struct NumRemoteAsics {
static auto constexpr kNumRemoteAsics = N;
template <uint16_t NumRemoteAsics, bool SubscribePatch>
struct TestParams {
static auto constexpr kNumRemoteAsics = NumRemoteAsics;
static auto constexpr kSubscribePatch = SubscribePatch;
};
using TestTypes = ::testing::Types<NumRemoteAsics<1>, NumRemoteAsics<2>>;
using TestTypes = ::testing::Types<
TestParams<1, true>,
TestParams<1, false>,
TestParams<2, true>,
TestParams<2, false>>;

template <typename NumRemoteSwitchAsics>
template <typename TestParam>
class DsfSubscriptionTest : public ::testing::Test {
public:
static auto constexpr kNumRemoteSwitchAsics =
NumRemoteSwitchAsics::kNumRemoteAsics;
static auto constexpr kNumRemoteSwitchAsics = TestParam::kNumRemoteAsics;
static auto constexpr kSubscribePatch = TestParam::kSubscribePatch;
void SetUp() override {
FLAGS_publish_state_to_fsdb = true;
FLAGS_fsdb_sync_full_state = true;
FLAGS_dsf_subscribe_patch = kSubscribePatch;
auto config = testConfigA(cfg::SwitchType::VOQ);
handle_ = createTestHandle(&config);
sw_ = handle_->getSw();
Expand Down

0 comments on commit e5805ea

Please sign in to comment.