Skip to content

Commit

Permalink
optimize write_meta
Browse files Browse the repository at this point in the history
  • Loading branch information
iwanbk committed Feb 13, 2025
1 parent 598326e commit cf4f86d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 31 deletions.
17 changes: 8 additions & 9 deletions src/cas/fjall_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,29 +221,28 @@ impl MetaStore for FjallStore {
block_hash: BlockID,
data_len: usize,
key_has_block: bool,
) -> Result<bool, MetaError> {
) -> Result<(bool, Block), MetaError> {
let blocks = self.block_partition.clone();
let paths = self.path_partition.clone();

let mut tx = self.keyspace.write_tx();
let should_write = match tx.get(&blocks, block_hash) {
let res = match tx.get(&blocks, block_hash) {
Ok(Some(block_data)) => {
// Block already exists

let mut block =
Block::try_from(&*block_data).expect("Only valid blocks are stored");

// if the key already has this block, the block doesn't got more references
// and we don't need to write it back.
if !key_has_block {
// bump refcount on the block
let mut block =
Block::try_from(&*block_data).expect("Only valid blocks are stored");

block.increment_refcount();
// write block back
// TODO: this could be done in an `update_and_fetch`
tx.insert(&blocks, block_hash, block.to_vec());
}

Ok(false)
Ok((false, block))
}
Ok(None) => {
let mut idx = 0;
Expand All @@ -267,12 +266,12 @@ impl MetaStore for FjallStore {
let block = Block::new(data_len, block_hash[..idx].to_vec());

tx.insert(&blocks, block_hash, block.to_vec());
Ok(true)
Ok((true, block))
}
Err(e) => Err(MetaError::OtherDBError(e.to_string())),
};
self.commit_persist(tx)?;
should_write
res
}
}

Expand Down
33 changes: 12 additions & 21 deletions src/cas/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ impl CasFS {
Err(_) => None,
};
let old_obj_meta = Arc::new(old_obj_meta);
let block_map = Arc::new(self.meta_store.get_block_tree()?);

let (tx, rx) = unbounded();
let mut content_hash = Md5::new();
Expand All @@ -262,11 +261,11 @@ impl CasFS {
self.metrics.bytes_received(bytes.len());
}
})
.zip(stream::repeat((tx, block_map, old_obj_meta)))
.zip(stream::repeat((tx, old_obj_meta)))
.enumerate()
.for_each_concurrent(
5,
|(idx, (maybe_chunk, (mut tx, block_map, old_obj_meta)))| async move {
|(idx, (maybe_chunk, (mut tx, old_obj_meta)))| async move {
if let Err(e) = maybe_chunk {
if let Err(e) = tx
.send(Err(std::io::Error::new(e.kind(), e.to_string())))
Expand All @@ -290,41 +289,33 @@ impl CasFS {
false
};

let should_write = self
.meta_store
.write_block(block_hash, data_len, key_has_block);
let write_meta_result =
self.meta_store
.write_block(block_hash, data_len, key_has_block);

let mut pm = PendingMarker::new(self.metrics.clone());
match should_write {

let block = match write_meta_result {
Err(e) => {
if let Err(e) = tx.send(Err(e.into())).await {
error!("Could not send transaction error: {}", e);
}
return;
}
Ok(false) => {
Ok((false, _)) => {
pm.block_ignored();
if let Err(e) = tx.send(Ok((idx, block_hash))).await {
error!("Could not send block id: {}", e);
}
return;
}
Ok(true) => pm.block_pending(),
};

// write the actual block
// first load the block again from the DB
let block = match block_map.get_block(&block_hash) {
Ok(block) => block,
Err(e) => {
if let Err(e) = tx.send(Err(e.into())).await {
pm.block_write_error();
error!("Could not send db error: {}", e);
}
return;
Ok((true, block)) => {
pm.block_pending();
block
}
};

// write the actual block to disk
let block_path = block.disk_path(self.root.clone());
if let Err(e) = async_fs::create_dir_all(block_path.parent().unwrap()).await {
if let Err(e) = tx.send(Err(e)).await {
Expand Down
6 changes: 5 additions & 1 deletion src/cas/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub trait MetaStore: Send + Sync + Debug + 'static {
// data_len is the length of the block data.
// key_has_block is true if the coresponding key already has the block
//
// It returns a tuple of bool and Block:
// - bool is true if the block is new, hence need to be written to the disk
// - Block is the block object
//
// It should do at least the following:
// - Check if the hash is present in the block map
// - if exists and key_has_block is false: increment the refcount
Expand All @@ -81,7 +85,7 @@ pub trait MetaStore: Send + Sync + Debug + 'static {
block_hash: BlockID,
data_len: usize,
key_has_block: bool,
) -> Result<bool, MetaError>;
) -> Result<(bool, Block), MetaError>;
}

pub trait BaseMetaTree: Send + Sync {
Expand Down

0 comments on commit cf4f86d

Please sign in to comment.