@@ -131,6 +131,7 @@ struct ClientInner {
131
131
session_pool : ServerSessionPool ,
132
132
shutdown : Shutdown ,
133
133
dropped : AtomicBool ,
134
+ end_sessions_token : std:: sync:: Mutex < AsyncDropToken > ,
134
135
#[ cfg( feature = "in-use-encryption" ) ]
135
136
csfle : tokio:: sync:: RwLock < Option < csfle:: ClientState > > ,
136
137
#[ cfg( test) ]
@@ -159,6 +160,18 @@ impl Client {
159
160
pub fn with_options ( options : ClientOptions ) -> Result < Self > {
160
161
options. validate ( ) ?;
161
162
163
+ // Spawn a cleanup task, similar to register_async_drop
164
+ let ( cleanup_tx, cleanup_rx) = tokio:: sync:: oneshot:: channel :: < BoxFuture < ' static , ( ) > > ( ) ;
165
+ crate :: runtime:: spawn ( async move {
166
+ // If the cleanup channel is closed, that task was dropped.
167
+ if let Ok ( cleanup) = cleanup_rx. await {
168
+ cleanup. await ;
169
+ }
170
+ } ) ;
171
+ let end_sessions_token = std:: sync:: Mutex :: new ( AsyncDropToken {
172
+ tx : Some ( cleanup_tx) ,
173
+ } ) ;
174
+
162
175
let inner = TrackingArc :: new ( ClientInner {
163
176
topology : Topology :: new ( options. clone ( ) ) ?,
164
177
session_pool : ServerSessionPool :: new ( ) ,
@@ -168,6 +181,7 @@ impl Client {
168
181
executed : AtomicBool :: new ( false ) ,
169
182
} ,
170
183
dropped : AtomicBool :: new ( false ) ,
184
+ end_sessions_token,
171
185
#[ cfg( feature = "in-use-encryption" ) ]
172
186
csfle : Default :: default ( ) ,
173
187
#[ cfg( test) ]
@@ -682,9 +696,13 @@ impl Drop for Client {
682
696
// this cycle.
683
697
self . inner . dropped . store ( true , Ordering :: SeqCst ) ;
684
698
let client = self . clone ( ) ;
685
- crate :: runtime:: spawn ( async move {
686
- client. end_all_sessions ( ) . await ;
687
- } ) ;
699
+ self . inner
700
+ . end_sessions_token
701
+ . lock ( )
702
+ . unwrap ( )
703
+ . spawn ( async move {
704
+ client. end_all_sessions ( ) . await ;
705
+ } ) ;
688
706
}
689
707
}
690
708
}
0 commit comments