add support for inlined data on metadata #66

Merged · 4 commits · Feb 20, 2025
22 changes: 18 additions & 4 deletions README.md
@@ -18,12 +18,26 @@ cd s3-cas
cargo build --release --features binary
```

## Inline metadata

Objects smaller than or equal to a configurable threshold can be stored directly in their metadata records,
improving performance for small objects.

Configure this feature using the command-line option:
```console
--inline-metadata-size <size> # omit to disable inlining
```

When the size is set:
- If the combined size of the object data and its metadata is smaller than or equal to the threshold, the object data is stored in the metadata record;
otherwise the standard block storage is used (see the sketch below)
- Setting the size to 0 or omitting the option disables inlining completely
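
A minimal sketch of the decision, assuming a hypothetical `should_inline` helper (the real check lives inside the metadata store, which computes the exact metadata overhead):

```rust
/// Sketch only: `meta_overhead` stands in for the size of the object's
/// metadata record, which the real implementation computes internally.
fn should_inline(data_len: usize, meta_overhead: usize, threshold: usize) -> bool {
    // A threshold of 0 (or an omitted option) disables inlining entirely.
    threshold > 0 && data_len + meta_overhead <= threshold
}
```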

Currently, objects uploaded using the multipart method will never be inlined
because they are assumed to be large objects.

## Known issues

- The metadata database (sled) has unbounded memory growth related to the objects stored. This means
the server will eventually consume all memory on the host and crash. To fix this, the metadata database
should either be replaced with a new version of sled (still in development) or with a different one entirely.
- Only the basic API is implemented, and even that is incomplete (for instance, copy
between servers is not implemented).
- The codebase is very much a POC and does not follow a good abstraction structure.
- Single key only; there is no support for adding multiple keys with different permissions.
153 changes: 97 additions & 56 deletions src/cas/fs.rs
@@ -83,11 +83,12 @@ impl CasFS {
mut meta_path: PathBuf,
metrics: SharedMetrics,
storage_engine: StorageEngine,
inlined_metadata_size: Option<usize>,
) -> Self {
meta_path.push("db");
root.push("blocks");
let meta_store: Box<dyn MetaStore> = match storage_engine {
StorageEngine::Fjall => Box::new(FjallStore::new(meta_path)),
StorageEngine::Fjall => Box::new(FjallStore::new(meta_path, inlined_metadata_size)),
};

// Get the current amount of buckets
@@ -107,6 +108,10 @@
self.meta_store.get_path_tree()
}

pub fn max_inlined_data_length(&self) -> usize {
self.meta_store.max_inlined_data_length()
}

pub fn get_bucket(
&self,
bucket_name: &str,
@@ -130,10 +135,10 @@
bucket_name: &str,
key: &str,
size: u64,
e_tag: BlockID,
hash: BlockID,
object_data: ObjectData,
) -> Result<Object, MetaError> {
let obj_meta = Object::new(size, e_tag, object_data);
let obj_meta = Object::new(size, hash, object_data);
let bucket = self.meta_store.get_bucket_tree(bucket_name)?;
bucket.insert_meta(key, obj_meta.to_vec())?;
Ok(obj_meta)
@@ -267,25 +272,23 @@
}

// Convenience function to store an object to disk and then store its metadata.
pub async fn store_object_and_meta(
pub async fn store_single_object_and_meta(
&self,
bucket_name: &str,
key: &str,
data: ByteStream,
) -> io::Result<(Object, Vec<BlockID>, BlockID, u64)> {
) -> io::Result<Object> {
let (blocks, content_hash, size) = self.store_object(bucket_name, key, data).await?;
let obj = self
.create_object_meta(
bucket_name,
key,
size,
content_hash,
ObjectData::SinglePart {
blocks: blocks.clone(),
},
ObjectData::SinglePart { blocks },
)
.unwrap();
Ok((obj, blocks, content_hash, size))
Ok(obj)
}

/// Save the stream of bytes to disk.
@@ -416,6 +419,25 @@
size,
))
}

// Store an object inlined in the metadata.
pub fn store_inlined_object(
&self,
bucket_name: &str,
key: &str,
data: Vec<u8>,
) -> Result<Object, MetaError> {
let content_hash = Md5::digest(&data).into();
let size = data.len() as u64;
let obj = self.create_object_meta(
bucket_name,
key,
size,
content_hash,
ObjectData::Inline { data },
)?;
Ok(obj)
}
}
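
For orientation, a sketch of how a caller might choose between the two storage paths. `put_object_sketch` is hypothetical, the stream construction mirrors the unit tests below, and the real request handlers may account for metadata overhead differently:

```rust
use bytes::Bytes;
use futures::stream;

// Sketch only: dispatch between inline and block storage based on the
// configured threshold. Multipart uploads bypass this check and are never inlined.
async fn put_object_sketch(fs: &CasFS, bucket: &str, key: &str, body: Vec<u8>) -> Object {
    if body.len() <= fs.max_inlined_data_length() {
        // Small payload: the bytes go straight into the metadata record.
        fs.store_inlined_object(bucket, key, body).unwrap()
    } else {
        // Larger payload: stream the bytes through the block store.
        let stream = ByteStream::new(stream::once(async move { Ok(Bytes::from(body)) }));
        fs.store_single_object_and_meta(bucket, key, stream)
            .await
            .unwrap()
    }
}
```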

#[cfg(test)]
@@ -439,6 +461,7 @@ mod tests {
meta_path,
metrics,
StorageEngine::Fjall,
Some(1),
);
(fs, dir)
}
@@ -461,18 +484,18 @@
));

// Store object
let (_, block_ids, _, size) = fs
.store_object_and_meta(bucket_name, key1, stream)
let obj = fs
.store_single_object_and_meta(bucket_name, key1, stream)
.await
.unwrap();

// Verify results
assert_eq!(size, test_data_len as u64);
assert_eq!(block_ids.len(), 1);
assert_eq!(obj.size(), test_data_len as u64);
assert_eq!(obj.blocks().len(), 1);

// Verify block & path was stored
let block_tree = fs.meta_store.get_block_tree().unwrap();
let stored_block = block_tree.get_block(&block_ids[0]).unwrap().unwrap();
let stored_block = block_tree.get_block(&obj.blocks()[0]).unwrap().unwrap();
assert_eq!(stored_block.size(), test_data_len);
assert_eq!(stored_block.rc(), 1);
assert_eq!(
@@ -491,17 +514,35 @@
async move { Ok(Bytes::from(test_data_2.clone())) },
));

let (_, new_blocks, _, _) = fs
.store_object_and_meta(bucket_name, key2, stream)
let new_obj = fs
.store_single_object_and_meta(bucket_name, key2, stream)
.await
.unwrap();

assert_eq!(new_blocks, block_ids);
assert_eq!(new_obj.blocks(), obj.blocks());

let stored_block = block_tree.get_block(&new_blocks[0]).unwrap().unwrap();
let stored_block = block_tree.get_block(&new_obj.blocks()[0]).unwrap().unwrap();
assert_eq!(stored_block.rc(), 2);
}

#[tokio::test]
async fn test_store_inlined_object() {
// Setup
let (fs, _dir) = setup_test_fs();
let bucket_name = "test_bucket";
let key = "test_key1";
fs.create_bucket(bucket_name).unwrap();

let small_data = b"small test data".to_vec();
let obj_meta = fs
.store_inlined_object(bucket_name, key, small_data.clone())
.unwrap();

// Verify inlined data
assert_eq!(obj_meta.size(), small_data.len() as u64);
assert_eq!(obj_meta.inlined().unwrap(), &small_data);
}

#[tokio::test]
async fn test_store_object_refcount() {
// Setup
@@ -521,14 +562,14 @@
));

// Store object
let (_, block_ids, _, _) = fs
.store_object_and_meta(bucket_name, key1, stream)
let obj = fs
.store_single_object_and_meta(bucket_name, key1, stream)
.await
.unwrap();

// Initial refcount must be 1
let block_tree = fs.meta_store.get_block_tree().unwrap();
let stored_block = block_tree.get_block(&block_ids[0]).unwrap().unwrap();
let stored_block = block_tree.get_block(&obj.blocks()[0]).unwrap().unwrap();
assert_eq!(stored_block.rc(), 1);

{
@@ -540,14 +581,14 @@
async move { Ok(Bytes::from(test_data_2.clone())) },
));

let (_, new_blocks, _, _) = fs
.store_object_and_meta(bucket_name, key1, stream)
let new_obj = fs
.store_single_object_and_meta(bucket_name, key1, stream)
.await
.unwrap();

assert_eq!(new_blocks, block_ids);
assert_eq!(new_obj.blocks(), obj.blocks());

let stored_block = block_tree.get_block(&new_blocks[0]).unwrap().unwrap();
let stored_block = block_tree.get_block(&new_obj.blocks()[0]).unwrap().unwrap();
assert_eq!(stored_block.rc(), 1);
}
{
@@ -558,14 +599,14 @@
async move { Ok(Bytes::from(test_data_3.clone())) },
));

let (_, new_blocks, _, _) = fs
.store_object_and_meta(bucket_name, key2, stream)
let new_obj = fs
.store_single_object_and_meta(bucket_name, key2, stream)
.await
.unwrap();

assert_eq!(new_blocks, block_ids);
assert_eq!(new_obj.blocks(), obj.blocks());

let stored_block = block_tree.get_block(&new_blocks[0]).unwrap().unwrap();
let stored_block = block_tree.get_block(&new_obj.blocks()[0]).unwrap().unwrap();
assert_eq!(stored_block.rc(), 2);
}
}
@@ -589,8 +630,8 @@ mod tests {
));

// Store object
let (_, block_ids, _, _) = fs
.store_object_and_meta(bucket_name, key, stream)
let obj = fs
.store_single_object_and_meta(bucket_name, key, stream)
.await
.unwrap();

@@ -601,8 +642,8 @@
// verify blocks and path exist
let block_tree = fs.meta_store.get_block_tree().unwrap();
let mut stored_paths = Vec::new();
for id in block_ids.clone() {
let block = block_tree.get_block(&id).unwrap().unwrap();
for id in obj.blocks() {
let block = block_tree.get_block(id).unwrap().unwrap();
assert_eq!(
fs.path_tree().unwrap().contains_key(block.path()).unwrap(),
true
@@ -619,8 +660,8 @@

// Verify blocks were cleaned up
let block_tree = fs.meta_store.get_block_tree().unwrap();
for id in block_ids {
assert!(block_tree.get_block(&id).unwrap().is_none());
for id in obj.blocks() {
assert!(block_tree.get_block(id).unwrap().is_none());
}
// Verify paths were cleaned up
for path in stored_paths {
@@ -655,13 +696,13 @@
));

// Store first object
let (_, block_ids1, content_hash1, _) = fs
.store_object_and_meta(bucket, key1, stream1)
let obj1 = fs
.store_single_object_and_meta(bucket, key1, stream1)
.await
.unwrap();
// Verify blocks exist with rc=1
let block_tree = fs.meta_store.get_block_tree().unwrap();
for id in &block_ids1 {
for id in obj1.blocks() {
let block = block_tree.get_block(id).unwrap().unwrap();
assert_eq!(block.rc(), 1);
}
@@ -672,17 +713,17 @@
async move { Ok(Bytes::from(test_data2.clone())) },
));

let (_, block_ids2, content_hash2, _) = fs
.store_object_and_meta(bucket, key2, stream2)
let obj2 = fs
.store_single_object_and_meta(bucket, key2, stream2)
.await
.unwrap();

// Verify both objects share same blocks
assert_eq!(block_ids1, block_ids2);
assert_eq!(content_hash1, content_hash2);
assert_eq!(obj1.blocks(), obj2.blocks());
assert_eq!(obj1.hash(), obj2.hash());
// Verify blocks exist with rc=2
let block_tree = fs.meta_store.get_block_tree().unwrap();
for id in &block_ids2 {
for id in obj2.blocks() {
let block = block_tree.get_block(id).unwrap().unwrap();
assert_eq!(block.rc(), 2);
}
@@ -692,7 +733,7 @@

// Verify blocks still exist
let block_tree = fs.meta_store.get_block_tree().unwrap();
for id in &block_ids1 {
for id in obj1.blocks() {
let block = block_tree.get_block(id).unwrap().unwrap();
assert_eq!(block.rc(), 1);
}
@@ -701,8 +742,8 @@
fs.delete_object(bucket, key2).await.unwrap();

// Verify blocks are gone
for id in block_ids1 {
assert!(block_tree.get_block(&id).unwrap().is_none());
for id in obj1.blocks() {
assert!(block_tree.get_block(id).unwrap().is_none());
}
}

@@ -730,13 +771,13 @@
));

// Store first object
let (_, block_ids1, content_hash1, _) = fs
.store_object_and_meta(bucket, key1, stream1)
let obj1 = fs
.store_single_object_and_meta(bucket, key1, stream1)
.await
.unwrap();
// Verify blocks exist with rc=1
let block_tree = fs.meta_store.get_block_tree().unwrap();
for id in &block_ids1 {
for id in obj1.blocks() {
let block = block_tree.get_block(id).unwrap().unwrap();
assert_eq!(block.rc(), 1);
}
@@ -747,17 +788,17 @@
async move { Ok(Bytes::from(test_data2.clone())) },
));

let (_, block_ids2, content_hash2, _) = fs
.store_object_and_meta(bucket, key1, stream2)
let obj2 = fs
.store_single_object_and_meta(bucket, key1, stream2)
.await
.unwrap();

// Verify both objects share same blocks
assert_eq!(block_ids1, block_ids2);
assert_eq!(content_hash1, content_hash2);
assert_eq!(obj1.blocks(), obj2.blocks());
assert_eq!(obj1.hash(), obj2.hash());
// Verify the blocks still exist with rc=1: overwriting the same key does not bump the refcount
let block_tree = fs.meta_store.get_block_tree().unwrap();
for id in &block_ids2 {
for id in obj2.blocks() {
let block = block_tree.get_block(id).unwrap().unwrap();
assert_eq!(block.rc(), 1);
}
@@ -766,8 +807,8 @@
fs.delete_object(bucket, key1).await.unwrap();

// Verify blocks are gone
for id in block_ids1 {
assert!(block_tree.get_block(&id).unwrap().is_none());
for id in obj1.blocks() {
assert!(block_tree.get_block(id).unwrap().is_none());
}
}
}