diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index fbe28ed..cc86ebb 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -12,10 +12,36 @@ #ifndef UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H #define UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H +#include #include #include +#include #include +#include +#include + +namespace zenohc { + +class OwnedQuery { +public: + OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {} + + OwnedQuery(const OwnedQuery&) = delete; + OwnedQuery& operator=(const OwnedQuery&) = delete; + + ~OwnedQuery() { z_drop(&_query); } + + Query loan() const { return z_loan(_query); } + bool check() const { return z_check(_query); } + +private: + z_owned_query_t _query; +}; + +using OwnedQueryPtr = std::shared_ptr; + +} // namespace zenohc namespace uprotocol::transport { @@ -61,6 +87,31 @@ struct ZenohUTransport : public UTransport { /// @brief Represents the callable end of a callback connection. using CallableConn = typename UTransport::CallableConn; + using UuriKey = std::string; + + struct ListenerKey { + CallableConn listener; + std::string zenoh_key; + + ListenerKey(CallableConn listener, const std::string& zenoh_key) + : listener(listener), zenoh_key(zenoh_key) {} + + bool operator==(const ListenerKey& other) const { + return listener == other.listener && zenoh_key == other.zenoh_key; + } + + bool operator<(const ListenerKey& other) const { + if (listener == other.listener) { + return zenoh_key < other.zenoh_key; + } + return listener < other.listener; + } + }; + + using RpcCallbackMap = std::map; + using SubscriberMap = std::map; + using QueryableMap = std::map; + using QueryMap = std::map; /// @brief Register listener to be called when UMessage is received /// for the given URI. @@ -95,6 +146,57 @@ struct ZenohUTransport : public UTransport { virtual void cleanupListener(CallableConn listener) override; private: + FRIEND_TEST(TestZenohUTransport, toZenohKeyString); + + static v1::UStatus uError(v1::UCode code, std::string_view message); + + static std::string toZenohKeyString( + const std::string& default_authority_name, const v1::UUri& source, + const std::optional& sink); + + static std::vector> + uattributesToAttachment(const v1::UAttributes& attributes); + + static v1::UAttributes attachmentToUAttributes( + const zenoh::AttachmentView& attachment); + + static zenoh::Priority mapZenohPriority(v1::UPriority upriority); + + static v1::UMessage sampleToUMessage(const zenoh::Sample& sample); + + v1::UStatus registerRequestListener_(const std::string& zenoh_key, + CallableConn listener); + + v1::UStatus registerResponseListener_(const std::string& zenoh_key, + CallableConn listener); + + v1::UStatus registerPublishNotificationListener_( + const std::string& zenoh_key, CallableConn listener); + + v1::UStatus sendRequest_(const std::string& zenoh_key, + const std::string& payload, + const v1::UAttributes& attributes); + + v1::UStatus sendResponse_(const std::string& payload, + const v1::UAttributes& attributes); + + v1::UStatus sendPublishNotification_(const std::string& zenoh_key, + const std::string& payload, + const v1::UAttributes& attributes); + + zenoh::Session session_; + + RpcCallbackMap rpc_callback_map_; + std::mutex rpc_callback_map_mutex_; + + SubscriberMap subscriber_map_; + std::mutex subscriber_map_mutex_; + + QueryableMap queryable_map_; + std::mutex queryable_map_mutex_; + + QueryMap query_map_; + std::mutex query_map_mutex_; }; } // namespace uprotocol::transport diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index f8bf760..935a019 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -9,4 +9,384 @@ // // SPDX-License-Identifier: Apache-2.0 +#include +#include +#include +#include + +#include + #include "up-transport-zenoh-cpp/ZenohUTransport.h" + +namespace uprotocol::transport { + +const char UATTRIBUTE_VERSION = 1; + +const uint32_t WILDCARD_ENTITY_ID = 0x0000FFFF; +const uint32_t WILDCARD_ENTITY_VERSION = 0x000000FF; +const uint32_t WILDCARD_RESOURCE_ID = 0x0000FFFF; + +using namespace zenoh; +using namespace uprotocol::v1; +using namespace uprotocol::datamodel; + +UStatus ZenohUTransport::uError(UCode code, std::string_view message) { + UStatus status; + status.set_code(code); + status.set_message(std::string(message)); + return status; +} + +std::string ZenohUTransport::toZenohKeyString( + const std::string& default_authority_name, const UUri& source, + const std::optional& sink) { + std::ostringstream zenoh_key; + + auto writeUUri = [&](const v1::UUri& uuri) { + zenoh_key << "/"; + + // authority_name + if (uuri.authority_name().empty()) { + zenoh_key << default_authority_name; + } else { + zenoh_key << uuri.authority_name(); + } + zenoh_key << "/"; + + // ue_id + if (uuri.ue_id() == WILDCARD_ENTITY_ID) { + zenoh_key << "*"; + } else { + zenoh_key << uuri.ue_id(); + } + zenoh_key << "/"; + + // ue_version_major + if (uuri.ue_version_major() == WILDCARD_ENTITY_VERSION) { + zenoh_key << "*"; + } else { + zenoh_key << uuri.ue_version_major(); + } + zenoh_key << "/"; + + // resource_id + if (uuri.resource_id() == WILDCARD_RESOURCE_ID) { + zenoh_key << "*"; + } else { + zenoh_key << uuri.resource_id(); + } + }; + + zenoh_key << "up"; + zenoh_key << std::uppercase << std::hex; + + writeUUri(source); + + if (sink.has_value()) { + writeUUri(*sink); + } else { + zenoh_key << "/{}/{}/{}/{}"; + } + return zenoh_key.str(); +} + +std::vector> +ZenohUTransport::uattributesToAttachment(const UAttributes& attributes) { + std::vector> res; + + std::string version(&UATTRIBUTE_VERSION, 1); + + std::string data; + attributes.SerializeToString(&data); + + res.push_back(std::make_pair("", version)); + res.push_back(std::make_pair("", data)); + return res; +} + +UAttributes ZenohUTransport::attachmentToUAttributes( + const AttachmentView& attachment) { + std::vector attachment_vec; + attachment.iterate( + [&](const BytesView& key, const BytesView& value) -> bool { + attachment_vec.push_back(value); + return true; + }); + + if (attachment_vec.size() != 2) { + // TODO: error report, exception? + } + + if (attachment_vec[0].get_len() == 1) { + if (attachment_vec[0].as_string_view()[0] != UATTRIBUTE_VERSION) { + // TODO: error report, exception? + } + }; + UAttributes res; + // TODO: more efficient way? + res.ParseFromString(std::string(attachment_vec[1].as_string_view())); + return res; +} + +Priority ZenohUTransport::mapZenohPriority(UPriority upriority) { + switch (upriority) { + case UPriority::UPRIORITY_CS0: + return Z_PRIORITY_BACKGROUND; + case UPriority::UPRIORITY_CS1: + return Z_PRIORITY_DATA_LOW; + case UPriority::UPRIORITY_CS2: + return Z_PRIORITY_DATA; + case UPriority::UPRIORITY_CS3: + return Z_PRIORITY_DATA_HIGH; + case UPriority::UPRIORITY_CS4: + return Z_PRIORITY_INTERACTIVE_LOW; + case UPriority::UPRIORITY_CS5: + return Z_PRIORITY_INTERACTIVE_HIGH; + case UPriority::UPRIORITY_CS6: + return Z_PRIORITY_REAL_TIME; + case UPriority::UPRIORITY_UNSPECIFIED: + default: + return Z_PRIORITY_DATA_LOW; + } +} + +UMessage ZenohUTransport::sampleToUMessage(const Sample& sample) { + UAttributes attributes; + if (sample.get_attachment().check()) { + attributes = attachmentToUAttributes(sample.get_attachment()); + } + std::string payload(sample.get_payload().as_string_view()); + UMessage message; + message.set_payload(payload); + message.set_allocated_attributes(&attributes); + + return message; +} + +ZenohUTransport::ZenohUTransport(const UUri& defaultUri, + const std::filesystem::path& configFile) + : UTransport(defaultUri), + session_(expect(open( + std::move(expect(config_from_file(configFile.string().c_str())))))) {} + +UStatus ZenohUTransport::registerRequestListener_(const std::string& zenoh_key, + CallableConn listener) { + auto on_query = [&](const Query& query) { + UAttributes attributes; + if (query.get_attachment().check()) { + attributes = attachmentToUAttributes(query.get_attachment()); + } + auto id_str = serializer::uuid::AsString().serialize(attributes.id()); + std::unique_lock lock(query_map_mutex_); + query_map_.insert(std::make_pair( + std::move(id_str), std::move(std::make_shared(query)))); + }; + + auto on_drop_queryable = []() {}; + + auto queryable = expect( + session_.declare_queryable(zenoh_key, {on_query, on_drop_queryable})); + + return UStatus(); +} + +UStatus ZenohUTransport::registerResponseListener_(const std::string& zenoh_key, + CallableConn listener) { + std::unique_lock lock(rpc_callback_map_mutex_); + rpc_callback_map_.insert(std::make_pair(zenoh_key, listener)); + + return UStatus(); +} + +UStatus ZenohUTransport::registerPublishNotificationListener_( + const std::string& zenoh_key, CallableConn listener) { + auto data_handler = [&](const Sample& sample) { + listener(sampleToUMessage(sample)); + // invoke_nonblock_callback(&cb_sender, &listener_cloned, Ok(msg)); + }; + + auto key = ListenerKey(listener, zenoh_key); + auto subscriber = expect( + session_.declare_subscriber(zenoh_key, data_handler)); + { + std::unique_lock lock(subscriber_map_mutex_); + subscriber_map_.insert( + std::make_pair(std::move(key), std::move(subscriber))); + } + return UStatus(); +} + +UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key, + const std::string& payload, + const UAttributes& attributes) { + auto source_str = + serializer::uri::AsString().serialize(attributes.source()); + CallableConn resp_callback; + { + std::unique_lock lock(rpc_callback_map_mutex_); + + if (auto resp_callback_it = rpc_callback_map_.find(source_str); + resp_callback_it == rpc_callback_map_.end()) { + return uError(UCode::UNAVAILABLE, "failed to find UUID"); + } else { + resp_callback = resp_callback_it->second; + } + } + auto on_reply = [&](Reply&& reply) { + auto result = reply.get(); + + if (auto sample = std::get_if(&result)) { + resp_callback(sampleToUMessage(*sample)); + } else if (auto error = std::get_if(&result)) { + // TODO: error report + } + }; + + auto attachment = uattributesToAttachment(attributes); + + GetOptions options; + options.set_target(Z_QUERY_TARGET_BEST_MATCHING); + options.set_value(Value(payload)); + options.set_attachment(attachment); + + auto onDone = []() {}; + + session_.get(zenoh_key, "", {on_reply, onDone}, options); + + return UStatus(); +} + +UStatus ZenohUTransport::sendResponse_(const std::string& payload, + const UAttributes& attributes) { + auto reqid_str = serializer::uuid::AsString().serialize(attributes.reqid()); + OwnedQueryPtr query; + { + std::unique_lock lock(query_map_mutex_); + if (auto query_it = query_map_.find(reqid_str); + query_it == query_map_.end()) { + return uError(UCode::INTERNAL, "query doesn't exist"); + } else { + query = query_it->second; + } + } + + QueryReplyOptions options; + query->loan().reply(query->loan().get_keyexpr().as_string_view(), payload, + options); + + return UStatus(); +} + +UStatus ZenohUTransport::sendPublishNotification_( + const std::string& zenoh_key, const std::string& payload, + const UAttributes& attributes) { + auto attachment = uattributesToAttachment(attributes); + + auto priority = mapZenohPriority(attributes.priority()); + + PutOptions options; + options.set_encoding(Z_ENCODING_PREFIX_APP_CUSTOM); + options.set_priority(priority); + options.set_attachment(attachment); + if (!session_.put(zenoh_key, payload, options)) { + return uError(UCode::INTERNAL, "Unable to send with Zenoh"); + } + + return UStatus(); +} + +v1::UStatus ZenohUTransport::sendImpl(const UMessage& message) { + if (!message.has_payload()) { + return uError(UCode::INVALID_ARGUMENT, "Invalid UPayload"); + } + const auto& payload = message.payload(); + + const auto& attributes = message.attributes(); + if (attributes.type() == UMessageType::UMESSAGE_TYPE_UNSPECIFIED) { + return uError(UCode::INVALID_ARGUMENT, "Invalid UAttributes"); + } + + std::string zenoh_key = + toZenohKeyString(getDefaultSource().authority_name(), attributes.sink(), + attributes.source()); + switch (attributes.type()) { + case UMessageType::UMESSAGE_TYPE_PUBLISH: { + auto [valid, maybe_reason] = + validator::message::isValidPublish(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } + return sendPublishNotification_(zenoh_key, payload, attributes); + } + case UMessageType::UMESSAGE_TYPE_NOTIFICATION: { + auto [valid, maybe_reason] = + validator::message::isValidNotification(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } + return sendPublishNotification_(zenoh_key, payload, attributes); + } + case UMessageType::UMESSAGE_TYPE_REQUEST: { + auto [valid, maybe_reason] = + validator::message::isValidRpcRequest(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } + return sendRequest_(zenoh_key, payload, attributes); + } + case UMessageType::UMESSAGE_TYPE_RESPONSE: { + auto [valid, maybe_reason] = + validator::message::isValidRpcResponse(message); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::message::message(*maybe_reason)); + } + return sendResponse_(payload, attributes); + } + default: { + return uError(UCode::INVALID_ARGUMENT, + "Wrong Message type in UAttributes"); + } + } + return UStatus(); +} + +v1::UStatus ZenohUTransport::registerListenerImpl( + const v1::UUri& sink_filter, CallableConn&& listener, + std::optional&& source_filter) { + std::string zenoh_key = toZenohKeyString( + getDefaultSource().authority_name(), sink_filter, source_filter); + // TODO: Is 0 == none? + if (!sink_filter.authority_name().empty() && sink_filter.ue_id() == 0 && + sink_filter.resource_id() == 0) { + // This is special UUri which means we need to register for all of + // Publish, Notification, Request, and Response RPC response + registerResponseListener_(zenoh_key, listener); + registerRequestListener_(zenoh_key, listener); + registerPublishNotificationListener_(zenoh_key, listener); + } else { + auto [valid, maybe_reason] = validator::uri::isValid(sink_filter); + if (!valid) { + return uError(UCode::INVALID_ARGUMENT, + validator::uri::message(*maybe_reason)); + } + + if (std::get<0>(validator::uri::isValidRpcResponse(sink_filter))) { + registerResponseListener_(zenoh_key, std::move(listener)); + } else if (std::get<0>(validator::uri::isValidRpcMethod(sink_filter))) { + registerRequestListener_(zenoh_key, std::move(listener)); + } else { + registerPublishNotificationListener_(zenoh_key, + std::move(listener)); + } + } + + return v1::UStatus(); +} + +void ZenohUTransport::cleanupListener(CallableConn listener) {} + +} // namespace uprotocol::transport diff --git a/test/coverage/ZenohUTransportTest.cpp b/test/coverage/ZenohUTransportTest.cpp index 45c0352..4f281ff 100644 --- a/test/coverage/ZenohUTransportTest.cpp +++ b/test/coverage/ZenohUTransportTest.cpp @@ -10,11 +10,21 @@ // SPDX-License-Identifier: Apache-2.0 #include -#include +#include +#include -namespace { +#include -class TestFixture : public testing::Test { +#include "up-transport-zenoh-cpp/ZenohUTransport.h" + +namespace uprotocol::transport { + +using namespace uprotocol::v1; +using namespace uprotocol::transport; + +constexpr const char* AUTHORITY_NAME = "test"; + +class TestZenohUTransport : public testing::Test { protected: // Run once per TEST_F. // Used to set up clean environments per test. @@ -23,8 +33,8 @@ class TestFixture : public testing::Test { // Run once per execution of the test application. // Used for setup of all tests. Has access to this instance. - TestFixture() = default; - ~TestFixture() = default; + TestZenohUTransport() = default; + ~TestZenohUTransport() = default; // Run once per execution of the test application. // Used only for global setup outside of tests. @@ -32,7 +42,66 @@ class TestFixture : public testing::Test { static void TearDownTestSuite() {} }; -// TODO replace -TEST_F(TestFixture, SomeTestName) {} +uprotocol::v1::UUri create_uuri(const std::string& authority, uint32_t ue_id, + uint32_t ue_version_major, + uint32_t resource_id) { + uprotocol::v1::UUri uuri; + uuri.set_authority_name(authority); + uuri.set_ue_id(ue_id); + uuri.set_ue_version_major(ue_version_major); + uuri.set_resource_id(resource_id); + + return uuri; +} + +// TODO(sashacmc): config generation +TEST_F(TestZenohUTransport, ConstructDestroy) { + uprotocol::v1::UUri def_src_uuri; + def_src_uuri.set_authority_name(AUTHORITY_NAME); + def_src_uuri.set_ue_id(0x18000); + def_src_uuri.set_ue_version_major(1); + def_src_uuri.set_resource_id(0); + + zenoh::init_logger(); + try { + auto ut = ZenohUTransport(def_src_uuri, + "/home/sashacmc/src/up-client-zenoh-cpp/test/" + "extra/DEFAULT_CONFIG.json5"); + } catch (zenoh::ErrorMessage& e) { + throw std::runtime_error(std::string(e.as_string_view())); + } +} + +TEST_F(TestZenohUTransport, toZenohKeyString) { + EXPECT_EQ( + ZenohUTransport::toZenohKeyString( + "", create_uuri("192.168.1.100", 0x10AB, 3, 0x80CD), std::nullopt), + "up/192.168.1.100/10AB/3/80CD/{}/{}/{}/{}"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("192.168.1.100", 0x10AB, 3, 0x80CD), + create_uuri("192.168.1.101", 0x20EF, 4, 0)), + "up/192.168.1.100/10AB/3/80CD/192.168.1.101/20EF/4/0"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), + create_uuri("192.168.1.101", 0x20EF, 4, 0)), + "up/*/*/*/*/192.168.1.101/20EF/4/0"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("my-host1", 0x10AB, 3, 0), + create_uuri("my-host2", 0x20EF, 4, 0xB)), + "up/my-host1/10AB/3/0/my-host2/20EF/4/B"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), + create_uuri("my-host2", 0x20EF, 4, 0xB)), + "up/*/*/*/*/my-host2/20EF/4/B"); + + EXPECT_EQ(ZenohUTransport::toZenohKeyString( + "", create_uuri("*", 0xFFFF, 0xFF, 0xFFFF), + create_uuri("[::1]", 0xFFFF, 0xFF, 0xFFFF)), + "up/*/*/*/*/[::1]/*/*/*"); +} -} // namespace +} // namespace uprotocol::transport diff --git a/test/extra/PublisherSubscriberTest.cpp b/test/extra/PublisherSubscriberTest.cpp index 54e772d..d3fc132 100644 --- a/test/extra/PublisherSubscriberTest.cpp +++ b/test/extra/PublisherSubscriberTest.cpp @@ -9,10 +9,22 @@ // // SPDX-License-Identifier: Apache-2.0 +#include #include +#include +#include + +#include + +#include "up-transport-zenoh-cpp/ZenohUTransport.h" namespace { +using namespace uprotocol::v1; +using namespace uprotocol::transport; + +constexpr const char* AUTHORITY_NAME = "test"; + class TestFixture : public testing::Test { protected: // Run once per TEST_F. @@ -31,7 +43,84 @@ class TestFixture : public testing::Test { static void TearDownTestSuite() {} }; -// TODO replace -TEST_F(TestFixture, SomeTestName) {} +using MsgDiff = google::protobuf::util::MessageDifferencer; + +uprotocol::v1::UUID* make_uuid() { + uint64_t timestamp = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + auto id = new uprotocol::v1::UUID(); + id->set_msb((timestamp << 16) | (8ULL << 12) | + (0x123ULL)); // version 8 ; counter = 0x123 + id->set_lsb((2ULL << 62) | (0xFFFFFFFFFFFFULL)); // variant 10 + return id; +} + +// TODO(sashacmc): config generation +TEST_F(TestFixture, PubSub) { + UUri uuri; + + uuri.set_authority_name(AUTHORITY_NAME); + uuri.set_ue_id(0x00010001); + uuri.set_ue_version_major(1); + uuri.set_resource_id(0); + + zenoh::init_logger(); + try { + std::cerr << "Test MESSAGE" << std::endl; + auto ut = ZenohUTransport(uuri, + "/home/sashacmc/src/up-client-zenoh-cpp/test/" + "extra/DEFAULT_CONFIG.json5"); + + uprotocol::v1::UUri sink_filter; + sink_filter.set_authority_name(AUTHORITY_NAME); + sink_filter.set_ue_id(0x00010001); + sink_filter.set_ue_version_major(1); + sink_filter.set_resource_id(0x8000); + + uprotocol::v1::UUri source_filter; + source_filter.set_authority_name(AUTHORITY_NAME); + source_filter.set_ue_id(0x00010001); + source_filter.set_ue_version_major(1); + source_filter.set_resource_id(0x8000); + + uprotocol::v1::UMessage capture_msg; + size_t capture_count = 0; + auto action = [&](const uprotocol::v1::UMessage& msg) { + capture_msg = msg; + capture_count++; + }; + auto lhandle = ut.registerListener(sink_filter, action, source_filter); + EXPECT_TRUE(lhandle.has_value()); + auto handle = std::move(lhandle).value(); + EXPECT_TRUE(handle); + + const size_t max_count = 1; // 1000 * 100; + for (auto i = 0; i < max_count; i++) { + auto src = new uprotocol::v1::UUri(); + src->set_authority_name(AUTHORITY_NAME); + src->set_ue_id(0x00010001); + src->set_ue_version_major(1); + src->set_resource_id(0x8000); + + auto attr = new uprotocol::v1::UAttributes(); + attr->set_type(uprotocol::v1::UMESSAGE_TYPE_PUBLISH); + attr->set_allocated_source(src); + attr->set_allocated_id(make_uuid()); + attr->set_payload_format(uprotocol::v1::UPAYLOAD_FORMAT_PROTOBUF); + attr->set_ttl(1000); + uprotocol::v1::UMessage msg; + msg.set_allocated_attributes(attr); + msg.set_payload("payload"); + auto result = ut.send(msg); + EXPECT_EQ(i + 1, capture_count); + EXPECT_TRUE(MsgDiff::Equals(msg, capture_msg)); + } + handle.reset(); + } catch (zenoh::ErrorMessage& e) { + throw std::runtime_error(std::string(e.as_string_view())); + } +} } // namespace