@@ -3,6 +3,7 @@ use crate::{
33 zdb:: ZdbConnectionInfo ,
44} ;
55use actix:: prelude:: * ;
6+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
67use std:: { path:: PathBuf , sync:: Arc } ;
78
89#[ derive( Message ) ]
@@ -134,6 +135,7 @@ pub struct ReplaceMetaStore {
134135pub struct MetaStoreActor {
135136 meta_store : Arc < dyn MetaStore > ,
136137 writeable : bool ,
138+ rebuild_all_meta_counter : Arc < AtomicU64 > ,
137139}
138140
139141impl MetaStoreActor {
@@ -143,6 +145,23 @@ impl MetaStoreActor {
143145 Self {
144146 meta_store : Arc :: from ( meta_store) ,
145147 writeable,
148+ rebuild_all_meta_counter : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
149+ }
150+ }
151+
152+ /// create a guard for the rebuild meta operation.
153+ /// This guard is used to check if a newer rebuild operation has started
154+ /// and the current one should be stopped.
155+ /// We stop the current one because the newer one will have the latest meta store configuration,
156+ /// so there is no point to continue the current one.
157+ fn create_rebuild_meta_guard ( & self ) -> RebuildAllMetaGuard {
158+ let new_gen = self
159+ . rebuild_all_meta_counter
160+ . fetch_add ( 1 , Ordering :: Relaxed )
161+ + 1 ;
162+ RebuildAllMetaGuard {
163+ generation : new_gen,
164+ current_gen : self . rebuild_all_meta_counter . clone ( ) ,
146165 }
147166 }
148167}
@@ -322,6 +341,16 @@ impl Handler<GetFailures> for MetaStoreActor {
322341 }
323342}
324343
344+ struct RebuildAllMetaGuard {
345+ generation : u64 ,
346+ current_gen : Arc < AtomicU64 > ,
347+ }
348+
349+ impl RebuildAllMetaGuard {
350+ fn is_current ( & self ) -> bool {
351+ self . generation == self . current_gen . load ( Ordering :: SeqCst )
352+ }
353+ }
325354/// Rebuild all meta data in the metastore:
326355/// - scan all keys in the metastore before current timestamp
327356/// - load meta by key
@@ -333,6 +362,8 @@ impl Handler<RebuildAllMeta> for MetaStoreActor {
333362 let metastore = self . meta_store . clone ( ) ;
334363 let addr = ctx. address ( ) ;
335364 log:: info!( "Rebuilding all meta handler" ) ;
365+ let rebuild_guard = self . create_rebuild_meta_guard ( ) ;
366+
336367 Box :: pin ( async move {
337368 let mut cursor = None ;
338369 let mut backend_idx = None ;
@@ -361,6 +392,10 @@ impl Handler<RebuildAllMeta> for MetaStoreActor {
361392 } ;
362393
363394 for key in keys {
395+ if !rebuild_guard. is_current ( ) {
396+ log:: info!( "Newer rebuild started, stopping current one" ) ;
397+ break ;
398+ }
364399 log:: info!( "Rebuilding meta key: {}" , key) ;
365400 let meta: MetaData = match addr. send ( LoadMetaByKey { key : key. clone ( ) } ) . await {
366401 Ok ( Ok ( m) ) => m. unwrap ( ) ,
0 commit comments