@@ -174,3 +174,58 @@ impl BarrierWaitResult {
174
174
self . 0
175
175
}
176
176
}
177
+
178
+ #[ cfg( test) ]
179
+ mod test {
180
+ use futures_channel:: mpsc:: unbounded;
181
+ use futures_util:: sink:: SinkExt ;
182
+ use futures_util:: stream:: StreamExt ;
183
+
184
+ use crate :: sync:: { Arc , Barrier } ;
185
+ use crate :: task;
186
+
187
+ #[ test]
188
+ fn test_barrier ( ) {
189
+ // NOTE(dignifiedquire): Based on the test in std, I was seeing some
190
+ // race conditions, so running it in a loop to make sure things are
191
+ // solid.
192
+
193
+ for _ in 0 ..1_000 {
194
+ task:: block_on ( async move {
195
+ const N : usize = 10 ;
196
+
197
+ let barrier = Arc :: new ( Barrier :: new ( N ) ) ;
198
+ let ( tx, mut rx) = unbounded ( ) ;
199
+
200
+ for _ in 0 ..N - 1 {
201
+ let c = barrier. clone ( ) ;
202
+ let mut tx = tx. clone ( ) ;
203
+ task:: spawn ( async move {
204
+ let res = c. wait ( ) . await ;
205
+
206
+ tx. send ( res. is_leader ( ) ) . await . unwrap ( ) ;
207
+ } ) ;
208
+ }
209
+
210
+ // At this point, all spawned threads should be blocked,
211
+ // so we shouldn't get anything from the port
212
+ let res = rx. try_next ( ) ;
213
+ assert ! ( match res {
214
+ Err ( _err) => true ,
215
+ _ => false ,
216
+ } ) ;
217
+
218
+ let mut leader_found = barrier. wait ( ) . await . is_leader ( ) ;
219
+
220
+ // Now, the barrier is cleared and we should get data.
221
+ for _ in 0 ..N - 1 {
222
+ if rx. next ( ) . await . unwrap ( ) {
223
+ assert ! ( !leader_found) ;
224
+ leader_found = true ;
225
+ }
226
+ }
227
+ assert ! ( leader_found) ;
228
+ } ) ;
229
+ }
230
+ }
231
+ }
0 commit comments