Skip to content

Commit

Permalink
Merge pull request eclipse-uprotocol#92 from billpittman/dev/test/ext…
Browse files Browse the repository at this point in the history
…ra/PublisherSubscriberTest

PublisherSubscriberTest: Add some basic tests
  • Loading branch information
gregmedd authored Aug 9, 2024
2 parents eefdde8 + 9f9ae9b commit 88cc7d1
Showing 1 changed file with 185 additions and 35 deletions.
220 changes: 185 additions & 35 deletions test/extra/PublisherSubscriberTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@
//
// SPDX-License-Identifier: Apache-2.0

#include <google/protobuf/util/message_differencer.h>
#include <gtest/gtest.h>
#include <up-cpp/communication/Publisher.h>
#include <up-cpp/communication/Subscriber.h>
#include <up-cpp/datamodel/serializer/UUri.h>

#include <queue>

#include "up-transport-zenoh-cpp/ZenohUTransport.h"

namespace {
constexpr size_t num_publish_messages = 25;

using namespace uprotocol;

constexpr std::string_view ZENOH_CONFIG_FILE = BUILD_REALPATH_ZENOH_CONF;

constexpr std::string_view ENTITY_URI_STR = "//test0/10001/1/0";
constexpr std::string_view TOPIC_URI_STR = "//test0/10001/1/8000";
constexpr int ENTITY_URI = 0;
constexpr int TOPIC_URI = 0x8000;
constexpr int TOPIC_URI2 = 0x8001;

class PublisherSubscriberTest : public testing::Test {
protected:
Expand All @@ -37,7 +37,7 @@ class PublisherSubscriberTest : public testing::Test {

// Run once per execution of the test application.
// Used for setup of all tests. Has access to this instance.
PublisherSubscriberTest() = default;
PublisherSubscriberTest() { zenoh::init_logger(); }
~PublisherSubscriberTest() = default;

// Run once per execution of the test application.
Expand All @@ -46,37 +46,49 @@ class PublisherSubscriberTest : public testing::Test {
static void TearDownTestSuite() {}
};

v1::UUri makeUUri(std::string_view authority, uint16_t ue_id,
uint16_t ue_instance, uint8_t version, uint16_t resource) {
v1::UUri makeUUri(uint16_t resource_id) {
v1::UUri uuri;
uuri.set_authority_name(static_cast<std::string>(authority));
uuri.set_ue_id((static_cast<uint32_t>(ue_instance) << 16) | ue_id);
uuri.set_ue_version_major(version);
uuri.set_resource_id(resource);
uuri.set_authority_name(static_cast<std::string>("test0"));
uuri.set_ue_id((0x10001));
uuri.set_ue_version_major(1);
uuri.set_resource_id(resource_id);
return uuri;
}

v1::UUri makeUUri(std::string_view serialized) {
return datamodel::serializer::uri::AsString::deserialize(
static_cast<std::string>(serialized));
}

std::shared_ptr<transport::UTransport> getTransport(
const v1::UUri& uuri = makeUUri(ENTITY_URI_STR)) {
const v1::UUri& uuri = makeUUri(ENTITY_URI)) {
return std::make_shared<transport::ZenohUTransport>(uuri,
ZENOH_CONFIG_FILE);
}

using MsgDiff = google::protobuf::util::MessageDifferencer;
// ValidateMessages
// Validates the messages received by the subscriber. Every message has a
// sequencial message number, and the sum of all the numbers should be equal to
// (num_messages * (num_messages + 1)) / 2. Additionally, each message should
// match the expected prefix.
void ValidateMessages(std::queue<v1::UMessage>& rx_queue, size_t num_messages,
const std::string& prefix) {
EXPECT_EQ(rx_queue.size(), num_messages);

int sum = 0;
while (!rx_queue.empty()) {
auto message = rx_queue.front();
rx_queue.pop();

int pos = message.payload().find(prefix);
int num = std::stoi(message.payload().substr(prefix.size()));
sum += num;
EXPECT_NE(pos, std::string::npos);
}
EXPECT_EQ(sum, (num_messages * (num_messages + 1)) / 2);
}

// TODO(sashacmc): config generation

TEST_F(PublisherSubscriberTest, SinglePubSingleSub) {
zenoh::init_logger();

auto transport = getTransport();

communication::Publisher pub(transport, makeUUri(TOPIC_URI_STR),
communication::Publisher pub(transport, makeUUri(TOPIC_URI),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT);

std::mutex rx_queue_mtx;
Expand All @@ -87,27 +99,165 @@ TEST_F(PublisherSubscriberTest, SinglePubSingleSub) {
};

auto maybe_sub = communication::Subscriber::subscribe(
transport, makeUUri(TOPIC_URI_STR), std::move(on_rx));
transport, makeUUri(TOPIC_URI), std::move(on_rx));
EXPECT_TRUE(maybe_sub);

if (maybe_sub) {
for (auto remaining = num_publish_messages; remaining > 0;
--remaining) {
std::ostringstream message;
message << "Message number: " << remaining;

auto result =
pub.publish({std::move(message).str(),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT});
EXPECT_EQ(result.code(), v1::UCode::OK);
}
}

ValidateMessages(rx_queue, num_publish_messages, "Message number: ");
}

// Single publisher, multiple subscribers (2) on the same topic.
TEST_F(PublisherSubscriberTest, SinglePubMultipleSub) {
auto transport = getTransport();

communication::Publisher pub(transport, makeUUri(TOPIC_URI),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT);

// First subscriber and queue
std::mutex rx_queue_mtx;
std::queue<v1::UMessage> rx_queue;
auto on_rx = [&rx_queue_mtx, &rx_queue](const v1::UMessage& message) {
std::lock_guard lock(rx_queue_mtx);
rx_queue.push(message);
};
auto maybe_sub = communication::Subscriber::subscribe(
transport, makeUUri(TOPIC_URI), std::move(on_rx));
EXPECT_TRUE(maybe_sub);
if (!maybe_sub) {
return;

// Second subscriber and queue
std::mutex rx_queue_mtx2;
std::queue<v1::UMessage> rx_queue2;
auto on_rx2 = [&rx_queue_mtx2, &rx_queue2](const v1::UMessage& message) {
std::lock_guard lock(rx_queue_mtx2);
rx_queue2.push(message);
};
auto maybe_sub2 = communication::Subscriber::subscribe(
transport, makeUUri(TOPIC_URI), std::move(on_rx2));
EXPECT_TRUE(maybe_sub2);

if (maybe_sub && maybe_sub2) {
for (auto remaining = num_publish_messages; remaining > 0;
--remaining) {
std::ostringstream message;
message << "Message number: " << remaining;

auto result =
pub.publish({std::move(message).str(),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT});
EXPECT_EQ(result.code(), v1::UCode::OK);
}
}
auto sub = std::move(maybe_sub).value();

constexpr size_t num_publish_messages = 25;
for (auto remaining = num_publish_messages; remaining > 0; --remaining) {
std::ostringstream message;
message << "Message number: " << remaining;
ValidateMessages(rx_queue, num_publish_messages, "Message number: ");
ValidateMessages(rx_queue2, num_publish_messages, "Message number: ");
}

// Single publisher, two subscribers on different topics
TEST_F(PublisherSubscriberTest, SinglePubMultipleSubDifferentTopics) {
auto transport = getTransport();

communication::Publisher pub(transport, makeUUri(TOPIC_URI),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT);

std::mutex rx_queue_mtx;
std::queue<v1::UMessage> rx_queue;
auto on_rx = [&rx_queue_mtx, &rx_queue](const v1::UMessage& message) {
std::lock_guard lock(rx_queue_mtx);
rx_queue.push(message);
};

auto maybe_sub = communication::Subscriber::subscribe(
transport, makeUUri(TOPIC_URI), std::move(on_rx));
EXPECT_TRUE(maybe_sub);

// subscribe to a different topic (non-existent topic)
auto on_rx2 = [](const v1::UMessage& message) { FAIL(); };
auto maybe_sub2 = communication::Subscriber::subscribe(
transport, makeUUri(TOPIC_URI2), std::move(on_rx2));
EXPECT_TRUE(maybe_sub2);

if (maybe_sub && maybe_sub2) {
for (auto remaining = num_publish_messages; remaining > 0;
--remaining) {
std::ostringstream message;
message << "Message number: " << remaining;

auto result =
pub.publish({std::move(message).str(),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT});
EXPECT_EQ(result.code(), v1::UCode::OK);
}
}

ValidateMessages(rx_queue, num_publish_messages, "Message number: ");
}

// Two publishers, two subscribers, two topics
TEST_F(PublisherSubscriberTest, MultiplePubMultipleSubDifferentTopics) {
auto transport = getTransport();

communication::Publisher pub(transport, makeUUri(TOPIC_URI),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
communication::Publisher pub2(transport, makeUUri(TOPIC_URI2),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT);

// first subscriber and queue
std::mutex rx_queue_mtx;
std::queue<v1::UMessage> rx_queue;
auto on_rx = [&rx_queue_mtx, &rx_queue](const v1::UMessage& message) {
std::lock_guard lock(rx_queue_mtx);
rx_queue.push(message);
};
auto maybe_sub = communication::Subscriber::subscribe(
transport, makeUUri(TOPIC_URI), std::move(on_rx));
EXPECT_TRUE(maybe_sub);

// second subscriber and queue
std::mutex rx_queue_mtx2;
std::queue<v1::UMessage> rx_queue2;
auto on_rx2 = [&rx_queue_mtx2, &rx_queue2](const v1::UMessage& message) {
std::lock_guard lock(rx_queue_mtx2);
rx_queue2.push(message);
};
auto maybe_sub2 = communication::Subscriber::subscribe(
transport, makeUUri(TOPIC_URI2), std::move(on_rx2));
EXPECT_TRUE(maybe_sub2);

if (maybe_sub && maybe_sub2) {
for (auto remaining = num_publish_messages; remaining > 0;
--remaining) {
std::ostringstream message;
message << "Pub 1 - Message number: " << remaining;
auto result =
pub.publish({std::move(message).str(),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT});
EXPECT_EQ(result.code(), v1::UCode::OK);

auto result = pub.publish({std::move(message).str(),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT});
EXPECT_EQ(result.code(), v1::UCode::OK);
std::ostringstream message2;
message2 << "Pub 2 - Message number: " << remaining;
auto result2 =
pub2.publish({std::move(message2).str(),
v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT});
EXPECT_EQ(result2.code(), v1::UCode::OK);
}
}

EXPECT_EQ(rx_queue.size(), num_publish_messages);
EXPECT_NE(sub, nullptr);
sub.reset();
ValidateMessages(rx_queue, num_publish_messages,
"Pub 1 - Message number: ");
ValidateMessages(rx_queue2, num_publish_messages,
"Pub 2 - Message number: ");
}

} // namespace

0 comments on commit 88cc7d1

Please sign in to comment.