@@ -20,7 +20,7 @@ use tokio::{
20
20
join,
21
21
process:: { Child , ChildStdin , ChildStdout , Command } ,
22
22
select,
23
- sync:: { mpsc, oneshot, OnceCell } ,
23
+ sync:: { mpsc, oneshot, OnceCell , OwnedSemaphorePermit , Semaphore } ,
24
24
task:: { JoinHandle , JoinSet } ,
25
25
time:: { self , MissedTickBehavior } ,
26
26
} ;
@@ -821,6 +821,66 @@ enum DemultiplexCommand {
821
821
ListenOnce ( JobId , oneshot:: Sender < WorkerMessage > ) ,
822
822
}
823
823
824
+ /// Enforces a limited number of concurrent `Coordinator`s.
825
+ #[ derive( Debug ) ]
826
+ pub struct CoordinatorFactory {
827
+ semaphore : Arc < Semaphore > ,
828
+ }
829
+
830
+ impl CoordinatorFactory {
831
+ pub fn new ( maximum : usize ) -> Self {
832
+ Self {
833
+ semaphore : Arc :: new ( Semaphore :: new ( maximum) ) ,
834
+ }
835
+ }
836
+
837
+ pub async fn build < B > ( & self , backend : B ) -> LimitedCoordinator < B >
838
+ where
839
+ B : Backend ,
840
+ {
841
+ let semaphore = self . semaphore . clone ( ) ;
842
+ let permit = semaphore
843
+ . acquire_owned ( )
844
+ . await
845
+ . expect ( "Unable to acquire permit" ) ;
846
+
847
+ let coordinator = Coordinator :: new ( backend) ;
848
+
849
+ LimitedCoordinator {
850
+ coordinator,
851
+ _permit : permit,
852
+ }
853
+ }
854
+ }
855
+
856
+ pub struct LimitedCoordinator < T > {
857
+ coordinator : Coordinator < T > ,
858
+ _permit : OwnedSemaphorePermit ,
859
+ }
860
+
861
+ impl < T > LimitedCoordinator < T >
862
+ where
863
+ T : Backend ,
864
+ {
865
+ pub async fn shutdown ( self ) -> Result < T > {
866
+ self . coordinator . shutdown ( ) . await
867
+ }
868
+ }
869
+
870
+ impl < T > ops:: Deref for LimitedCoordinator < T > {
871
+ type Target = Coordinator < T > ;
872
+
873
+ fn deref ( & self ) -> & Self :: Target {
874
+ & self . coordinator
875
+ }
876
+ }
877
+
878
+ impl < T > ops:: DerefMut for LimitedCoordinator < T > {
879
+ fn deref_mut ( & mut self ) -> & mut Self :: Target {
880
+ & mut self . coordinator
881
+ }
882
+ }
883
+
824
884
#[ derive( Debug ) ]
825
885
pub struct Coordinator < B > {
826
886
backend : B ,
@@ -2700,7 +2760,6 @@ mod tests {
2700
2760
use futures:: future:: { join, try_join_all} ;
2701
2761
use std:: { env, sync:: Once } ;
2702
2762
use tempdir:: TempDir ;
2703
- use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
2704
2763
2705
2764
use super :: * ;
2706
2765
@@ -2777,67 +2836,9 @@ mod tests {
2777
2836
. unwrap_or ( 5 )
2778
2837
} ) ;
2779
2838
2780
- struct CoordinatorFactory {
2781
- semaphore : Arc < Semaphore > ,
2782
- }
2783
-
2784
- impl CoordinatorFactory {
2785
- pub fn new ( maximum : usize ) -> Self {
2786
- Self {
2787
- semaphore : Arc :: new ( Semaphore :: new ( maximum) ) ,
2788
- }
2789
- }
2790
-
2791
- pub async fn build < B > ( & self , backend : B ) -> LimitedCoordinator < B >
2792
- where
2793
- B : Backend ,
2794
- {
2795
- let semaphore = self . semaphore . clone ( ) ;
2796
- let permit = semaphore
2797
- . acquire_owned ( )
2798
- . await
2799
- . expect ( "Unable to acquire permit" ) ;
2800
-
2801
- let coordinator = Coordinator :: new ( backend) ;
2802
-
2803
- LimitedCoordinator {
2804
- _permit : permit,
2805
- coordinator,
2806
- }
2807
- }
2808
- }
2809
-
2810
2839
static TEST_COORDINATOR_FACTORY : Lazy < CoordinatorFactory > =
2811
2840
Lazy :: new ( || CoordinatorFactory :: new ( * MAX_CONCURRENT_TESTS ) ) ;
2812
2841
2813
- struct LimitedCoordinator < T > {
2814
- _permit : OwnedSemaphorePermit ,
2815
- coordinator : Coordinator < T > ,
2816
- }
2817
-
2818
- impl < T > LimitedCoordinator < T >
2819
- where
2820
- T : Backend ,
2821
- {
2822
- async fn shutdown ( self ) -> super :: Result < T , super :: Error > {
2823
- self . coordinator . shutdown ( ) . await
2824
- }
2825
- }
2826
-
2827
- impl < T > ops:: Deref for LimitedCoordinator < T > {
2828
- type Target = Coordinator < T > ;
2829
-
2830
- fn deref ( & self ) -> & Self :: Target {
2831
- & self . coordinator
2832
- }
2833
- }
2834
-
2835
- impl < T > ops:: DerefMut for LimitedCoordinator < T > {
2836
- fn deref_mut ( & mut self ) -> & mut Self :: Target {
2837
- & mut self . coordinator
2838
- }
2839
- }
2840
-
2841
2842
async fn new_coordinator_test ( ) -> LimitedCoordinator < impl Backend > {
2842
2843
TEST_COORDINATOR_FACTORY . build ( TestBackend :: new ( ) ) . await
2843
2844
}
0 commit comments