Skip to content

Commit bd8c2a7

Browse files
committed
fix(metrics): reload backend data during config reload.
Reloading backends also resulted on updated backend on metrics
1 parent b5e7a3c commit bd8c2a7

File tree

4 files changed

+168
-94
lines changed

4 files changed

+168
-94
lines changed

zstor/src/actors/backends.rs

Lines changed: 127 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::actors::{
2-
config::{ConfigActor, GetConfig, ReplaceMetaBackend},
2+
config::{ConfigActor, GetConfig, ReloadConfig, ReplaceMetaBackend},
33
explorer::{ExpandStorage, SizeRequest},
44
meta::{MarkWriteable, MetaStoreActor, ReplaceMetaStore},
55
metrics::{MetricsActor, SetDataBackendInfo, SetMetaBackendInfo},
@@ -103,78 +103,9 @@ impl Actor for BackendManagerActor {
103103

104104
ctx.wait(
105105
async move {
106-
match cfg_addr.send(GetConfig).await {
107-
Err(e) => {
108-
error!("Could not get config: {}", e);
109-
// we won't manage the dbs
110-
(HashMap::new(), HashMap::new())
111-
}
112-
Ok(config) => {
113-
let managed_seq_dbs = stream::iter(config.backends())
114-
.filter_map(|ci| async move {
115-
let db = match SequentialZdb::new(ci.clone()).await {
116-
Ok(db) => db,
117-
Err(e) => {
118-
warn!(
119-
"Could not connect to backend {} in config file: {}",
120-
ci, e
121-
);
122-
return Some((ci.clone(), (None, BackendState::new())));
123-
}
124-
};
125-
let ns_info = match db.ns_info().await {
126-
Ok(info) => info,
127-
Err(e) => {
128-
warn!("Failed to get ns info from backend {}: {}", ci, e);
129-
return Some((ci.clone(), (Some(db), BackendState::new())));
130-
}
131-
};
132-
133-
let free_space = ns_info.free_space();
134-
let state = if free_space <= FREESPACE_TRESHOLD {
135-
BackendState::LowSpace(free_space)
136-
} else {
137-
BackendState::Healthy
138-
};
139-
140-
Some((ci.clone(), (Some(db), state)))
141-
})
142-
.collect()
143-
.await;
144-
let managed_meta_dbs = match config.meta() {
145-
Meta::Zdb(zdb_meta_cfg) => {
146-
stream::iter(zdb_meta_cfg.backends())
147-
.filter_map(|ci| async move {
148-
let db = match UserKeyZdb::new(ci.clone()).await {
149-
Ok(db) => db,
150-
Err(e) => {
151-
warn!("Failed to connect to metadata backend {} in config file: {}", ci ,e);
152-
return Some((ci.clone(), (None, BackendState::new())));
153-
}
154-
};
155-
let ns_info = match db.ns_info().await {
156-
Ok(info) => info,
157-
Err(e) => {
158-
warn!("Failed to get ns info from metadata backend {}: {}", ci, e);
159-
return Some((ci.clone(), (Some(db), BackendState::new())));
160-
}
161-
};
162-
let free_space = ns_info.free_space();
163-
let state = if free_space <= FREESPACE_TRESHOLD {
164-
BackendState::LowSpace(free_space)
165-
} else {
166-
BackendState::Healthy
167-
};
168-
169-
Some((ci.clone(), (Some(db), state)))
170-
})
171-
.collect()
172-
.await
173-
}
174-
};
175-
(managed_seq_dbs, managed_meta_dbs)
176-
}
177-
}
106+
let (managed_seq_dbs, managed_meta_dbs) =
107+
get_zdbs_from_config(cfg_addr.clone()).await;
108+
(managed_seq_dbs, managed_meta_dbs)
178109
}
179110
.into_actor(self)
180111
.map(|res, actor, _| {
@@ -189,6 +120,129 @@ impl Actor for BackendManagerActor {
189120
);
190121
}
191122
}
123+
impl Handler<ReloadConfig> for BackendManagerActor {
124+
type Result = Result<(), ZstorError>;
125+
126+
fn handle(&mut self, _: ReloadConfig, ctx: &mut Self::Context) -> Self::Result {
127+
let cfg_addr = self.config_addr.clone();
128+
let fut = Box::pin(
129+
async move {
130+
let (managed_seq_dbs, managed_meta_dbs) =
131+
get_zdbs_from_config(cfg_addr.clone()).await;
132+
(managed_seq_dbs, managed_meta_dbs)
133+
}
134+
.into_actor(self)
135+
.map(|(seq_dbs, meta_dbs), actor, _| {
136+
// remove the data backends that are no longer managed from the metrics
137+
for (ci, _) in actor.managed_seq_dbs.iter() {
138+
if !seq_dbs.contains_key(ci) {
139+
actor.metrics.do_send(SetDataBackendInfo {
140+
ci: ci.clone(),
141+
info: None,
142+
});
143+
}
144+
}
145+
146+
// remove the meta backends that are no longer managed from the metrics
147+
for (ci, _) in actor.managed_meta_dbs.iter() {
148+
if !meta_dbs.contains_key(ci) {
149+
actor.metrics.do_send(SetMetaBackendInfo {
150+
ci: ci.clone(),
151+
info: None,
152+
});
153+
}
154+
}
155+
actor.managed_seq_dbs = seq_dbs;
156+
actor.managed_meta_dbs = meta_dbs;
157+
}),
158+
);
159+
ctx.spawn(fut);
160+
Ok(())
161+
}
162+
}
163+
164+
async fn get_zdbs_from_config(
165+
cfg_addr: Addr<ConfigActor>,
166+
) -> (
167+
HashMap<ZdbConnectionInfo, (Option<SequentialZdb>, BackendState)>,
168+
HashMap<ZdbConnectionInfo, (Option<UserKeyZdb>, BackendState)>,
169+
) {
170+
match cfg_addr.send(GetConfig).await {
171+
Err(e) => {
172+
error!("Could not get config: {}", e);
173+
// we won't manage the dbs
174+
(HashMap::new(), HashMap::new())
175+
}
176+
Ok(config) => {
177+
let managed_seq_dbs = stream::iter(config.backends())
178+
.filter_map(|ci| async move {
179+
let db = match SequentialZdb::new(ci.clone()).await {
180+
Ok(db) => db,
181+
Err(e) => {
182+
warn!("Could not connect to backend {} in config file: {}", ci, e);
183+
return Some((ci.clone(), (None, BackendState::new())));
184+
}
185+
};
186+
let ns_info = match db.ns_info().await {
187+
Ok(info) => info,
188+
Err(e) => {
189+
warn!("Failed to get ns info from backend {}: {}", ci, e);
190+
return Some((ci.clone(), (Some(db), BackendState::new())));
191+
}
192+
};
193+
194+
let free_space = ns_info.free_space();
195+
let state = if free_space <= FREESPACE_TRESHOLD {
196+
BackendState::LowSpace(free_space)
197+
} else {
198+
BackendState::Healthy
199+
};
200+
Some((ci.clone(), (Some(db), state)))
201+
})
202+
.collect()
203+
.await;
204+
205+
let managed_meta_dbs = match config.meta() {
206+
Meta::Zdb(zdb_meta_cfg) => {
207+
stream::iter(zdb_meta_cfg.backends())
208+
.filter_map(|ci| async move {
209+
let db = match UserKeyZdb::new(ci.clone()).await {
210+
Ok(db) => db,
211+
Err(e) => {
212+
warn!(
213+
"Failed to connect to metadata backend {} in config file: {}",
214+
ci, e
215+
);
216+
return Some((ci.clone(), (None, BackendState::new())));
217+
}
218+
};
219+
let ns_info = match db.ns_info().await {
220+
Ok(info) => info,
221+
Err(e) => {
222+
warn!(
223+
"Failed to get ns info from metadata backend {}: {}",
224+
ci, e
225+
);
226+
return Some((ci.clone(), (Some(db), BackendState::new())));
227+
}
228+
};
229+
let free_space = ns_info.free_space();
230+
let state = if free_space <= FREESPACE_TRESHOLD {
231+
BackendState::LowSpace(free_space)
232+
} else {
233+
BackendState::Healthy
234+
};
235+
236+
Some((ci.clone(), (Some(db), state)))
237+
})
238+
.collect()
239+
.await
240+
}
241+
};
242+
(managed_seq_dbs, managed_meta_dbs)
243+
}
244+
}
245+
}
192246

193247
impl Handler<CheckBackends> for BackendManagerActor {
194248
type Result = ResponseActFuture<Self, ()>;

zstor/src/actors/metrics.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,25 @@ use prometheus::{
1111
use std::mem;
1212
use std::{collections::HashMap, fmt, string::FromUtf8Error};
1313

14-
const BACKEND_TYPE_DATA: &str = "data";
15-
const BACKEND_TYPE_META: &str = "meta";
14+
enum BackendType {
15+
Data,
16+
Meta,
17+
}
18+
19+
impl BackendType {
20+
fn as_str(&self) -> &str {
21+
match self {
22+
BackendType::Data => "data",
23+
BackendType::Meta => "meta",
24+
}
25+
}
26+
}
1627

1728
/// A metrics actor collecting metrics from the system.
1829
pub struct MetricsActor {
1930
data_zdbs: HashMap<ZdbConnectionInfo, NsInfo>,
2031
meta_zdbs: HashMap<ZdbConnectionInfo, NsInfo>,
21-
removed_zdbs: Vec<ZdbConnectionInfo>,
32+
removed_zdbs: Vec<(ZdbConnectionInfo, BackendType)>,
2233
successful_zstor_commands: HashMap<ZstorCommandId, usize>,
2334
failed_zstor_commands: HashMap<ZstorCommandId, usize>,
2435
zdbfs_stats: stats_t,
@@ -299,7 +310,7 @@ impl Handler<SetDataBackendInfo> for MetricsActor {
299310
self.data_zdbs.insert(msg.ci, info);
300311
} else {
301312
self.data_zdbs.remove(&msg.ci);
302-
self.removed_zdbs.push(msg.ci);
313+
self.removed_zdbs.push((msg.ci, BackendType::Data));
303314
}
304315
}
305316
}
@@ -312,7 +323,7 @@ impl Handler<SetMetaBackendInfo> for MetricsActor {
312323
self.meta_zdbs.insert(msg.ci, info);
313324
} else {
314325
self.meta_zdbs.remove(&msg.ci);
315-
self.removed_zdbs.push(msg.ci);
326+
self.removed_zdbs.push((msg.ci, BackendType::Meta));
316327
}
317328
}
318329
}
@@ -348,16 +359,12 @@ impl Handler<GetPrometheusMetrics> for MetricsActor {
348359
// Take ownerhsip of the removed zdb list, and leave an empty (default) list in its
349360
// place.
350361
let removed_zdbs = mem::take(&mut self.removed_zdbs);
351-
for ci in removed_zdbs {
362+
for (ci, backend_type) in removed_zdbs {
352363
let mut labels = HashMap::new();
353364
labels.insert("namespace", ci.namespace().unwrap_or(""));
354365
let address = ci.address().to_string();
355366
labels.insert("address", &address);
356-
if self.data_zdbs.contains_key(&ci) {
357-
labels.insert("backend_type", BACKEND_TYPE_DATA);
358-
} else {
359-
labels.insert("backend_type", BACKEND_TYPE_META);
360-
}
367+
labels.insert("backend_type", backend_type.as_str());
361368

362369
if let Err(e) = self.prom_metrics.entries_gauges.remove(&labels) {
363370
warn!("Failed to delete removed metric by label: {}", e)
@@ -404,18 +411,18 @@ impl Handler<GetPrometheusMetrics> for MetricsActor {
404411
for (ci, (info, backend_type)) in self
405412
.data_zdbs
406413
.iter()
407-
.map(|(ci, info)| (ci, (info, BACKEND_TYPE_DATA)))
414+
.map(|(ci, info)| (ci, (info, BackendType::Data)))
408415
.chain(
409416
self.meta_zdbs
410417
.iter()
411-
.map(|(ci, info)| (ci, (info, BACKEND_TYPE_META))),
418+
.map(|(ci, info)| (ci, (info, BackendType::Meta))),
412419
)
413420
{
414421
let mut labels = HashMap::new();
415422
labels.insert("namespace", ci.namespace().unwrap_or(""));
416423
let address = ci.address().to_string();
417424
labels.insert("address", &address);
418-
labels.insert("backend_type", backend_type);
425+
labels.insert("backend_type", backend_type.as_str());
419426

420427
let entries_gauge = self.prom_metrics.entries_gauges.get_metric_with(&labels)?;
421428
let data_size_bytes_gauge = self

zstor/src/actors/zstor.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::{
2121
};
2222
use tokio::{fs, io, task::JoinHandle};
2323

24-
use super::config::ReloadConfig;
24+
use super::{backends::BackendManagerActor, config::ReloadConfig};
2525

2626
#[derive(Serialize, Deserialize, Debug, Clone)]
2727
/// All possible commands zstor operates on.
@@ -105,6 +105,7 @@ pub struct ZstorActor {
105105
pipeline: Addr<PipelineActor>,
106106
meta: Addr<MetaStoreActor>,
107107
metrics: Addr<MetricsActor>,
108+
backend: Addr<BackendManagerActor>,
108109
}
109110

110111
impl ZstorActor {
@@ -114,12 +115,14 @@ impl ZstorActor {
114115
pipeline: Addr<PipelineActor>,
115116
meta: Addr<MetaStoreActor>,
116117
metrics: Addr<MetricsActor>,
118+
backend: Addr<BackendManagerActor>,
117119
) -> ZstorActor {
118120
Self {
119121
cfg,
120122
pipeline,
121123
meta,
122124
metrics,
125+
backend,
123126
}
124127
}
125128
}
@@ -395,7 +398,11 @@ impl Handler<ReloadConfig> for ZstorActor {
395398

396399
fn handle(&mut self, _: ReloadConfig, _: &mut Self::Context) -> Self::Result {
397400
let cfg = self.cfg.clone();
398-
Box::pin(async move { cfg.send(ReloadConfig).await? })
401+
let backend = self.backend.clone();
402+
Box::pin(async move {
403+
let _ = cfg.send(ReloadConfig).await?;
404+
backend.send(ReloadConfig).await?
405+
})
399406
}
400407
}
401408

zstor/src/lib.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,21 +92,27 @@ pub async fn setup_system(cfg_path: PathBuf, cfg: &Config) -> ZstorResult<Addr<Z
9292
let cfg_addr = ConfigActor::new(cfg_path, cfg.clone()).start();
9393
let pipeline_addr = SyncArbiter::start(1, || PipelineActor);
9494

95+
let explorer = NopExplorerActor::new().start().recipient();
96+
97+
let backends = BackendManagerActor::new(
98+
cfg_addr.clone(),
99+
explorer,
100+
metrics_addr.clone(),
101+
meta_addr.clone(),
102+
)
103+
.start();
104+
95105
let zstor = ZstorActor::new(
96106
cfg_addr.clone(),
97107
pipeline_addr,
98108
meta_addr.clone(),
99109
metrics_addr.clone(),
110+
backends.clone(),
100111
)
101112
.start();
102113

103114
let _ = DirMonitorActor::new(cfg_addr.clone(), zstor.clone()).start();
104115

105-
let explorer = NopExplorerActor::new().start().recipient();
106-
107-
let backends =
108-
BackendManagerActor::new(cfg_addr, explorer, metrics_addr.clone(), meta_addr.clone())
109-
.start();
110116
let _ = RepairActor::new(meta_addr, backends, zstor.clone()).start();
111117

112118
// Setup prometheus endpoint if needed

0 commit comments

Comments
 (0)