@@ -5,6 +5,8 @@ use crate::actors::{
55} ;
66use actix:: prelude:: * ;
77use log:: { debug, error, warn} ;
8+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
9+ use std:: sync:: Arc ;
810use std:: time:: Duration ;
911use std:: time:: { SystemTime , UNIX_EPOCH } ;
1012
@@ -23,6 +25,7 @@ pub struct RepairActor {
2325 meta : Addr < MetaStoreActor > ,
2426 backend_manager : Addr < BackendManagerActor > ,
2527 zstor : Addr < ZstorActor > ,
28+ handling_sweep_objects : Arc < AtomicBool > ,
2629}
2730
2831impl RepairActor {
@@ -37,6 +40,7 @@ impl RepairActor {
3740 meta,
3841 backend_manager,
3942 zstor,
43+ handling_sweep_objects : Arc :: new ( AtomicBool :: new ( false ) ) ,
4044 }
4145 }
4246
@@ -57,15 +61,36 @@ impl Actor for RepairActor {
5761 }
5862}
5963
64+ struct SweepGuard {
65+ flag : Arc < AtomicBool > ,
66+ }
67+
68+ impl Drop for SweepGuard {
69+ fn drop ( & mut self ) {
70+ self . flag . store ( false , Ordering :: Release ) ;
71+ }
72+ }
73+
6074impl Handler < SweepObjects > for RepairActor {
6175 type Result = ResponseFuture < ( ) > ;
6276
6377 fn handle ( & mut self , _: SweepObjects , _: & mut Self :: Context ) -> Self :: Result {
78+ if self . handling_sweep_objects . swap ( true , Ordering :: Acquire ) {
79+ log:: info!( "Dropping SweepObjects message - still processing" ) ;
80+ return Box :: pin ( async { } ) ;
81+ }
82+
6483 let meta = self . meta . clone ( ) ;
6584 let backend_manager = self . backend_manager . clone ( ) ;
6685 let zstor = self . zstor . clone ( ) ;
86+ let handling_sweep_objects = Arc :: clone ( & self . handling_sweep_objects ) ;
6787
88+ log:: info!( "Starting SweepObjects" ) ;
6889 Box :: pin ( async move {
90+ let _guard = SweepGuard {
91+ flag : Arc :: clone ( & handling_sweep_objects) ,
92+ } ;
93+
6994 let start_time = SystemTime :: now ( )
7095 . duration_since ( UNIX_EPOCH )
7196 . unwrap ( )
0 commit comments