@@ -794,21 +794,33 @@ mod tests {
794
794
use super :: * ;
795
795
use std:: time:: Duration ;
796
796
use futures:: { stream, channel:: mpsc, Stream } ;
797
- use std:: sync:: { Arc , Mutex } ;
797
+ use std:: sync:: { Arc , Mutex , Condvar } ;
798
798
use std:: pin:: Pin ;
799
799
use tokio:: runtime:: Runtime ;
800
800
801
801
// Just contains topic->channel mapping to give to outer code on `gossip_messages_for` calls.
802
802
struct TestGossipMessages {
803
- messages : Arc < Mutex < HashMap < Hash , mpsc:: UnboundedReceiver < ( Hash , Hash , ErasureChunk ) > > > > ,
803
+ messages : Arc < Mutex < HashMap <
804
+ Hash ,
805
+ (
806
+ Arc < ( Mutex < bool > , Condvar ) > ,
807
+ mpsc:: UnboundedReceiver < ( Hash , Hash , ErasureChunk ) > ,
808
+ ) ,
809
+ > > > ,
804
810
}
805
811
806
812
impl ProvideGossipMessages for TestGossipMessages {
807
813
fn gossip_messages_for ( & self , topic : Hash )
808
814
-> Pin < Box < dyn Stream < Item = ( Hash , Hash , ErasureChunk ) > + Send > >
809
815
{
810
816
match self . messages . lock ( ) . unwrap ( ) . remove ( & topic) {
811
- Some ( receiver) => receiver. boxed ( ) ,
817
+ Some ( ( pair, receiver) ) => {
818
+ let ( lock, cvar) = & * pair;
819
+ let mut consumed = lock. lock ( ) . unwrap ( ) ;
820
+ * consumed = true ;
821
+ cvar. notify_one ( ) ;
822
+ receiver. boxed ( )
823
+ } ,
812
824
None => stream:: iter ( vec ! [ ] ) . boxed ( ) ,
813
825
}
814
826
}
@@ -851,9 +863,10 @@ mod tests {
851
863
852
864
let topic = erasure_coding_topic ( relay_parent, erasure_root, local_id) ;
853
865
866
+ let pair = Arc :: new ( ( Mutex :: new ( false ) , Condvar :: new ( ) ) ) ;
854
867
let messages = TestGossipMessages {
855
868
messages : Arc :: new ( Mutex :: new ( vec ! [
856
- ( topic, gossip_receiver)
869
+ ( topic, ( pair . clone ( ) , gossip_receiver) )
857
870
] . into_iter ( ) . collect ( ) ) )
858
871
} ;
859
872
@@ -961,11 +974,14 @@ mod tests {
961
974
let topic_1 = erasure_coding_topic ( relay_parent, erasure_root_1, local_id) ;
962
975
let topic_2 = erasure_coding_topic ( relay_parent, erasure_root_2, local_id) ;
963
976
977
+ let cvar_pair1 = Arc :: new ( ( Mutex :: new ( false ) , Condvar :: new ( ) ) ) ;
978
+ let cvar_pair2 = Arc :: new ( ( Mutex :: new ( false ) , Condvar :: new ( ) ) ) ;
979
+
964
980
let messages = TestGossipMessages {
965
981
messages : Arc :: new ( Mutex :: new (
966
982
vec ! [
967
- ( topic_1, gossip_receiver_1) ,
968
- ( topic_2, gossip_receiver_2) ,
983
+ ( topic_1, ( cvar_pair1 . clone ( ) , gossip_receiver_1) ) ,
984
+ ( topic_2, ( cvar_pair2 , gossip_receiver_2) ) ,
969
985
] . into_iter ( ) . collect ( ) ) )
970
986
} ;
971
987
@@ -1000,6 +1016,16 @@ mod tests {
1000
1016
handle. sender . unbounded_send ( listen_msg_1) . unwrap ( ) ;
1001
1017
runtime. block_on ( r1) . unwrap ( ) . unwrap ( ) ;
1002
1018
1019
+ // Here, we are racing against the worker thread that might have not yet
1020
+ // reached the point when it requests the gossip messages for `topic_2`
1021
+ // which will get them removed from `TestGossipMessages`. Therefore, the
1022
+ // `Condvar` is used to wait for that event.
1023
+ let ( lock, cvar1) = & * cvar_pair1;
1024
+ let mut gossip_stream_consumed = lock. lock ( ) . unwrap ( ) ;
1025
+ while !* gossip_stream_consumed {
1026
+ gossip_stream_consumed = cvar1. wait ( gossip_stream_consumed) . unwrap ( ) ;
1027
+ }
1028
+
1003
1029
// The gossip sender taken => listener registered.
1004
1030
assert ! ( !messages. messages. lock( ) . unwrap( ) . contains_key( & topic_1) ) ;
1005
1031
}
0 commit comments