11use crate :: actors:: {
2- config:: { ConfigActor , GetConfig , ReplaceMetaBackend } ,
2+ config:: { ConfigActor , GetConfig , ReloadConfig , ReplaceMetaBackend } ,
33 explorer:: { ExpandStorage , SizeRequest } ,
44 meta:: { MarkWriteable , MetaStoreActor , ReplaceMetaStore } ,
55 metrics:: { MetricsActor , SetDataBackendInfo , SetMetaBackendInfo } ,
@@ -103,78 +103,9 @@ impl Actor for BackendManagerActor {
103103
104104 ctx. wait (
105105 async move {
106- match cfg_addr. send ( GetConfig ) . await {
107- Err ( e) => {
108- error ! ( "Could not get config: {}" , e) ;
109- // we won't manage the dbs
110- ( HashMap :: new ( ) , HashMap :: new ( ) )
111- }
112- Ok ( config) => {
113- let managed_seq_dbs = stream:: iter ( config. backends ( ) )
114- . filter_map ( |ci| async move {
115- let db = match SequentialZdb :: new ( ci. clone ( ) ) . await {
116- Ok ( db) => db,
117- Err ( e) => {
118- warn ! (
119- "Could not connect to backend {} in config file: {}" ,
120- ci, e
121- ) ;
122- return Some ( ( ci. clone ( ) , ( None , BackendState :: new ( ) ) ) ) ;
123- }
124- } ;
125- let ns_info = match db. ns_info ( ) . await {
126- Ok ( info) => info,
127- Err ( e) => {
128- warn ! ( "Failed to get ns info from backend {}: {}" , ci, e) ;
129- return Some ( ( ci. clone ( ) , ( Some ( db) , BackendState :: new ( ) ) ) ) ;
130- }
131- } ;
132-
133- let free_space = ns_info. free_space ( ) ;
134- let state = if free_space <= FREESPACE_TRESHOLD {
135- BackendState :: LowSpace ( free_space)
136- } else {
137- BackendState :: Healthy
138- } ;
139-
140- Some ( ( ci. clone ( ) , ( Some ( db) , state) ) )
141- } )
142- . collect ( )
143- . await ;
144- let managed_meta_dbs = match config. meta ( ) {
145- Meta :: Zdb ( zdb_meta_cfg) => {
146- stream:: iter ( zdb_meta_cfg. backends ( ) )
147- . filter_map ( |ci| async move {
148- let db = match UserKeyZdb :: new ( ci. clone ( ) ) . await {
149- Ok ( db) => db,
150- Err ( e) => {
151- warn ! ( "Failed to connect to metadata backend {} in config file: {}" , ci , e) ;
152- return Some ( ( ci. clone ( ) , ( None , BackendState :: new ( ) ) ) ) ;
153- }
154- } ;
155- let ns_info = match db. ns_info ( ) . await {
156- Ok ( info) => info,
157- Err ( e) => {
158- warn ! ( "Failed to get ns info from metadata backend {}: {}" , ci, e) ;
159- return Some ( ( ci. clone ( ) , ( Some ( db) , BackendState :: new ( ) ) ) ) ;
160- }
161- } ;
162- let free_space = ns_info. free_space ( ) ;
163- let state = if free_space <= FREESPACE_TRESHOLD {
164- BackendState :: LowSpace ( free_space)
165- } else {
166- BackendState :: Healthy
167- } ;
168-
169- Some ( ( ci. clone ( ) , ( Some ( db) , state) ) )
170- } )
171- . collect ( )
172- . await
173- }
174- } ;
175- ( managed_seq_dbs, managed_meta_dbs)
176- }
177- }
106+ let ( managed_seq_dbs, managed_meta_dbs) =
107+ get_zdbs_from_config ( cfg_addr. clone ( ) ) . await ;
108+ ( managed_seq_dbs, managed_meta_dbs)
178109 }
179110 . into_actor ( self )
180111 . map ( |res, actor, _| {
@@ -189,6 +120,129 @@ impl Actor for BackendManagerActor {
189120 ) ;
190121 }
191122}
123+ impl Handler < ReloadConfig > for BackendManagerActor {
124+ type Result = Result < ( ) , ZstorError > ;
125+
126+ fn handle ( & mut self , _: ReloadConfig , ctx : & mut Self :: Context ) -> Self :: Result {
127+ let cfg_addr = self . config_addr . clone ( ) ;
128+ let fut = Box :: pin (
129+ async move {
130+ let ( managed_seq_dbs, managed_meta_dbs) =
131+ get_zdbs_from_config ( cfg_addr. clone ( ) ) . await ;
132+ ( managed_seq_dbs, managed_meta_dbs)
133+ }
134+ . into_actor ( self )
135+ . map ( |( seq_dbs, meta_dbs) , actor, _| {
136+ // remove the data backends that are no longer managed from the metrics
137+ for ( key, val) in actor. managed_seq_dbs . iter ( ) {
138+ if !seq_dbs. contains_key ( key) {
139+ actor. metrics . do_send ( SetDataBackendInfo {
140+ ci : val. 0 . as_ref ( ) . unwrap ( ) . connection_info ( ) . clone ( ) ,
141+ info : None ,
142+ } ) ;
143+ }
144+ }
145+
146+ // remove the meta backends that are no longer managed from the metrics
147+ for ( key, val) in actor. managed_meta_dbs . iter ( ) {
148+ if !meta_dbs. contains_key ( key) {
149+ actor. metrics . do_send ( SetMetaBackendInfo {
150+ ci : val. 0 . as_ref ( ) . unwrap ( ) . connection_info ( ) . clone ( ) ,
151+ info : None ,
152+ } ) ;
153+ }
154+ }
155+ actor. managed_seq_dbs = seq_dbs;
156+ actor. managed_meta_dbs = meta_dbs;
157+ } ) ,
158+ ) ;
159+ ctx. spawn ( fut) ;
160+ Ok ( ( ) )
161+ }
162+ }
163+
164+ async fn get_zdbs_from_config (
165+ cfg_addr : Addr < ConfigActor > ,
166+ ) -> (
167+ HashMap < ZdbConnectionInfo , ( Option < SequentialZdb > , BackendState ) > ,
168+ HashMap < ZdbConnectionInfo , ( Option < UserKeyZdb > , BackendState ) > ,
169+ ) {
170+ match cfg_addr. send ( GetConfig ) . await {
171+ Err ( e) => {
172+ error ! ( "Could not get config: {}" , e) ;
173+ // we won't manage the dbs
174+ ( HashMap :: new ( ) , HashMap :: new ( ) )
175+ }
176+ Ok ( config) => {
177+ let managed_seq_dbs = stream:: iter ( config. backends ( ) )
178+ . filter_map ( |ci| async move {
179+ let db = match SequentialZdb :: new ( ci. clone ( ) ) . await {
180+ Ok ( db) => db,
181+ Err ( e) => {
182+ warn ! ( "Could not connect to backend {} in config file: {}" , ci, e) ;
183+ return Some ( ( ci. clone ( ) , ( None , BackendState :: new ( ) ) ) ) ;
184+ }
185+ } ;
186+ let ns_info = match db. ns_info ( ) . await {
187+ Ok ( info) => info,
188+ Err ( e) => {
189+ warn ! ( "Failed to get ns info from backend {}: {}" , ci, e) ;
190+ return Some ( ( ci. clone ( ) , ( Some ( db) , BackendState :: new ( ) ) ) ) ;
191+ }
192+ } ;
193+
194+ let free_space = ns_info. free_space ( ) ;
195+ let state = if free_space <= FREESPACE_TRESHOLD {
196+ BackendState :: LowSpace ( free_space)
197+ } else {
198+ BackendState :: Healthy
199+ } ;
200+ Some ( ( ci. clone ( ) , ( Some ( db) , state) ) )
201+ } )
202+ . collect ( )
203+ . await ;
204+
205+ let managed_meta_dbs = match config. meta ( ) {
206+ Meta :: Zdb ( zdb_meta_cfg) => {
207+ stream:: iter ( zdb_meta_cfg. backends ( ) )
208+ . filter_map ( |ci| async move {
209+ let db = match UserKeyZdb :: new ( ci. clone ( ) ) . await {
210+ Ok ( db) => db,
211+ Err ( e) => {
212+ warn ! (
213+ "Failed to connect to metadata backend {} in config file: {}" ,
214+ ci, e
215+ ) ;
216+ return Some ( ( ci. clone ( ) , ( None , BackendState :: new ( ) ) ) ) ;
217+ }
218+ } ;
219+ let ns_info = match db. ns_info ( ) . await {
220+ Ok ( info) => info,
221+ Err ( e) => {
222+ warn ! (
223+ "Failed to get ns info from metadata backend {}: {}" ,
224+ ci, e
225+ ) ;
226+ return Some ( ( ci. clone ( ) , ( Some ( db) , BackendState :: new ( ) ) ) ) ;
227+ }
228+ } ;
229+ let free_space = ns_info. free_space ( ) ;
230+ let state = if free_space <= FREESPACE_TRESHOLD {
231+ BackendState :: LowSpace ( free_space)
232+ } else {
233+ BackendState :: Healthy
234+ } ;
235+
236+ Some ( ( ci. clone ( ) , ( Some ( db) , state) ) )
237+ } )
238+ . collect ( )
239+ . await
240+ }
241+ } ;
242+ ( managed_seq_dbs, managed_meta_dbs)
243+ }
244+ }
245+ }
192246
193247impl Handler < CheckBackends > for BackendManagerActor {
194248 type Result = ResponseActFuture < Self , ( ) > ;
0 commit comments