Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 127 additions & 73 deletions zstor/src/actors/backends.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::actors::{
config::{ConfigActor, GetConfig, ReplaceMetaBackend},
config::{ConfigActor, GetConfig, ReloadConfig, ReplaceMetaBackend},
explorer::{ExpandStorage, SizeRequest},
meta::{MarkWriteable, MetaStoreActor, ReplaceMetaStore},
metrics::{MetricsActor, SetDataBackendInfo, SetMetaBackendInfo},
Expand Down Expand Up @@ -103,78 +103,9 @@ impl Actor for BackendManagerActor {

ctx.wait(
async move {
match cfg_addr.send(GetConfig).await {
Err(e) => {
error!("Could not get config: {}", e);
// we won't manage the dbs
(HashMap::new(), HashMap::new())
}
Ok(config) => {
let managed_seq_dbs = stream::iter(config.backends())
.filter_map(|ci| async move {
let db = match SequentialZdb::new(ci.clone()).await {
Ok(db) => db,
Err(e) => {
warn!(
"Could not connect to backend {} in config file: {}",
ci, e
);
return Some((ci.clone(), (None, BackendState::new())));
}
};
let ns_info = match db.ns_info().await {
Ok(info) => info,
Err(e) => {
warn!("Failed to get ns info from backend {}: {}", ci, e);
return Some((ci.clone(), (Some(db), BackendState::new())));
}
};

let free_space = ns_info.free_space();
let state = if free_space <= FREESPACE_TRESHOLD {
BackendState::LowSpace(free_space)
} else {
BackendState::Healthy
};

Some((ci.clone(), (Some(db), state)))
})
.collect()
.await;
let managed_meta_dbs = match config.meta() {
Meta::Zdb(zdb_meta_cfg) => {
stream::iter(zdb_meta_cfg.backends())
.filter_map(|ci| async move {
let db = match UserKeyZdb::new(ci.clone()).await {
Ok(db) => db,
Err(e) => {
warn!("Failed to connect to metadata backend {} in config file: {}", ci ,e);
return Some((ci.clone(), (None, BackendState::new())));
}
};
let ns_info = match db.ns_info().await {
Ok(info) => info,
Err(e) => {
warn!("Failed to get ns info from metadata backend {}: {}", ci, e);
return Some((ci.clone(), (Some(db), BackendState::new())));
}
};
let free_space = ns_info.free_space();
let state = if free_space <= FREESPACE_TRESHOLD {
BackendState::LowSpace(free_space)
} else {
BackendState::Healthy
};

Some((ci.clone(), (Some(db), state)))
})
.collect()
.await
}
};
(managed_seq_dbs, managed_meta_dbs)
}
}
let (managed_seq_dbs, managed_meta_dbs) =
get_zdbs_from_config(cfg_addr.clone()).await;
(managed_seq_dbs, managed_meta_dbs)
}
.into_actor(self)
.map(|res, actor, _| {
Expand All @@ -189,6 +120,129 @@ impl Actor for BackendManagerActor {
);
}
}
impl Handler<ReloadConfig> for BackendManagerActor {
type Result = Result<(), ZstorError>;

fn handle(&mut self, _: ReloadConfig, ctx: &mut Self::Context) -> Self::Result {
let cfg_addr = self.config_addr.clone();
let fut = Box::pin(
async move {
let (managed_seq_dbs, managed_meta_dbs) =
get_zdbs_from_config(cfg_addr.clone()).await;
(managed_seq_dbs, managed_meta_dbs)
}
.into_actor(self)
.map(|(seq_dbs, meta_dbs), actor, _| {
// remove the data backends that are no longer managed from the metrics
for (ci, _) in actor.managed_seq_dbs.iter() {
if !seq_dbs.contains_key(ci) {
actor.metrics.do_send(SetDataBackendInfo {
ci: ci.clone(),
info: None,
});
}
}

// remove the meta backends that are no longer managed from the metrics
for (ci, _) in actor.managed_meta_dbs.iter() {
if !meta_dbs.contains_key(ci) {
actor.metrics.do_send(SetMetaBackendInfo {
ci: ci.clone(),
info: None,
});
}
}
actor.managed_seq_dbs = seq_dbs;
actor.managed_meta_dbs = meta_dbs;
}),
);
ctx.spawn(fut);
Ok(())
}
}

async fn get_zdbs_from_config(
cfg_addr: Addr<ConfigActor>,
) -> (
HashMap<ZdbConnectionInfo, (Option<SequentialZdb>, BackendState)>,
HashMap<ZdbConnectionInfo, (Option<UserKeyZdb>, BackendState)>,
) {
match cfg_addr.send(GetConfig).await {
Err(e) => {
error!("Could not get config: {}", e);
// we won't manage the dbs
(HashMap::new(), HashMap::new())
}
Ok(config) => {
let managed_seq_dbs = stream::iter(config.backends())
.filter_map(|ci| async move {
let db = match SequentialZdb::new(ci.clone()).await {
Ok(db) => db,
Err(e) => {
warn!("Could not connect to backend {} in config file: {}", ci, e);
return Some((ci.clone(), (None, BackendState::new())));
}
};
let ns_info = match db.ns_info().await {
Ok(info) => info,
Err(e) => {
warn!("Failed to get ns info from backend {}: {}", ci, e);
return Some((ci.clone(), (Some(db), BackendState::new())));
}
};

let free_space = ns_info.free_space();
let state = if free_space <= FREESPACE_TRESHOLD {
BackendState::LowSpace(free_space)
} else {
BackendState::Healthy
};
Some((ci.clone(), (Some(db), state)))
})
.collect()
.await;

let managed_meta_dbs = match config.meta() {
Meta::Zdb(zdb_meta_cfg) => {
stream::iter(zdb_meta_cfg.backends())
.filter_map(|ci| async move {
let db = match UserKeyZdb::new(ci.clone()).await {
Ok(db) => db,
Err(e) => {
warn!(
"Failed to connect to metadata backend {} in config file: {}",
ci, e
);
return Some((ci.clone(), (None, BackendState::new())));
}
};
let ns_info = match db.ns_info().await {
Ok(info) => info,
Err(e) => {
warn!(
"Failed to get ns info from metadata backend {}: {}",
ci, e
);
return Some((ci.clone(), (Some(db), BackendState::new())));
}
};
let free_space = ns_info.free_space();
let state = if free_space <= FREESPACE_TRESHOLD {
BackendState::LowSpace(free_space)
} else {
BackendState::Healthy
};

Some((ci.clone(), (Some(db), state)))
})
.collect()
.await
}
};
(managed_seq_dbs, managed_meta_dbs)
}
}
}

impl Handler<CheckBackends> for BackendManagerActor {
type Result = ResponseActFuture<Self, ()>;
Expand Down
35 changes: 21 additions & 14 deletions zstor/src/actors/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,25 @@ use prometheus::{
use std::mem;
use std::{collections::HashMap, fmt, string::FromUtf8Error};

const BACKEND_TYPE_DATA: &str = "data";
const BACKEND_TYPE_META: &str = "meta";
enum BackendType {
Data,
Meta,
}

impl BackendType {
fn as_str(&self) -> &str {
match self {
BackendType::Data => "data",
BackendType::Meta => "meta",
}
}
}

/// A metrics actor collecting metrics from the system.
pub struct MetricsActor {
data_zdbs: HashMap<ZdbConnectionInfo, NsInfo>,
meta_zdbs: HashMap<ZdbConnectionInfo, NsInfo>,
removed_zdbs: Vec<ZdbConnectionInfo>,
removed_zdbs: Vec<(ZdbConnectionInfo, BackendType)>,
successful_zstor_commands: HashMap<ZstorCommandId, usize>,
failed_zstor_commands: HashMap<ZstorCommandId, usize>,
zdbfs_stats: stats_t,
Expand Down Expand Up @@ -299,7 +310,7 @@ impl Handler<SetDataBackendInfo> for MetricsActor {
self.data_zdbs.insert(msg.ci, info);
} else {
self.data_zdbs.remove(&msg.ci);
self.removed_zdbs.push(msg.ci);
self.removed_zdbs.push((msg.ci, BackendType::Data));
}
}
}
Expand All @@ -312,7 +323,7 @@ impl Handler<SetMetaBackendInfo> for MetricsActor {
self.meta_zdbs.insert(msg.ci, info);
} else {
self.meta_zdbs.remove(&msg.ci);
self.removed_zdbs.push(msg.ci);
self.removed_zdbs.push((msg.ci, BackendType::Meta));
}
}
}
Expand Down Expand Up @@ -348,16 +359,12 @@ impl Handler<GetPrometheusMetrics> for MetricsActor {
// Take ownerhsip of the removed zdb list, and leave an empty (default) list in its
// place.
let removed_zdbs = mem::take(&mut self.removed_zdbs);
for ci in removed_zdbs {
for (ci, backend_type) in removed_zdbs {
let mut labels = HashMap::new();
labels.insert("namespace", ci.namespace().unwrap_or(""));
let address = ci.address().to_string();
labels.insert("address", &address);
if self.data_zdbs.contains_key(&ci) {
labels.insert("backend_type", BACKEND_TYPE_DATA);
} else {
labels.insert("backend_type", BACKEND_TYPE_META);
}
labels.insert("backend_type", backend_type.as_str());

if let Err(e) = self.prom_metrics.entries_gauges.remove(&labels) {
warn!("Failed to delete removed metric by label: {}", e)
Expand Down Expand Up @@ -404,18 +411,18 @@ impl Handler<GetPrometheusMetrics> for MetricsActor {
for (ci, (info, backend_type)) in self
.data_zdbs
.iter()
.map(|(ci, info)| (ci, (info, BACKEND_TYPE_DATA)))
.map(|(ci, info)| (ci, (info, BackendType::Data)))
.chain(
self.meta_zdbs
.iter()
.map(|(ci, info)| (ci, (info, BACKEND_TYPE_META))),
.map(|(ci, info)| (ci, (info, BackendType::Meta))),
)
{
let mut labels = HashMap::new();
labels.insert("namespace", ci.namespace().unwrap_or(""));
let address = ci.address().to_string();
labels.insert("address", &address);
labels.insert("backend_type", backend_type);
labels.insert("backend_type", backend_type.as_str());

let entries_gauge = self.prom_metrics.entries_gauges.get_metric_with(&labels)?;
let data_size_bytes_gauge = self
Expand Down
11 changes: 9 additions & 2 deletions zstor/src/actors/zstor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
};
use tokio::{fs, io, task::JoinHandle};

use super::config::ReloadConfig;
use super::{backends::BackendManagerActor, config::ReloadConfig};

#[derive(Serialize, Deserialize, Debug, Clone)]
/// All possible commands zstor operates on.
Expand Down Expand Up @@ -105,6 +105,7 @@ pub struct ZstorActor {
pipeline: Addr<PipelineActor>,
meta: Addr<MetaStoreActor>,
metrics: Addr<MetricsActor>,
backend: Addr<BackendManagerActor>,
}

impl ZstorActor {
Expand All @@ -114,12 +115,14 @@ impl ZstorActor {
pipeline: Addr<PipelineActor>,
meta: Addr<MetaStoreActor>,
metrics: Addr<MetricsActor>,
backend: Addr<BackendManagerActor>,
) -> ZstorActor {
Self {
cfg,
pipeline,
meta,
metrics,
backend,
}
}
}
Expand Down Expand Up @@ -395,7 +398,11 @@ impl Handler<ReloadConfig> for ZstorActor {

fn handle(&mut self, _: ReloadConfig, _: &mut Self::Context) -> Self::Result {
let cfg = self.cfg.clone();
Box::pin(async move { cfg.send(ReloadConfig).await? })
let backend = self.backend.clone();
Box::pin(async move {
let _ = cfg.send(ReloadConfig).await?;
backend.send(ReloadConfig).await?
})
}
}

Expand Down
16 changes: 11 additions & 5 deletions zstor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,27 @@ pub async fn setup_system(cfg_path: PathBuf, cfg: &Config) -> ZstorResult<Addr<Z
let cfg_addr = ConfigActor::new(cfg_path, cfg.clone()).start();
let pipeline_addr = SyncArbiter::start(1, || PipelineActor);

let explorer = NopExplorerActor::new().start().recipient();

let backends = BackendManagerActor::new(
cfg_addr.clone(),
explorer,
metrics_addr.clone(),
meta_addr.clone(),
)
.start();

let zstor = ZstorActor::new(
cfg_addr.clone(),
pipeline_addr,
meta_addr.clone(),
metrics_addr.clone(),
backends.clone(),
)
.start();

let _ = DirMonitorActor::new(cfg_addr.clone(), zstor.clone()).start();

let explorer = NopExplorerActor::new().start().recipient();

let backends =
BackendManagerActor::new(cfg_addr, explorer, metrics_addr.clone(), meta_addr.clone())
.start();
let _ = RepairActor::new(meta_addr, backends, zstor.clone()).start();

// Setup prometheus endpoint if needed
Expand Down
Loading