Skip to content

Commit 2962e51

Browse files
make pub-sub test use multiple subscribers (#420)
1 parent a9406fa commit 2962e51

File tree

1 file changed

+39
-27
lines changed

1 file changed

+39
-27
lines changed

tests/universal/network/pub_sub.cxx

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,9 @@ void put_sub_fifo_channel(Talloc& alloc) {
119119

120120
std::this_thread::sleep_for(1s);
121121

122-
auto subscriber = session2.declare_subscriber(ke, channels::FifoChannel(16));
122+
std::vector<Subscriber<channels::FifoChannel::HandlerType<Sample>>> subscribers;
123+
subscribers.push_back(session2.declare_subscriber(ke, channels::FifoChannel(16)));
124+
subscribers.push_back(session2.declare_subscriber(ke, channels::FifoChannel(16)));
123125

124126
std::this_thread::sleep_for(1s);
125127

@@ -128,24 +130,28 @@ void put_sub_fifo_channel(Talloc& alloc) {
128130

129131
std::this_thread::sleep_for(1s);
130132

131-
auto res = subscriber.handler().recv();
132-
assert(std::holds_alternative<Sample>(res));
133-
assert(std::get<Sample>(res).get_keyexpr() == "zenoh/test");
134-
assert(std::get<Sample>(res).get_payload().as_string() == "first");
135-
res = subscriber.handler().try_recv();
136-
assert(std::holds_alternative<Sample>(res));
137-
assert(std::get<Sample>(res).get_keyexpr() == "zenoh/test");
138-
assert(std::get<Sample>(res).get_payload().as_string() == "second");
139-
140-
res = subscriber.handler().try_recv();
141-
assert(std::holds_alternative<channels::RecvError>(res));
142-
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
133+
for (const auto& subscriber : subscribers) {
134+
auto res = subscriber.handler().recv();
135+
assert(std::holds_alternative<Sample>(res));
136+
assert(std::get<Sample>(res).get_keyexpr() == "zenoh/test");
137+
assert(std::get<Sample>(res).get_payload().as_string() == "first");
138+
res = subscriber.handler().try_recv();
139+
assert(std::holds_alternative<Sample>(res));
140+
assert(std::get<Sample>(res).get_keyexpr() == "zenoh/test");
141+
assert(std::get<Sample>(res).get_payload().as_string() == "second");
142+
143+
res = subscriber.handler().try_recv();
144+
assert(std::holds_alternative<channels::RecvError>(res));
145+
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
146+
}
143147

144148
/// after session close subscriber handler should become disconnected
145149
session2.close();
146-
res = subscriber.handler().recv();
147-
assert(std::holds_alternative<channels::RecvError>(res));
148-
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
150+
for (const auto& subscriber : subscribers) {
151+
auto res = subscriber.handler().recv();
152+
assert(std::holds_alternative<channels::RecvError>(res));
153+
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
154+
}
149155
}
150156

151157
template <typename Talloc>
@@ -156,7 +162,9 @@ void put_sub_ring_channel(Talloc& alloc) {
156162

157163
std::this_thread::sleep_for(1s);
158164

159-
auto subscriber = session2.declare_subscriber(ke, channels::RingChannel(1));
165+
std::vector<Subscriber<channels::RingChannel::HandlerType<Sample>>> subscribers;
166+
subscribers.push_back(session2.declare_subscriber(ke, channels::RingChannel(1)));
167+
subscribers.push_back(session2.declare_subscriber(ke, channels::RingChannel(1)));
160168

161169
std::this_thread::sleep_for(1s);
162170

@@ -165,20 +173,24 @@ void put_sub_ring_channel(Talloc& alloc) {
165173

166174
std::this_thread::sleep_for(1s);
167175

168-
auto res = subscriber.handler().recv();
169-
assert(std::holds_alternative<Sample>(res));
170-
assert(std::get<Sample>(res).get_keyexpr() == "zenoh/test");
171-
assert(std::get<Sample>(res).get_payload().as_string() == "second");
176+
for (const auto& subscriber : subscribers) {
177+
auto res = subscriber.handler().recv();
178+
assert(std::holds_alternative<Sample>(res));
179+
assert(std::get<Sample>(res).get_keyexpr() == "zenoh/test");
180+
assert(std::get<Sample>(res).get_payload().as_string() == "second");
172181

173-
res = subscriber.handler().try_recv();
174-
assert(std::holds_alternative<channels::RecvError>(res));
175-
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
182+
res = subscriber.handler().try_recv();
183+
assert(std::holds_alternative<channels::RecvError>(res));
184+
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
185+
}
176186

177187
/// after session close subscriber handler should become disconnected
178188
session2.close();
179-
res = subscriber.handler().recv();
180-
assert(std::holds_alternative<channels::RecvError>(res));
181-
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
189+
for (const auto& subscriber : subscribers) {
190+
auto res = subscriber.handler().recv();
191+
assert(std::holds_alternative<channels::RecvError>(res));
192+
assert(std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
193+
}
182194
}
183195

184196
template <typename Talloc, bool share_alloc = true>

0 commit comments

Comments
 (0)