12
12
// ZettaScale Zenoh Team, <[email protected] >
13
13
//
14
14
15
+ #include < chrono>
15
16
#include < thread>
16
17
17
18
#include " zenoh.hxx"
@@ -118,7 +119,9 @@ void put_sub_fifo_channel(Talloc& alloc) {
118
119
119
120
std::this_thread::sleep_for (1s);
120
121
121
- auto subscriber = session2.declare_subscriber (ke, channels::FifoChannel (16 ));
122
+ std::vector<Subscriber<channels::FifoChannel::HandlerType<Sample>>> subscribers;
123
+ subscribers.push_back (session2.declare_subscriber (ke, channels::FifoChannel (16 )));
124
+ subscribers.push_back (session2.declare_subscriber (ke, channels::FifoChannel (16 )));
122
125
123
126
std::this_thread::sleep_for (1s);
124
127
@@ -127,24 +130,28 @@ void put_sub_fifo_channel(Talloc& alloc) {
127
130
128
131
std::this_thread::sleep_for (1s);
129
132
130
- auto res = subscriber.handler ().recv ();
131
- assert (std::holds_alternative<Sample>(res));
132
- assert (std::get<Sample>(res).get_keyexpr () == " zenoh/test" );
133
- assert (std::get<Sample>(res).get_payload ().as_string () == " first" );
134
- res = subscriber.handler ().try_recv ();
135
- assert (std::holds_alternative<Sample>(res));
136
- assert (std::get<Sample>(res).get_keyexpr () == " zenoh/test" );
137
- assert (std::get<Sample>(res).get_payload ().as_string () == " second" );
138
-
139
- res = subscriber.handler ().try_recv ();
140
- assert (std::holds_alternative<channels::RecvError>(res));
141
- assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
133
+ for (const auto & subscriber : subscribers) {
134
+ auto res = subscriber.handler ().recv ();
135
+ assert (std::holds_alternative<Sample>(res));
136
+ assert (std::get<Sample>(res).get_keyexpr () == " zenoh/test" );
137
+ assert (std::get<Sample>(res).get_payload ().as_string () == " first" );
138
+ res = subscriber.handler ().try_recv ();
139
+ assert (std::holds_alternative<Sample>(res));
140
+ assert (std::get<Sample>(res).get_keyexpr () == " zenoh/test" );
141
+ assert (std::get<Sample>(res).get_payload ().as_string () == " second" );
142
+
143
+ res = subscriber.handler ().try_recv ();
144
+ assert (std::holds_alternative<channels::RecvError>(res));
145
+ assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
146
+ }
142
147
143
148
// / after session close subscriber handler should become disconnected
144
149
session2.close ();
145
- res = subscriber.handler ().recv ();
146
- assert (std::holds_alternative<channels::RecvError>(res));
147
- assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
150
+ for (const auto & subscriber : subscribers) {
151
+ auto res = subscriber.handler ().recv ();
152
+ assert (std::holds_alternative<channels::RecvError>(res));
153
+ assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
154
+ }
148
155
}
149
156
150
157
template <typename Talloc>
@@ -155,7 +162,9 @@ void put_sub_ring_channel(Talloc& alloc) {
155
162
156
163
std::this_thread::sleep_for (1s);
157
164
158
- auto subscriber = session2.declare_subscriber (ke, channels::RingChannel (1 ));
165
+ std::vector<Subscriber<channels::RingChannel::HandlerType<Sample>>> subscribers;
166
+ subscribers.push_back (session2.declare_subscriber (ke, channels::RingChannel (1 )));
167
+ subscribers.push_back (session2.declare_subscriber (ke, channels::RingChannel (1 )));
159
168
160
169
std::this_thread::sleep_for (1s);
161
170
@@ -164,20 +173,24 @@ void put_sub_ring_channel(Talloc& alloc) {
164
173
165
174
std::this_thread::sleep_for (1s);
166
175
167
- auto res = subscriber.handler ().recv ();
168
- assert (std::holds_alternative<Sample>(res));
169
- assert (std::get<Sample>(res).get_keyexpr () == " zenoh/test" );
170
- assert (std::get<Sample>(res).get_payload ().as_string () == " second" );
176
+ for (const auto & subscriber : subscribers) {
177
+ auto res = subscriber.handler ().recv ();
178
+ assert (std::holds_alternative<Sample>(res));
179
+ assert (std::get<Sample>(res).get_keyexpr () == " zenoh/test" );
180
+ assert (std::get<Sample>(res).get_payload ().as_string () == " second" );
171
181
172
- res = subscriber.handler ().try_recv ();
173
- assert (std::holds_alternative<channels::RecvError>(res));
174
- assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
182
+ res = subscriber.handler ().try_recv ();
183
+ assert (std::holds_alternative<channels::RecvError>(res));
184
+ assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_NODATA);
185
+ }
175
186
176
187
// / after session close subscriber handler should become disconnected
177
188
session2.close ();
178
- res = subscriber.handler ().recv ();
179
- assert (std::holds_alternative<channels::RecvError>(res));
180
- assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
189
+ for (const auto & subscriber : subscribers) {
190
+ auto res = subscriber.handler ().recv ();
191
+ assert (std::holds_alternative<channels::RecvError>(res));
192
+ assert (std::get<channels::RecvError>(res) == channels::RecvError::Z_DISCONNECTED);
193
+ }
181
194
}
182
195
183
196
template <typename Talloc, bool share_alloc = true >
0 commit comments