Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dedup rebuild): if possible, use same backend during rebuild #142

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 158 additions & 42 deletions zstor/src/actors/zstor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
config::Config,
erasure::Shard,
meta::{Checksum, MetaData, ShardInfo},
zdb::{SequentialZdb, ZdbError, ZdbResult},
zdb::{Key, SequentialZdb, ZdbConnectionInfo, ZdbError, ZdbResult},
ZstorError, ZstorResult,
};
use actix::prelude::*;
Expand Down Expand Up @@ -324,6 +324,7 @@ impl Handler<Rebuild> for ZstorActor {
};

let input = load_data(&old_metadata).await?;
let existing_data = input.clone();
let (mut metadata, shards) = pipeline
.send(RebuildData {
input,
Expand All @@ -332,7 +333,29 @@ impl Handler<Rebuild> for ZstorActor {
})
.await??;

save_data(&mut cfg.deref().clone(), shards, &mut metadata).await?;
// build a list of the key and the backend used for the shards
let mut used_backends = Vec::new();
for (i, data) in existing_data.iter().enumerate() {
let key = old_metadata.shards()[i].key().to_vec();
if let Some(data) = data {
if data.as_slice() == shards[i].as_ref() {
used_backends.push((key, Some(old_metadata.shards()[i].zdb().clone())));
} else {
used_backends.push((key, None));
error!("Shard {} is DIFFERENT", i);
}
} else {
used_backends.push((key, None));
}
}

rebuild_data(
&mut cfg.deref().clone(),
shards,
&mut metadata,
used_backends,
)
.await?;

info!(
"Rebuild file from {} to {}",
Expand Down Expand Up @@ -471,63 +494,156 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
Ok(shards)
}

async fn save_data(
/// Connects to `backend` and verifies that it has room for one shard.
///
/// Returns the connected [`SequentialZdb`] when the free space reported by the
/// backend's namespace info is at least `shard_len`.
///
/// # Errors
///
/// Propagates any error from opening the connection or from querying the
/// namespace info, and returns the error built by
/// `ZdbError::new_storage_size` when the reported free space is smaller than
/// `shard_len`.
async fn check_backend_space(
    backend: ZdbConnectionInfo,
    shard_len: usize,
) -> ZdbResult<SequentialZdb> {
    // `backend` is owned and not used afterwards, so move it instead of cloning.
    let db = SequentialZdb::new(backend).await?;
    let ns_info = db.ns_info().await?;
    // Read the free space once so the comparison and the error report agree.
    let free_space = ns_info.free_space() as usize;
    if free_space < shard_len {
        return Err(ZdbError::new_storage_size(
            db.connection_info().clone(),
            shard_len,
            free_space,
        ));
    }
    Ok(db)
}

/// Selects backends from `cfg` that can each take a shard of `shard_len`.
///
/// Every round asks `cfg.shard_stores()` for candidates, drops the ones that
/// already appear in `skip_backends`, and probes the remaining ones
/// concurrently with `check_backend_space`. A backend whose probe fails is
/// removed from `cfg`. A round succeeds only when no probe failed and at least
/// `needed_backends` candidates passed; otherwise another round is started.
///
/// # Errors
///
/// Returns early when `cfg.shard_stores()` fails or when a spawned probe task
/// cannot be joined.
async fn find_valid_backends(
    cfg: &mut Config,
    shard_len: usize,
    needed_backends: usize,
    skip_backends: Vec<(Vec<Key>, Option<ZdbConnectionInfo>)>,
) -> ZstorResult<Vec<SequentialZdb>> {
    // A candidate is skipped when some entry of `skip_backends` already names it.
    let already_used = |candidate: &ZdbConnectionInfo| {
        skip_backends
            .iter()
            .any(|(_, used)| used.as_ref() == Some(candidate))
    };

    loop {
        debug!("Finding backend config");
        let candidates = cfg.shard_stores()?;

        // Start one probe task per candidate that is not already in use.
        let mut probes = Vec::new();
        for candidate in candidates {
            if already_used(&candidate) {
                continue;
            }
            probes.push(tokio::spawn(async move {
                check_backend_space(candidate, shard_len).await
            }));
        }

        let mut usable = Vec::new();
        let mut failures = 0;
        for outcome in join_all(probes).await {
            match outcome? {
                Ok(db) => usable.push(db),
                Err(e) => {
                    debug!("Backend error: {}", e);
                    // Drop the failing backend so the next round does not pick it.
                    cfg.remove_shard(e.remote());
                    failures += 1;
                }
            }
        }

        if failures == 0 && usable.len() >= needed_backends {
            return Ok(usable);
        }

        // NOTE(review): when nothing failed but too few candidates remain, `cfg`
        // is unchanged before the retry — confirm `shard_stores()` can return a
        // different selection, otherwise this loop never terminates.
        debug!("Backend config failed, retrying...");
    }
}

async fn rebuild_data(
cfg: &mut Config,
shards: Vec<Shard>,
metadata: &mut MetaData,
// used_backends holds one (key, backend) entry per shard; an entry with a
// backend means that shard is not missing and is already stored there, so that
// backend does not need to be checked or written again
used_backends: Vec<(Vec<Key>, Option<ZdbConnectionInfo>)>,
) -> ZstorResult<()> {
let shard_len = if shards.is_empty() {
0
} else {
shards[0].len()
};

let dbs = loop {
debug!("Finding backend config");
let backends = cfg.shard_stores()?;

let mut failed_shards: usize = 0;
let mut handles: Vec<JoinHandle<ZdbResult<_>>> = Vec::with_capacity(shards.len());

for backend in backends {
handles.push(tokio::spawn(async move {
let db = SequentialZdb::new(backend.clone()).await?;
// check space in backend
let ns_info = db.ns_info().await?;
match ns_info.free_space() {
insufficient if (insufficient as usize) < shard_len => {
Err(ZdbError::new_storage_size(
db.connection_info().clone(),
shard_len,
ns_info.free_space() as usize,
))
}
_ => Ok(db),
}
}));
let mut existing_backends_num = 0;
for (_, ci) in used_backends.iter() {
if ci.is_some() {
existing_backends_num += 1;
}
}

let mut dbs = Vec::new();
for db in join_all(handles).await {
match db? {
Err(zdbe) => {
debug!("could not connect to 0-db: {}", zdbe);
cfg.remove_shard(zdbe.remote());
failed_shards += 1;
}
Ok(db) => dbs.push(db), // no error so healthy db backend
let new_dbs = find_valid_backends(
cfg,
shard_len,
shards.len() - existing_backends_num,
used_backends.clone(),
)
.await?;

// create the key, connection_info, and db for each shard
// - if the backend is already used, we don't need to set the shard,
// hence the None db
// - if the backend is not used, we need to set the shard,
// hence the Some(db), which will be used to set the shard
let mut new_dbs = new_dbs.into_iter();
let mut key_dbs = Vec::new();
for (key, ci) in used_backends {
match ci {
Some(ci) => key_dbs.push((key, ci, None)),
None => {
// unwrap is safe here because we know we have enough backends from the find_valid_backends
let db = new_dbs.next().unwrap();
key_dbs.push((key, db.connection_info().clone(), Some(db)));
}
}
}

// if we find one we are good
if failed_shards == 0 {
debug!("found valid backend configuration");
break dbs;
}
let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());
for ((existing_key, existing_ci, db), (shard_idx, shard)) in
key_dbs.into_iter().zip(shards.into_iter().enumerate())
{
handles.push(tokio::spawn(async move {
if let Some(db) = db {
let keys = db.set(&shard).await?;
Ok(ShardInfo::new(
shard_idx,
shard.checksum(),
keys,
db.connection_info().clone(),
))
} else {
// no need to db.set if it is an already used backend (shard is not missing)
Ok(ShardInfo::new(
shard_idx,
shard.checksum(),
existing_key.clone(),
existing_ci.clone(),
))
}
}));
}

for shard_info in try_join_all(handles).await? {
metadata.add_shard(shard_info?);
}

debug!("Backend config failed");
Ok(())
}

async fn save_data(
cfg: &mut Config,
shards: Vec<Shard>,
metadata: &mut MetaData,
) -> ZstorResult<()> {
let shard_len = if shards.is_empty() {
0
} else {
shards[0].len()
};

let dbs = find_valid_backends(cfg, shard_len, shards.len(), [].to_vec()).await?;

trace!("store shards in backends");

let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());
Expand Down
Loading