From f4563581d337466dae75d8204728f6d40598c55e Mon Sep 17 00:00:00 2001 From: Iwan BK Date: Wed, 19 Feb 2025 17:19:58 +0700 Subject: [PATCH 1/4] add support for inlined data on metadata --- src/cas/fs.rs | 26 ++++++++++++++++- src/main.rs | 1 + src/metastore/object.rs | 12 ++++++++ src/metastore/stores/fjall.rs | 16 +++++++++-- src/metastore/traits.rs | 8 +++++- src/s3fs.rs | 54 +++++++++++++++++++++++++++++++---- tests/it_s3.rs | 3 +- 7 files changed, 109 insertions(+), 11 deletions(-) diff --git a/src/cas/fs.rs b/src/cas/fs.rs index 1808a6d..1de5e18 100644 --- a/src/cas/fs.rs +++ b/src/cas/fs.rs @@ -83,11 +83,12 @@ impl CasFS { mut meta_path: PathBuf, metrics: SharedMetrics, storage_engine: StorageEngine, + inlined_metadata_size: Option, ) -> Self { meta_path.push("db"); root.push("blocks"); let meta_store: Box = match storage_engine { - StorageEngine::Fjall => Box::new(FjallStore::new(meta_path)), + StorageEngine::Fjall => Box::new(FjallStore::new(meta_path, inlined_metadata_size)), }; // Get the current amount of buckets @@ -107,6 +108,10 @@ impl CasFS { self.meta_store.get_path_tree() } + pub fn max_inlined_data_length(&self) -> usize { + self.meta_store.max_inlined_data_length() + } + pub fn get_bucket( &self, bucket_name: &str, @@ -237,6 +242,24 @@ impl CasFS { self.meta_store.list_buckets() } + pub fn store_inlined_object( + &self, + bucket_name: &str, + key: &str, + data: Vec, + ) -> Result { + let content_hash = Md5::digest(&data).into(); + let size = data.len() as u64; + let obj = self.create_object_meta( + bucket_name, + key, + size, + content_hash, + ObjectData::Inline { data }, + )?; + Ok(obj) + } + /// Delete an object from a bucket. pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<(), MetaError> { let path_map = self.path_tree()?; @@ -439,6 +462,7 @@ mod tests { meta_path, metrics, StorageEngine::Fjall, + Some(1), ); (fs, dir) } diff --git a/src/main.rs b/src/main.rs index 77ee08e..f9f096d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,6 +71,7 @@ async fn run(args: Args) -> anyhow::Result<()> { args.meta_root.clone(), metrics.clone(), storage_engine, + None, ); let s3fs = s3_cas::s3fs::S3FS::new(args.fs_root, args.meta_root, casfs, metrics.clone()); let s3fs = s3_cas::metrics::MetricFs::new(s3fs, metrics.clone()); diff --git a/src/metastore/object.rs b/src/metastore/object.rs index dbabd8f..b7d72ea 100644 --- a/src/metastore/object.rs +++ b/src/metastore/object.rs @@ -74,6 +74,10 @@ impl Object { } } + pub fn minimum_inline_metadata_size() -> usize { + minimum_raw_object_size() + PTR_SIZE // size of common fields + size of data_len field + } + pub fn to_vec(&self) -> Vec { self.into() } @@ -121,6 +125,7 @@ impl Object { .to_rfc3339_opts(SecondsFormat::Secs, true) } + // Returns the number of bytes this object would take up in serialized form. 
fn num_bytes(&self) -> usize { let mandatory_fields_size = 17 + BLOCKID_SIZE; match &self.data { @@ -133,6 +138,13 @@ impl Object { ObjectData::Inline { data } => mandatory_fields_size + PTR_SIZE + data.len(), } } + + pub fn inlined(&self) -> Option<&Vec> { + match &self.data { + ObjectData::Inline { data } => Some(data), + _ => None, + } + } } impl From<&Object> for Vec { diff --git a/src/metastore/stores/fjall.rs b/src/metastore/stores/fjall.rs index 1c6bd35..f959279 100644 --- a/src/metastore/stores/fjall.rs +++ b/src/metastore/stores/fjall.rs @@ -15,6 +15,7 @@ pub struct FjallStore { bucket_partition: Arc, block_partition: Arc, path_partition: Arc, + inlined_metadata_size: usize, } impl std::fmt::Debug for FjallStore { @@ -25,8 +26,10 @@ impl std::fmt::Debug for FjallStore { } } +const DEFAULT_INLINED_METADATA_SIZE: usize = 1024; + impl FjallStore { - pub fn new(path: PathBuf) -> Self { + pub fn new(path: PathBuf, inlined_metadata_size: Option) -> Self { eprintln!("Opening fjall store at {:?}", path); const BUCKET_META_PARTITION: &str = "_BUCKETS"; const BLOCK_PARTITION: &str = "_BLOCKS"; @@ -42,11 +45,13 @@ impl FjallStore { let path_partition = tx_keyspace .open_partition(PATH_PARTITION, Default::default()) .unwrap(); + let inlined_metadata_size = inlined_metadata_size.unwrap_or(DEFAULT_INLINED_METADATA_SIZE); Self { keyspace: Arc::new(tx_keyspace), bucket_partition: Arc::new(bucket_partition), block_partition: Arc::new(block_partition), path_partition: Arc::new(path_partition), + inlined_metadata_size, } } @@ -69,6 +74,13 @@ impl FjallStore { } impl MetaStore for FjallStore { + fn max_inlined_data_length(&self) -> usize { + if self.inlined_metadata_size < Object::minimum_inline_metadata_size() { + return 0; + } + self.inlined_metadata_size - Object::minimum_inline_metadata_size() + } + fn get_bucket_ext( &self, name: &str, @@ -462,7 +474,7 @@ mod tests { fn setup_store() -> (FjallStore, tempfile::TempDir) { let dir = tempdir().unwrap(); - let store = FjallStore::new(dir.path().to_path_buf()); + let store = FjallStore::new(dir.path().to_path_buf(), Some(1)); (store, dir) } diff --git a/src/metastore/traits.rs b/src/metastore/traits.rs index c75e747..c83f05a 100644 --- a/src/metastore/traits.rs +++ b/src/metastore/traits.rs @@ -14,6 +14,9 @@ use std::fmt::Debug; /// But we separate the API to make it easier to extend in the future, /// and give flexibility to the implementer to have different implementations for each tree. pub trait MetaStore: Send + Sync + Debug + 'static { + // returns the maximum length of the data that can be inlined in the metadata object + fn max_inlined_data_length(&self) -> usize; + /// returns tree which contains all the buckets. /// This tree is used to store the bucket lists and provide /// the CRUD for the bucket list. @@ -21,7 +24,8 @@ pub trait MetaStore: Send + Sync + Debug + 'static { fn get_bucket_tree(&self, bucket_name: &str) -> Result, MetaError>; - /// get_bucket_ext returns the tree for specific bucket with the extended methods. + /// get_bucket_ext returns the tree for specific bucket with the extended methods + /// we use this tree to provide additional methods for the bucket like the range and list methods. fn get_bucket_ext(&self, name: &str) -> Result, MetaError>; @@ -29,6 +33,8 @@ pub trait MetaStore: Send + Sync + Debug + 'static { /// This tree is used to store the data block metadata. fn get_block_tree(&self) -> Result, MetaError>; + /// get_tree returns the tree with the given name. 
+ /// It is usually used if the app need to store some metadata for a specific purpose. fn get_tree(&self, name: &str) -> Result, MetaError>; /// get_path_tree returns the path meta tree diff --git a/src/s3fs.rs b/src/s3fs.rs index 20120ac..50a8433 100644 --- a/src/s3fs.rs +++ b/src/s3fs.rs @@ -358,7 +358,25 @@ impl S3 for S3FS { } }; - let e_tag = obj_meta.format_e_tag(); + // if the object is inlined, we return it directly + if let Some(data) = obj_meta.inlined() { + let bytes = bytes::Bytes::from(data.clone()); + + let body = s3s::Body::from(bytes); + let stream = StreamingBlob::from(body); + + let stream_size = data.len() as u64; + let output = GetObjectOutput { + body: Some(stream), + content_length: Some(stream_size as i64), + content_range: Some(fmt_content_range(0, stream_size - 1, stream_size)), + last_modified: Some(Timestamp::from(obj_meta.last_modified())), + e_tag: Some(obj_meta.format_e_tag()), + ..Default::default() + }; + return Ok(S3Response::new(output)); + } + let stream_size = obj_meta.size(); let range = match range { Some(range) => { @@ -404,7 +422,7 @@ impl S3 for S3FS { content_range: Some(fmt_content_range(0, stream_size - 1, stream_size)), last_modified: Some(Timestamp::from(obj_meta.last_modified())), //metadata: object_metadata, - e_tag: Some(e_tag), + e_tag: Some(obj_meta.format_e_tag()), ..Default::default() }; Ok(S3Response::new(output)) @@ -640,7 +658,30 @@ impl S3 for S3FS { return Err(s3_error!(NoSuchBucket, "Bucket does not exist")); } - // save the data + // if the content length is less than the max inlined data length, we store the object in the + // metadata store, otherwise we store it in the cas layer. + if let Some(content_length) = content_length { + use futures::TryStreamExt; + if content_length <= self.casfs.max_inlined_data_length() as i64 { + // Collect stream into Vec + let data: Vec = body + .try_collect::>() + .await + .map_err(|e| s3_error!(InternalError, "Failed to read body: {}", e))? + .into_iter() + .flatten() + .collect(); + let obj_meta = try_!(self.casfs.store_inlined_object(&bucket, &key, data)); + + let output = PutObjectOutput { + e_tag: Some(obj_meta.format_e_tag()), + ..Default::default() + }; + return Ok(S3Response::new(output)); + } + } + + // save the datadata let converted_stream = convert_stream_error(body); let byte_stream = ByteStream::new_with_size(converted_stream, content_length.unwrap() as usize); @@ -686,9 +727,10 @@ impl S3 for S3FS { let converted_stream = convert_stream_error(body); let byte_stream = ByteStream::new_with_size(converted_stream, content_length as usize); - // we only store the object here, no metadata - // the metadata will be stored when the multipart upload is completed - // the parts will be addressed by the bucket,key,upload_id, and part_number + // we only store the object here, metadata is not stored in the meta store. + // it is stored in the multipart metadata, in the `cas` layer. + // the multipart metadata will be deleted when the multipart upload is completed + // and replaced with the object metadata in metastore in the `complete_multipart_upload` function. 
let (blocks, hash, size) = try_!(self.casfs.store_object(&bucket, &key, byte_stream).await); if size != content_length as u64 { diff --git a/tests/it_s3.rs b/tests/it_s3.rs index 4cfcf30..688a535 100644 --- a/tests/it_s3.rs +++ b/tests/it_s3.rs @@ -73,6 +73,7 @@ fn config() -> &'static SdkConfig { FS_ROOT.into(), metrics.clone(), storage_engine, + Some(1), ); let s3fs = s3_cas::s3fs::S3FS::new(FS_ROOT.into(), FS_ROOT.into(), casfs, metrics.clone()); @@ -175,7 +176,7 @@ async fn test_put_delete_object() -> Result<()> { let body = ByteStream::from_static(content.as_bytes()); let result = c .put_object() - .bucket("non-existent-bucket") + .bucket("non-existent-buckett") .key(key) .body(body) //.checksum_crc32_c(crc32c.as_str()) From 7a6341e6f43d26416c55e5b40f17a31cee08d14a Mon Sep 17 00:00:00 2001 From: Iwan BK Date: Wed, 19 Feb 2025 19:34:29 +0700 Subject: [PATCH 2/4] test and refactor. - end to end test for inlined object - simplify some fs function --- src/cas/fs.rs | 145 ++++++++++++++++++++-------------------- src/metastore/object.rs | 20 +++--- src/s3fs.rs | 12 ++-- tests/it_s3.rs | 114 ++++++++++++++++++------------- 4 files changed, 160 insertions(+), 131 deletions(-) diff --git a/src/cas/fs.rs b/src/cas/fs.rs index 1de5e18..ca66588 100644 --- a/src/cas/fs.rs +++ b/src/cas/fs.rs @@ -135,10 +135,10 @@ impl CasFS { bucket_name: &str, key: &str, size: u64, - e_tag: BlockID, + hash: BlockID, object_data: ObjectData, ) -> Result { - let obj_meta = Object::new(size, e_tag, object_data); + let obj_meta = Object::new(size, hash, object_data); let bucket = self.meta_store.get_bucket_tree(bucket_name)?; bucket.insert_meta(key, obj_meta.to_vec())?; Ok(obj_meta) @@ -242,24 +242,6 @@ impl CasFS { self.meta_store.list_buckets() } - pub fn store_inlined_object( - &self, - bucket_name: &str, - key: &str, - data: Vec, - ) -> Result { - let content_hash = Md5::digest(&data).into(); - let size = data.len() as u64; - let obj = self.create_object_meta( - bucket_name, - key, - size, - content_hash, - ObjectData::Inline { data }, - )?; - Ok(obj) - } - /// Delete an object from a bucket. pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<(), MetaError> { let path_map = self.path_tree()?; @@ -290,12 +272,12 @@ impl CasFS { } // convenient function to store an object to disk and then store it's metada - pub async fn store_object_and_meta( + pub async fn store_single_object_and_meta( &self, bucket_name: &str, key: &str, data: ByteStream, - ) -> io::Result<(Object, Vec, BlockID, u64)> { + ) -> io::Result { let (blocks, content_hash, size) = self.store_object(bucket_name, key, data).await?; let obj = self .create_object_meta( @@ -303,12 +285,10 @@ impl CasFS { key, size, content_hash, - ObjectData::SinglePart { - blocks: blocks.clone(), - }, + ObjectData::SinglePart { blocks }, ) .unwrap(); - Ok((obj, blocks, content_hash, size)) + Ok(obj) } /// Save the stream of bytes to disk. @@ -439,6 +419,25 @@ impl CasFS { size, )) } + + // Store an object inlined in the metadata. 
+ pub fn store_inlined_object( + &self, + bucket_name: &str, + key: &str, + data: Vec, + ) -> Result { + let content_hash = Md5::digest(&data).into(); + let size = data.len() as u64; + let obj = self.create_object_meta( + bucket_name, + key, + size, + content_hash, + ObjectData::Inline { data }, + )?; + Ok(obj) + } } #[cfg(test)] @@ -485,18 +484,18 @@ mod tests { )); // Store object - let (_, block_ids, _, size) = fs - .store_object_and_meta(bucket_name, key1, stream) + let obj = fs + .store_single_object_and_meta(bucket_name, key1, stream) .await .unwrap(); // Verify results - assert_eq!(size, test_data_len as u64); - assert_eq!(block_ids.len(), 1); + assert_eq!(obj.size(), test_data_len as u64); + assert_eq!(obj.blocks().len(), 1); // Verify block & path was stored let block_tree = fs.meta_store.get_block_tree().unwrap(); - let stored_block = block_tree.get_block(&block_ids[0]).unwrap().unwrap(); + let stored_block = block_tree.get_block(&obj.blocks()[0]).unwrap().unwrap(); assert_eq!(stored_block.size(), test_data_len); assert_eq!(stored_block.rc(), 1); assert_eq!( @@ -515,14 +514,14 @@ mod tests { async move { Ok(Bytes::from(test_data_2.clone())) }, )); - let (_, new_blocks, _, _) = fs - .store_object_and_meta(bucket_name, key2, stream) + let new_obj = fs + .store_single_object_and_meta(bucket_name, key2, stream) .await .unwrap(); - assert_eq!(new_blocks, block_ids); + assert_eq!(new_obj.blocks(), obj.blocks()); - let stored_block = block_tree.get_block(&new_blocks[0]).unwrap().unwrap(); + let stored_block = block_tree.get_block(&new_obj.blocks()[0]).unwrap().unwrap(); assert_eq!(stored_block.rc(), 2); } @@ -545,14 +544,14 @@ mod tests { )); // Store object - let (_, block_ids, _, _) = fs - .store_object_and_meta(bucket_name, key1, stream) + let obj = fs + .store_single_object_and_meta(bucket_name, key1, stream) .await .unwrap(); // Initial refcount must be 1 let block_tree = fs.meta_store.get_block_tree().unwrap(); - let stored_block = block_tree.get_block(&block_ids[0]).unwrap().unwrap(); + let stored_block = block_tree.get_block(&obj.blocks()[0]).unwrap().unwrap(); assert_eq!(stored_block.rc(), 1); { @@ -564,14 +563,14 @@ mod tests { async move { Ok(Bytes::from(test_data_2.clone())) }, )); - let (_, new_blocks, _, _) = fs - .store_object_and_meta(bucket_name, key1, stream) + let new_obj = fs + .store_single_object_and_meta(bucket_name, key1, stream) .await .unwrap(); - assert_eq!(new_blocks, block_ids); + assert_eq!(new_obj.blocks(), obj.blocks()); - let stored_block = block_tree.get_block(&new_blocks[0]).unwrap().unwrap(); + let stored_block = block_tree.get_block(&new_obj.blocks()[0]).unwrap().unwrap(); assert_eq!(stored_block.rc(), 1); } { @@ -582,14 +581,14 @@ mod tests { async move { Ok(Bytes::from(test_data_3.clone())) }, )); - let (_, new_blocks, _, _) = fs - .store_object_and_meta(bucket_name, key2, stream) + let new_obj = fs + .store_single_object_and_meta(bucket_name, key2, stream) .await .unwrap(); - assert_eq!(new_blocks, block_ids); + assert_eq!(new_obj.blocks(), obj.blocks()); - let stored_block = block_tree.get_block(&new_blocks[0]).unwrap().unwrap(); + let stored_block = block_tree.get_block(&new_obj.blocks()[0]).unwrap().unwrap(); assert_eq!(stored_block.rc(), 2); } } @@ -613,8 +612,8 @@ mod tests { )); // Store object - let (_, block_ids, _, _) = fs - .store_object_and_meta(bucket_name, key, stream) + let obj = fs + .store_single_object_and_meta(bucket_name, key, stream) .await .unwrap(); @@ -625,8 +624,8 @@ mod tests { // verify blocks and path exist let 
block_tree = fs.meta_store.get_block_tree().unwrap(); let mut stored_paths = Vec::new(); - for id in block_ids.clone() { - let block = block_tree.get_block(&id).unwrap().unwrap(); + for id in obj.blocks() { + let block = block_tree.get_block(id).unwrap().unwrap(); assert_eq!( fs.path_tree().unwrap().contains_key(block.path()).unwrap(), true @@ -643,8 +642,8 @@ mod tests { // Verify blocks were cleaned up let block_tree = fs.meta_store.get_block_tree().unwrap(); - for id in block_ids { - assert!(block_tree.get_block(&id).unwrap().is_none()); + for id in obj.blocks() { + assert!(block_tree.get_block(id).unwrap().is_none()); } // Verify paths were cleaned up for path in stored_paths { @@ -679,13 +678,13 @@ mod tests { )); // Store first object - let (_, block_ids1, content_hash1, _) = fs - .store_object_and_meta(bucket, key1, stream1) + let obj1 = fs + .store_single_object_and_meta(bucket, key1, stream1) .await .unwrap(); // Verify blocks exist with rc=1 let block_tree = fs.meta_store.get_block_tree().unwrap(); - for id in &block_ids1 { + for id in obj1.blocks() { let block = block_tree.get_block(id).unwrap().unwrap(); assert_eq!(block.rc(), 1); } @@ -696,17 +695,17 @@ mod tests { async move { Ok(Bytes::from(test_data2.clone())) }, )); - let (_, block_ids2, content_hash2, _) = fs - .store_object_and_meta(bucket, key2, stream2) + let obj2 = fs + .store_single_object_and_meta(bucket, key2, stream2) .await .unwrap(); // Verify both objects share same blocks - assert_eq!(block_ids1, block_ids2); - assert_eq!(content_hash1, content_hash2); + assert_eq!(obj1.blocks(), obj2.blocks()); + assert_eq!(obj1.hash(), obj2.hash()); // Verify blocks exist with rc=2 let block_tree = fs.meta_store.get_block_tree().unwrap(); - for id in &block_ids2 { + for id in obj2.blocks() { let block = block_tree.get_block(id).unwrap().unwrap(); assert_eq!(block.rc(), 2); } @@ -716,7 +715,7 @@ mod tests { // Verify blocks still exist let block_tree = fs.meta_store.get_block_tree().unwrap(); - for id in &block_ids1 { + for id in obj1.blocks() { let block = block_tree.get_block(id).unwrap().unwrap(); assert_eq!(block.rc(), 1); } @@ -725,8 +724,8 @@ mod tests { fs.delete_object(bucket, key2).await.unwrap(); // Verify blocks are gone - for id in block_ids1 { - assert!(block_tree.get_block(&id).unwrap().is_none()); + for id in obj1.blocks() { + assert!(block_tree.get_block(id).unwrap().is_none()); } } @@ -754,13 +753,13 @@ mod tests { )); // Store first object - let (_, block_ids1, content_hash1, _) = fs - .store_object_and_meta(bucket, key1, stream1) + let obj1 = fs + .store_single_object_and_meta(bucket, key1, stream1) .await .unwrap(); // Verify blocks exist with rc=1 let block_tree = fs.meta_store.get_block_tree().unwrap(); - for id in &block_ids1 { + for id in obj1.blocks() { let block = block_tree.get_block(id).unwrap().unwrap(); assert_eq!(block.rc(), 1); } @@ -771,17 +770,17 @@ mod tests { async move { Ok(Bytes::from(test_data2.clone())) }, )); - let (_, block_ids2, content_hash2, _) = fs - .store_object_and_meta(bucket, key1, stream2) + let obj2 = fs + .store_single_object_and_meta(bucket, key1, stream2) .await .unwrap(); // Verify both objects share same blocks - assert_eq!(block_ids1, block_ids2); - assert_eq!(content_hash1, content_hash2); + assert_eq!(obj1.blocks(), obj2.blocks()); + assert_eq!(obj1.hash(), obj2.hash()); // Verify blocks exist with rc=2 let block_tree = fs.meta_store.get_block_tree().unwrap(); - for id in &block_ids2 { + for id in obj2.blocks() { let block = 
block_tree.get_block(id).unwrap().unwrap(); assert_eq!(block.rc(), 1); } @@ -790,8 +789,8 @@ mod tests { fs.delete_object(bucket, key1).await.unwrap(); // Verify blocks are gone - for id in block_ids1 { - assert!(block_tree.get_block(&id).unwrap().is_none()); + for id in obj1.blocks() { + assert!(block_tree.get_block(id).unwrap().is_none()); } } } diff --git a/src/metastore/object.rs b/src/metastore/object.rs index b7d72ea..6618b8e 100644 --- a/src/metastore/object.rs +++ b/src/metastore/object.rs @@ -15,7 +15,7 @@ pub struct Object { object_type: ObjectType, size: u64, ctime: i64, - e_tag: BlockID, + hash: BlockID, data: ObjectData, } @@ -59,7 +59,7 @@ impl ObjectType { } impl Object { - pub fn new(size: u64, e_tag: BlockID, object_data: ObjectData) -> Self { + pub fn new(size: u64, hash: BlockID, object_data: ObjectData) -> Self { let object_type = match &object_data { ObjectData::SinglePart { .. } => ObjectType::Single, ObjectData::MultiPart { .. } => ObjectType::Multipart, @@ -69,7 +69,7 @@ impl Object { object_type, size, ctime: Utc::now().timestamp(), - e_tag, + hash, data: object_data, } } @@ -84,13 +84,17 @@ impl Object { pub fn format_e_tag(&self) -> String { if let ObjectData::MultiPart { parts, .. } = &self.data { - format!("\"{}-{}\"", hex_string(&self.e_tag), parts) + format!("\"{}-{}\"", hex_string(&self.hash), parts) } else { // Handle error case or provide default - format!("\"{}\"", hex_string(&self.e_tag)) + format!("\"{}\"", hex_string(&self.hash)) } } + pub fn hash(&self) -> &BlockID { + &self.hash + } + pub fn touch(&mut self) { self.ctime = Utc::now().timestamp(); } @@ -155,7 +159,7 @@ impl From<&Object> for Vec { raw_data.extend_from_slice(&o.object_type.as_u8().to_le_bytes()); raw_data.extend_from_slice(&o.size.to_le_bytes()); raw_data.extend_from_slice(&o.ctime.to_le_bytes()); - raw_data.extend_from_slice(&o.e_tag); + raw_data.extend_from_slice(&o.hash); // Write variant-specific data match &o.data { @@ -270,7 +274,7 @@ impl TryFrom<&[u8]> for Object { object_type, size, ctime, - e_tag, + hash: e_tag, data, }) } @@ -334,7 +338,7 @@ mod tests { assert_eq!(deserialized.object_type, expected_type); assert_eq!(deserialized.size, obj.size); assert_eq!(deserialized.ctime, obj.ctime); - assert_eq!(deserialized.e_tag, obj.e_tag); + assert_eq!(deserialized.hash, obj.hash); match (obj.data, deserialized.data) { (ObjectData::SinglePart { blocks: b1 }, ObjectData::SinglePart { blocks: b2 }) => { diff --git a/src/s3fs.rs b/src/s3fs.rs index 50a8433..fc0743d 100644 --- a/src/s3fs.rs +++ b/src/s3fs.rs @@ -66,7 +66,7 @@ impl S3FS { // Compute the e_tag of the multpart upload. Per the S3 standard (according to minio), the // e_tag of a multipart uploaded object is the Md5 of the Md5 of the parts. 
- fn calculate_multipart_etag(&self, blocks: &[BlockID]) -> io::Result<([u8; 16], usize)> { + fn calculate_multipart_hash(&self, blocks: &[BlockID]) -> io::Result<([u8; 16], usize)> { let mut hasher = Md5::new(); let mut size = 0; let block_map = self.casfs.block_tree()?; @@ -149,13 +149,13 @@ impl S3 for S3FS { blocks.extend_from_slice(mp.blocks()); } - let (e_tag, size) = try_!(self.calculate_multipart_etag(&blocks)); + let (content_hash, size) = try_!(self.calculate_multipart_hash(&blocks)); let object_meta = try_!(self.casfs.create_object_meta( &bucket, &key, size as u64, - e_tag, + content_hash, ObjectData::MultiPart { blocks, parts: cnt as usize @@ -664,6 +664,8 @@ impl S3 for S3FS { use futures::TryStreamExt; if content_length <= self.casfs.max_inlined_data_length() as i64 { // Collect stream into Vec + // it is safe to collect the stream into memory as the content length is + // considered small let data: Vec = body .try_collect::>() .await @@ -685,9 +687,9 @@ impl S3 for S3FS { let converted_stream = convert_stream_error(body); let byte_stream = ByteStream::new_with_size(converted_stream, content_length.unwrap() as usize); - let (obj_meta, _, _, _) = try_!( + let obj_meta = try_!( self.casfs - .store_object_and_meta(&bucket, &key, byte_stream) + .store_single_object_and_meta(&bucket, &key, byte_stream) .await ); diff --git a/tests/it_s3.rs b/tests/it_s3.rs index 688a535..3e4c644 100644 --- a/tests/it_s3.rs +++ b/tests/it_s3.rs @@ -59,46 +59,58 @@ macro_rules! log_and_unwrap { }; } -fn config() -> &'static SdkConfig { - static CONFIG: Lazy = Lazy::new(|| { - setup_tracing(); - - // Fake credentials - let cred = Credentials::for_tests(); - - let metrics = s3_cas::metrics::SharedMetrics::new(); - let storage_engine = s3_cas::cas::StorageEngine::Fjall; - let casfs = s3_cas::cas::CasFS::new( - FS_ROOT.into(), - FS_ROOT.into(), - metrics.clone(), - storage_engine, - Some(1), - ); - let s3fs = s3_cas::s3fs::S3FS::new(FS_ROOT.into(), FS_ROOT.into(), casfs, metrics.clone()); - - // Setup S3 service - let service = { - let mut b = S3ServiceBuilder::new(s3fs); - b.set_auth(s3s::auth::SimpleAuth::from_single( - cred.access_key_id(), - cred.secret_access_key(), - )); - b.set_host(SingleDomain::new(DOMAIN_NAME).unwrap()); - b.build() - }; - - // Convert to aws http client - let client = s3s_aws::Client::from(service.into_shared()); - - // Setup aws sdk config - SdkConfig::builder() - .credentials_provider(SharedCredentialsProvider::new(cred)) - .http_client(client) - .region(Region::new(REGION)) - .endpoint_url(format!("http://{DOMAIN_NAME}")) - .build() - }); +use std::sync::Mutex as StdMutex; + +// Create a static CONFIG_SIZE to store the inlined size +static CONFIG_SIZE: StdMutex> = StdMutex::new(None); + +static CONFIG: Lazy = Lazy::new(|| { + setup_tracing(); + + // Fake credentials + let cred = Credentials::for_tests(); + + let metrics = s3_cas::metrics::SharedMetrics::new(); + let storage_engine = s3_cas::cas::StorageEngine::Fjall; + let casfs = s3_cas::cas::CasFS::new( + FS_ROOT.into(), + FS_ROOT.into(), + metrics.clone(), + storage_engine, + Some(1), + ); + let s3fs = s3_cas::s3fs::S3FS::new(FS_ROOT.into(), FS_ROOT.into(), casfs, metrics.clone()); + + // Setup S3 service + let service = { + let mut b = S3ServiceBuilder::new(s3fs); + b.set_auth(s3s::auth::SimpleAuth::from_single( + cred.access_key_id(), + cred.secret_access_key(), + )); + b.set_host(SingleDomain::new(DOMAIN_NAME).unwrap()); + b.build() + }; + + // Convert to aws http client + let client = 
s3s_aws::Client::from(service.into_shared()); + + // Setup aws sdk config + SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(cred)) + .http_client(client) + .region(Region::new(REGION)) + .endpoint_url(format!("http://{DOMAIN_NAME}")) + .build() +}); + +fn setup_test() -> &'static SdkConfig { + setup_test_with_inlined_size(Some(1)) +} + +fn setup_test_with_inlined_size(inlined_metadata_size: Option) -> &'static SdkConfig { + // Set the inlined size before accessing CONFIG + *CONFIG_SIZE.lock().unwrap() = inlined_metadata_size; &CONFIG } @@ -126,9 +138,21 @@ async fn create_bucket(c: &Client, bucket: &str) -> Result<()> { #[tokio::test] #[tracing::instrument] async fn test_put_delete_object() -> Result<()> { + let test_cases = [ + Some(1), // Small inline size, the object will never be inlined + Some(10240000), // Very Large inline size, the object will always be inlined + ]; + + for size in test_cases { + do_test_put_delete_object(size).await?; + } + Ok(()) +} + +async fn do_test_put_delete_object(inlined_metadata_size: Option) -> Result<()> { let _guard = serial().await; - let c = Client::new(config()); + let c = Client::new(setup_test_with_inlined_size(inlined_metadata_size)); let bucket = format!("test-single-object-{}", Uuid::new_v4()); let bucket = bucket.as_str(); let key = "sample.txt"; @@ -211,7 +235,7 @@ async fn test_put_delete_object() -> Result<()> { #[tokio::test] #[tracing::instrument] async fn test_list_buckets() -> Result<()> { - let c = Client::new(config()); + let c = Client::new(setup_test()); let response1 = log_and_unwrap!(c.list_buckets().send().await); drop(response1); @@ -238,7 +262,7 @@ async fn test_list_buckets() -> Result<()> { #[tokio::test] #[tracing::instrument] async fn test_list_objects_v2() -> Result<()> { - let c = Client::new(config()); + let c = Client::new(setup_test()); let bucket = format!("test-list-objects-v2-{}", Uuid::new_v4()); let bucket_str = bucket.as_str(); create_bucket(&c, bucket_str).await?; @@ -319,7 +343,7 @@ async fn test_list_objects_v2_startafter() -> Result<()> { log::error!("Starting test_list_objects_v2_startafter"); - let c = Client::new(config()); + let c = Client::new(setup_test()); let bucket = format!("test-list-{}", Uuid::new_v4()); let bucket_str = bucket.as_str(); create_bucket(&c, bucket_str).await?; @@ -436,7 +460,7 @@ async fn test_list_objects_v2_startafter() -> Result<()> { async fn test_multipart() -> Result<()> { let _guard = serial().await; - let c = Client::new(config()); + let c = Client::new(setup_test()); let bucket = format!("test-multipart-{}", Uuid::new_v4()); let bucket = bucket.as_str(); From 7c536ed9d2ec59e6bb0dc6cc556a4b9af8ee9c26 Mon Sep 17 00:00:00 2001 From: Iwan BK Date: Wed, 19 Feb 2025 19:40:45 +0700 Subject: [PATCH 3/4] add test for inlined object --- src/cas/fs.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/cas/fs.rs b/src/cas/fs.rs index ca66588..a8d0e4b 100644 --- a/src/cas/fs.rs +++ b/src/cas/fs.rs @@ -525,6 +525,24 @@ mod tests { assert_eq!(stored_block.rc(), 2); } + #[tokio::test] + async fn test_store_inlined_object() { + // Setup + let (fs, _dir) = setup_test_fs(); + let bucket_name = "test_bucket"; + let key = "test_key1"; + fs.create_bucket(bucket_name).unwrap(); + + let small_data = b"small test data".to_vec(); + let obj_meta = fs + .store_inlined_object(bucket_name, key, small_data.clone()) + .unwrap(); + + // Verify inlined data + assert_eq!(obj_meta.size(), small_data.len() as u64); + assert_eq!(obj_meta.inlined().unwrap(), 
&small_data);
+    }
+
     #[tokio::test]
     async fn test_store_object_refcount() {
         // Setup

From 84a481fcce79b9610e01159778546742778c1a27 Mon Sep 17 00:00:00 2001
From: Iwan BK
Date: Thu, 20 Feb 2025 10:53:56 +0700
Subject: [PATCH 4/4] add command line option for the inline metadata size

---
 README.md                     | 22 ++++++++++++++++++----
 src/main.rs                   |  5 ++++-
 src/metastore/stores/fjall.rs |  2 +-
 3 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index ce6c8cd..38afff3 100644
--- a/README.md
+++ b/README.md
@@ -18,12 +18,26 @@ cd s3-cas
 cargo build --release --features binary
 ```
 
+## Inline metadata
+
+Objects smaller than or equal to a configurable threshold can be stored directly in their metadata records,
+improving performance for small objects.
+
+Configure this feature using the command-line option:
+```console
+--inline-metadata-size   # omit to disable inlining
+```
+
+When the size is set:
+- If the size of the object data plus its metadata is smaller than or equal to the threshold, the object data is stored in the metadata record,
+  otherwise the standard block storage is used
+- Setting the size to 0 or omitting the option disables inlining completely
+
+Currently, objects uploaded using the multipart method will never be inlined
+because they are assumed to be large objects.
+
 ## Known issues
 
-- The metadata database (sled) has unbounded memory growth related to the objects stored. This means
-  the server will eventually consume all memory on the host and crash. To fix this the metadata database
-  should either be replaced with a new version of sled (still in development) or a different one entirely
 - Only the basic API is implemented, and even then it is not entirely implemented (for instance copy
   between servers is not implemented).
-- The codebase is very much POC and not following a good abstraction structure
 - Single key only, no support to add multiple keys with different permissions.
diff --git a/src/main.rs b/src/main.rs
index f9f096d..784380e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -31,6 +31,9 @@ struct Args {
     #[structopt(long, default_value = "9100")]
     metric_port: u16,
 
+    #[structopt(long, help = "leave empty to disable it")]
+    inline_metadata_size: Option<usize>,
+
     #[structopt(long, requires("secret-key"), display_order = 1000)]
     access_key: Option<String>,
 
@@ -71,7 +74,7 @@ async fn run(args: Args) -> anyhow::Result<()> {
         args.meta_root.clone(),
         metrics.clone(),
         storage_engine,
-        None,
+        args.inline_metadata_size,
     );
     let s3fs = s3_cas::s3fs::S3FS::new(args.fs_root, args.meta_root, casfs, metrics.clone());
     let s3fs = s3_cas::metrics::MetricFs::new(s3fs, metrics.clone());
diff --git a/src/metastore/stores/fjall.rs b/src/metastore/stores/fjall.rs
index f959279..b82f0e2 100644
--- a/src/metastore/stores/fjall.rs
+++ b/src/metastore/stores/fjall.rs
@@ -26,7 +26,7 @@ impl std::fmt::Debug for FjallStore {
     }
 }
 
-const DEFAULT_INLINED_METADATA_SIZE: usize = 1024;
+const DEFAULT_INLINED_METADATA_SIZE: usize = 1; // setting this very low practically disables inlining by default
 
 impl FjallStore {
     pub fn new(path: PathBuf, inlined_metadata_size: Option<usize>) -> Self {
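
To see how the pieces introduced by this series fit together, here is a minimal sketch that drives the new API directly, outside the S3 handler. It only uses items added or touched by these patches (`CasFS::new` with the extra `Option<usize>` argument, `max_inlined_data_length`, `store_inlined_object`, `Object::inlined`, `format_e_tag`). The paths, bucket name, key and the 1024-byte threshold are made up for illustration, and calling `create_bucket` from outside the crate is an assumption based on the in-crate tests; this is a sketch, not part of the patches themselves.

```rust
use s3_cas::cas::{CasFS, StorageEngine};
use s3_cas::metrics::SharedMetrics;

fn main() {
    let metrics = SharedMetrics::new();

    // 1024 bytes is the budget for a whole inlined metadata record; only
    // objects whose data fits in (1024 - minimum object metadata size) are
    // actually inlined.
    let casfs = CasFS::new(
        "/tmp/s3-cas/fs".into(),   // illustrative data root
        "/tmp/s3-cas/meta".into(), // illustrative metadata root
        metrics,
        StorageEngine::Fjall,
        Some(1024),
    );

    // The in-crate tests call create_bucket the same way; assumed public here.
    casfs.create_bucket("demo-bucket").unwrap();

    let payload = b"hello, tiny object".to_vec();
    if payload.len() <= casfs.max_inlined_data_length() {
        // Small enough: the bytes end up inside the metadata record itself.
        let obj = casfs
            .store_inlined_object("demo-bucket", "tiny.txt", payload)
            .unwrap();
        assert!(obj.inlined().is_some());
        println!("inlined, e_tag = {}", obj.format_e_tag());
    } else {
        // Too large: put_object streams this through
        // store_single_object_and_meta and the block store instead.
        println!("payload exceeds the inline budget, using block storage");
    }
}
```

Omitting `--inline-metadata-size` on the command line (or passing a value smaller than the minimum serialized object size) makes `max_inlined_data_length()` return 0, so every object takes the block-storage path; multipart uploads always do, regardless of the setting.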