Skip to content

Commit

Permalink
DO NOT MERGE
Browse files Browse the repository at this point in the history
Checking changes in eclipse-uprotocol#64 enable eclipse-uprotocol#54 to proceed

Merge remote-tracking branch 'sasha/zenoh-utransport-impl' into check/add-zenohcpp-conan
  • Loading branch information
gregmedd committed Jun 27, 2024
2 parents e004ccb + a1c58c0 commit 6e990db
Show file tree
Hide file tree
Showing 4 changed files with 650 additions and 10 deletions.
102 changes: 102 additions & 0 deletions include/up-transport-zenoh-cpp/ZenohUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,36 @@
#ifndef UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H
#define UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H

#include <gtest/gtest.h>
#include <up-cpp/transport/UTransport.h>

#include <filesystem>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <zenoh.hxx>

namespace zenohc {

class OwnedQuery {
public:
OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {}

OwnedQuery(const OwnedQuery&) = delete;
OwnedQuery& operator=(const OwnedQuery&) = delete;

~OwnedQuery() { z_drop(&_query); }

Query loan() const { return z_loan(_query); }
bool check() const { return z_check(_query); }

private:
z_owned_query_t _query;
};

using OwnedQueryPtr = std::shared_ptr<OwnedQuery>;

} // namespace zenohc

namespace uprotocol::transport {

Expand Down Expand Up @@ -61,6 +87,31 @@ struct ZenohUTransport : public UTransport {

/// @brief Represents the callable end of a callback connection.
using CallableConn = typename UTransport::CallableConn;
using UuriKey = std::string;

struct ListenerKey {
CallableConn listener;
std::string zenoh_key;

ListenerKey(CallableConn listener, const std::string& zenoh_key)
: listener(listener), zenoh_key(zenoh_key) {}

bool operator==(const ListenerKey& other) const {
return listener == other.listener && zenoh_key == other.zenoh_key;
}

bool operator<(const ListenerKey& other) const {
if (listener == other.listener) {
return zenoh_key < other.zenoh_key;
}
return listener < other.listener;
}
};

using RpcCallbackMap = std::map<UuriKey, CallableConn>;
using SubscriberMap = std::map<ListenerKey, zenoh::Subscriber>;
using QueryableMap = std::map<ListenerKey, zenoh::Queryable>;
using QueryMap = std::map<std::string, zenoh::OwnedQueryPtr>;

/// @brief Register listener to be called when UMessage is received
/// for the given URI.
Expand Down Expand Up @@ -95,6 +146,57 @@ struct ZenohUTransport : public UTransport {
virtual void cleanupListener(CallableConn listener) override;

private:
FRIEND_TEST(TestZenohUTransport, toZenohKeyString);

static v1::UStatus uError(v1::UCode code, std::string_view message);

static std::string toZenohKeyString(
const std::string& default_authority_name, const v1::UUri& source,
const std::optional<v1::UUri>& sink);

static std::vector<std::pair<std::string, std::string>>
uattributesToAttachment(const v1::UAttributes& attributes);

static v1::UAttributes attachmentToUAttributes(
const zenoh::AttachmentView& attachment);

static zenoh::Priority mapZenohPriority(v1::UPriority upriority);

static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);

v1::UStatus registerRequestListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerResponseListener_(const std::string& zenoh_key,
CallableConn listener);

v1::UStatus registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener);

v1::UStatus sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendResponse_(const std::string& payload,
const v1::UAttributes& attributes);

v1::UStatus sendPublishNotification_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes);

zenoh::Session session_;

RpcCallbackMap rpc_callback_map_;
std::mutex rpc_callback_map_mutex_;

SubscriberMap subscriber_map_;
std::mutex subscriber_map_mutex_;

QueryableMap queryable_map_;
std::mutex queryable_map_mutex_;

QueryMap query_map_;
std::mutex query_map_mutex_;
};

} // namespace uprotocol::transport
Expand Down
Loading

0 comments on commit 6e990db

Please sign in to comment.