From 6f8504d272fe42276559faeabb2fe4d3441036dd Mon Sep 17 00:00:00 2001
From: Iwan BK
Date: Thu, 16 Jan 2025 15:40:47 +0700
Subject: [PATCH] implement other S3 API methods

- complete_multipart_upload
- create_multipart
- list_buckets
- list_objects_v2
- upload_part
---
 src/cas.rs     |   2 +-
 src/cas/fs.rs  |   5 +-
 src/metrics.rs |   4 +-
 src/s3fs.rs    | 356 ++++++++++++++++++++++++++++++++++++++++++++++---
 tests/it_s3.rs | 128 +++++++++++++++++-
 5 files changed, 470 insertions(+), 25 deletions(-)

diff --git a/src/cas.rs b/src/cas.rs
index 6169712..c6e746d 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -4,7 +4,7 @@ pub mod bucket_meta;
 mod buffered_byte_stream;
 mod errors;
 mod fs;
-mod multipart;
+pub mod multipart;
 pub mod object;
 pub mod range_request;
 
diff --git a/src/cas/fs.rs b/src/cas/fs.rs
index d8cd011..fe9c29b 100644
--- a/src/cas/fs.rs
+++ b/src/cas/fs.rs
@@ -56,6 +56,7 @@ const MULTIPART_TREE: &str = "_MULTIPART_PARTS";
 pub const PTR_SIZE: usize = mem::size_of::<usize>(); // Size of a `usize` in bytes
 const MAX_KEYS: i64 = 1000;
 
+#[derive(Debug)]
 pub struct CasFS {
     db: Db,
     root: PathBuf,
@@ -122,7 +123,7 @@ impl CasFS {
     }
 
     /// Open the tree containing the multipart parts.
-    fn multipart_tree(&self) -> Result<sled::Tree, sled::Error> {
+    pub fn multipart_tree(&self) -> Result<sled::Tree, sled::Error> {
         self.db.open_tree(MULTIPART_TREE)
     }
 
@@ -249,7 +250,7 @@ impl CasFS {
     }
 
     /// Get a list of all buckets in the system.
-    fn buckets(&self) -> Result<Vec<BucketMeta>, sled::Error> {
+    pub fn buckets(&self) -> Result<Vec<BucketMeta>, sled::Error> {
         Ok(self
             .bucket_meta_tree()?
             .scan_prefix(&[])
diff --git a/src/metrics.rs b/src/metrics.rs
index 6cd5fe4..bdf1e65 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -25,7 +25,7 @@ const S3_API_METHODS: &[&str] = &[
     "upload_part",
 ];
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct SharedMetrics {
     metrics: Arc<Metrics>,
 }
@@ -51,7 +51,7 @@ impl Deref for SharedMetrics {
         &self.metrics
     }
 }
-
+#[derive(Debug)]
 pub struct Metrics {
     method_calls: IntCounterVec,
     bucket_count: IntGauge,
diff --git a/src/s3fs.rs b/src/s3fs.rs
index 43235fd..d880cba 100644
--- a/src/s3fs.rs
+++ b/src/s3fs.rs
@@ -1,33 +1,47 @@
-use crate::cas::block::Block;
-use crate::cas::block_stream::BlockStream;
-use crate::cas::object::Object;
-use crate::cas::range_request::parse_range_request;
-use crate::cas::CasFS;
-use crate::metrics::SharedMetrics;
+use std::convert::TryFrom;
+use std::io::{self, ErrorKind};
+use std::ops::Deref;
+use std::path::PathBuf;
+
 use bytes::Bytes;
 use chrono::prelude::*;
+use faster_hex::{hex_decode, hex_string};
 use futures::Stream;
 use futures::StreamExt;
+use md5::{Digest, Md5};
+use tracing::error;
+use tracing::info;
+use uuid::Uuid;
+
 use s3_server::dto::ByteStream;
 use s3s::dto::StreamingBlob;
 use s3s::dto::Timestamp;
 use s3s::dto::{
-    CreateBucketInput, CreateBucketOutput, DeleteBucketInput, DeleteBucketOutput,
-    DeleteObjectInput, DeleteObjectOutput, DeleteObjectsInput, DeleteObjectsOutput, DeletedObject,
-    GetBucketLocationInput, GetBucketLocationOutput, GetObjectInput, GetObjectOutput,
-    HeadBucketInput, HeadBucketOutput, HeadObjectInput, HeadObjectOutput, PutObjectInput,
-    PutObjectOutput,
+    Bucket, CompleteMultipartUploadInput, CompleteMultipartUploadOutput, CreateBucketInput,
+    CreateBucketOutput, CreateMultipartUploadInput, CreateMultipartUploadOutput, DeleteBucketInput,
+    DeleteBucketOutput, DeleteObjectInput, DeleteObjectOutput, DeleteObjectsInput,
+    DeleteObjectsOutput, DeletedObject, GetBucketLocationInput, GetBucketLocationOutput,
+    GetObjectInput, GetObjectOutput, HeadBucketInput, HeadBucketOutput, HeadObjectInput,
+    HeadObjectOutput,
+    ListBucketsInput, ListBucketsOutput, ListObjectsV2Input, ListObjectsV2Output,
+    PutObjectInput, PutObjectOutput, UploadPartInput, UploadPartOutput,
 };
 use s3s::s3_error;
 use s3s::S3Result;
 use s3s::S3;
 use s3s::{S3Request, S3Response};
-use std::convert::TryFrom;
-use std::io::{self, ErrorKind};
-use std::path::PathBuf;
-use tracing::error;
-use tracing::info;
 
+use crate::cas::block::Block;
+use crate::cas::block_stream::BlockStream;
+use crate::cas::bucket_meta::BucketMeta;
+use crate::cas::multipart::MultiPart;
+use crate::cas::object::Object;
+use crate::cas::range_request::parse_range_request;
+use crate::cas::CasFS;
+use crate::metrics::SharedMetrics;
+
+const MAX_KEYS: i32 = 1000;
+
+#[derive(Debug)]
 pub struct S3FS {
     root: PathBuf,
     casfs: CasFS,
@@ -58,13 +72,107 @@ impl S3FS {
     }
 }
 
-use crate::cas::bucket_meta::BucketMeta;
 fn fmt_content_range(start: u64, end_inclusive: u64, size: u64) -> String {
     format!("bytes {start}-{end_inclusive}/{size}")
 }
 
 #[async_trait::async_trait]
 impl S3 for S3FS {
+    #[tracing::instrument]
+    async fn complete_multipart_upload(
+        &self,
+        req: S3Request<CompleteMultipartUploadInput>,
+    ) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
+        let CompleteMultipartUploadInput {
+            multipart_upload,
+            bucket,
+            key,
+            upload_id,
+            ..
+        } = req.input;
+
+        let multipart_upload = if let Some(multipart_upload) = multipart_upload {
+            multipart_upload
+        } else {
+            let err = s3_error!(InvalidPart, "Missing multipart_upload");
+            return Err(err.into());
+        };
+
+        let multipart_map = try_!(self.casfs.multipart_tree());
+
+        let mut blocks = vec![];
+        let mut cnt: i32 = 0;
+        for part in multipart_upload.parts.iter().flatten() {
+            let part_number = try_!(part
+                .part_number
+                .ok_or_else(|| { io::Error::new(io::ErrorKind::NotFound, "Missing part_number") }));
+            cnt = cnt.wrapping_add(1);
+            if part_number != cnt {
+                try_!(Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    "InvalidPartOrder"
+                )));
+            }
+            let part_key = format!("{}-{}-{}-{}", &bucket, &key, &upload_id, part_number);
+            let part_data_enc = try_!(multipart_map.get(&part_key));
+            let part_data_enc = match part_data_enc {
+                Some(pde) => pde,
+                None => {
+                    error!("Missing part \"{}\" in multipart upload", part_key);
+                    return Err(s3_error!(InvalidArgument, "Part not uploaded").into());
+                }
+            };
+
+            // expect is safe here: a failure means the stored part data is corrupt,
+            // which would be a coding error
+            let mp = MultiPart::try_from(&*part_data_enc).expect("Corrupted multipart data");
+
+            blocks.extend_from_slice(mp.blocks());
+        }
+
+        // Compute the e_tag of the multipart upload. Per the S3 standard (as implemented by
+        // minio), the e_tag of a multipart uploaded object is the Md5 of the Md5s of the parts.
+        let mut hasher = Md5::new();
+        let mut size = 0;
+        let block_map = try_!(self.casfs.block_tree());
+        for block in &blocks {
+            let bi = try_!(block_map.get(&block)).unwrap(); // unwrap is fine as all blocks must be present
+            let block_info = Block::try_from(&*bi).expect("Block data is corrupt");
+            size += block_info.size();
+            hasher.update(&block);
+        }
+        let e_tag = hasher.finalize().into();
+
+        let bc = try_!(self.casfs.bucket(&bucket));
+
+        let object = Object::new(size as u64, e_tag, cnt as usize, blocks);
+
+        try_!(bc.insert(&key, Vec::<u8>::from(&object)));
+
+        // Try to delete the multipart metadata. If this fails, it is not really an issue.
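+        // The object metadata has already been committed to the bucket tree above, so a
+        // leftover part record only wastes space; its key is scoped to this upload_id and
+        // cannot collide with a future upload.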
+        for part in multipart_upload.parts.into_iter().flatten() {
+            let part_key = format!(
+                "{}-{}-{}-{}",
+                &bucket,
+                &key,
+                &upload_id,
+                part.part_number.unwrap()
+            );
+
+            if let Err(e) = multipart_map.remove(part_key) {
+                error!("Could not remove part: {}", e);
+            };
+        }
+
+        let output = CompleteMultipartUploadOutput {
+            bucket: Some(bucket),
+            key: Some(key),
+            e_tag: Some(object.format_e_tag()),
+            ..Default::default()
+        };
+        Ok(S3Response::new(output))
+    }
+
+    #[tracing::instrument]
     async fn create_bucket(
         &self,
         req: S3Request<CreateBucketInput>,
     ) -> S3Result<S3Response<CreateBucketOutput>> {
@@ -97,6 +205,26 @@ impl S3 for S3FS {
         Ok(S3Response::new(output))
     }
 
+    #[tracing::instrument]
+    async fn create_multipart_upload(
+        &self,
+        req: S3Request<CreateMultipartUploadInput>,
+    ) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
+        let CreateMultipartUploadInput { bucket, key, .. } = req.input;
+
+        let upload_id = Uuid::new_v4().to_string();
+
+        let output = CreateMultipartUploadOutput {
+            bucket: Some(bucket),
+            key: Some(key),
+            upload_id: Some(upload_id),
+            ..Default::default()
+        };
+
+        Ok(S3Response::new(output))
+    }
+
+    #[tracing::instrument]
     async fn delete_bucket(
         &self,
         req: S3Request<DeleteBucketInput>,
@@ -110,6 +238,7 @@
         Ok(S3Response::new(DeleteBucketOutput {}))
     }
 
+    #[tracing::instrument]
     async fn delete_object(
         &self,
         req: S3Request<DeleteObjectInput>,
@@ -129,6 +258,7 @@
         Ok(S3Response::new(output))
     }
 
+    #[tracing::instrument]
     async fn delete_objects(
         &self,
         req: S3Request<DeleteObjectsInput>,
@@ -190,6 +320,7 @@
         Ok(S3Response::new(output))
     }
 
+    #[tracing::instrument]
     async fn get_object(
         &self,
         req: S3Request<GetObjectInput>,
@@ -247,6 +378,7 @@
         Ok(S3Response::new(output))
     }
 
+    #[tracing::instrument]
     async fn head_bucket(
         &self,
         req: S3Request<HeadBucketInput>,
@@ -260,6 +392,7 @@
         Ok(S3Response::new(HeadBucketOutput::default()))
     }
 
+    #[tracing::instrument]
     async fn head_object(
         &self,
         req: S3Request<HeadObjectInput>,
@@ -284,6 +417,131 @@
         Ok(S3Response::new(output))
     }
 
+    #[tracing::instrument]
+    async fn list_buckets(
+        &self,
+        _: S3Request<ListBucketsInput>,
+    ) -> S3Result<S3Response<ListBucketsOutput>> {
+        let csfs_buckets = try_!(self.casfs.buckets());
+        let mut buckets = Vec::with_capacity(csfs_buckets.len());
+        for bucket in csfs_buckets {
+            let bucket = Bucket {
+                creation_date: None, //creation_date: bucket.creation_date, TODO: fix it
+                name: bucket.name,
+            };
+            buckets.push(bucket);
+        }
+        let output = ListBucketsOutput {
+            buckets: Some(buckets),
+            owner: None,
+            ..Default::default()
+        };
+        Ok(S3Response::new(output))
+    }
+
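+    // Listing sketch: keys are scanned in lexicographic order starting from the
+    // continuation token (a hex-encoded object key), the prefix, or start_after,
+    // whichever applies. One entry beyond max_keys is fetched; if it exists, its key
+    // becomes the next continuation token.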
+    #[tracing::instrument]
+    async fn list_objects_v2(
+        &self,
+        req: S3Request<ListObjectsV2Input>,
+    ) -> S3Result<S3Response<ListObjectsV2Output>> {
+        let ListObjectsV2Input {
+            bucket,
+            delimiter,
+            prefix,
+            encoding_type,
+            start_after,
+            max_keys,
+            continuation_token,
+            ..
+        } = req.input;
+
+        let b = try_!(self.casfs.bucket(&bucket));
+
+        let key_count = max_keys
+            .map(|mk| if mk > MAX_KEYS { MAX_KEYS } else { mk })
+            .unwrap_or(MAX_KEYS);
+
+        let token = if let Some(ref rt) = continuation_token {
+            let mut out = vec![0; rt.len() / 2];
+            if hex_decode(rt.as_bytes(), &mut out).is_err() {
+                return Err(
+                    s3_error!(InvalidToken, "continuation token has an invalid format").into(),
+                );
+            };
+            match String::from_utf8(out) {
+                Ok(s) => Some(s),
+                Err(_) => {
+                    return Err(s3_error!(InvalidToken, "continuation token is invalid").into())
+                }
+            }
+        } else {
+            None
+        };
+
+        let start_bytes = if let Some(ref token) = token {
+            token.as_bytes()
+        } else if let Some(ref prefix) = prefix {
+            prefix.as_bytes()
+        } else if let Some(ref start_after) = start_after {
+            start_after.as_bytes()
+        } else {
+            &[]
+        };
+        let prefix_bytes = prefix.as_deref().unwrap_or("").as_bytes();
+
+        let mut objects: Vec<_> = b
+            .range(start_bytes..)
+            .filter_map(|read_result| read_result.ok())
+            .skip_while(|(raw_key, _)| match start_after {
+                None => false,
+                Some(ref start_after) => raw_key.deref() <= start_after.as_bytes(),
+            })
+            .take_while(|(raw_key, _)| raw_key.starts_with(prefix_bytes))
+            .map(|(raw_key, raw_value)| {
+                // SAFETY: we only insert valid utf8 strings
+                let key = unsafe { String::from_utf8_unchecked(raw_key.to_vec()) };
+                // unwrap is fine as it would mean either a coding error or a corrupt DB
+                let obj = Object::try_from(&*raw_value).unwrap();
+
+                s3s::dto::Object {
+                    key: Some(key),
+                    e_tag: Some(obj.format_e_tag()),
+                    last_modified: Some(obj.last_modified().into()),
+                    owner: None,
+                    size: Some(obj.size() as i64),
+                    storage_class: None,
+                    ..Default::default()
+                }
+            })
+            .take((key_count + 1) as usize)
+            .collect();
+
+        let mut next_token = None;
+        let truncated = objects.len() == key_count as usize + 1;
+        if truncated {
+            next_token = Some(hex_string(objects.pop().unwrap().key.unwrap().as_bytes()))
+        }
+
+        let output = ListObjectsV2Output {
+            // the number of keys actually returned, which can be less than max_keys
+            key_count: Some(objects.len() as i32),
+            max_keys: Some(key_count),
+            contents: Some(objects),
+            continuation_token,
+            delimiter,
+            encoding_type,
+            name: Some(bucket),
+            prefix,
+            start_after,
+            next_continuation_token: next_token,
+            ..Default::default()
+        };
+        Ok(S3Response::new(output))
+    }
+
+    #[tracing::instrument]
     async fn put_object(
         &self,
         req: S3Request<PutObjectInput>,
     ) -> S3Result<S3Response<PutObjectOutput>> {
@@ -329,6 +587,70 @@ impl S3 for S3FS {
         };
         Ok(S3Response::new(output))
     }
+
+    #[tracing::instrument]
+    async fn upload_part(
+        &self,
+        req: S3Request<UploadPartInput>,
+    ) -> S3Result<S3Response<UploadPartOutput>> {
+        let UploadPartInput {
+            body,
+            bucket,
+            content_length,
+            content_md5: _, // TODO: Verify
+            key,
+            part_number,
+            upload_id,
+            ..
+        } = req.input;
+
+        let Some(body) = body else {
+            return Err(s3_error!(IncompleteBody));
+        };
+
+        let content_length = content_length.ok_or_else(|| {
+            s3_error!(
+                MissingContentLength,
+                "You did not provide the number of bytes in the Content-Length HTTP header."
+            )
+        })?;
+
+        let converted_stream = convert_stream_error(body);
+        let byte_stream = ByteStream::new_with_size(converted_stream, content_length as usize);
+        let (blocks, hash, size) = try_!(self.casfs.store_bytes(byte_stream).await);
+
+        if size != content_length as u64 {
+            return Err(s3_error!(
+                InvalidRequest,
+                "You did not send the amount of bytes specified by the Content-Length HTTP header."
+            )
+            .into());
+        }
+
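+        // The part's payload is already persisted as content-addressed blocks by store_bytes
+        // above; only the part metadata, keyed "{bucket}-{key}-{upload_id}-{part_number}",
+        // is parked in the multipart tree until complete_multipart_upload links the blocks
+        // into an object.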
+        let mp_map = try_!(self.casfs.multipart_tree());
+
+        let e_tag = format!("\"{}\"", hex_string(&hash));
+        let storage_key = format!("{}-{}-{}-{}", &bucket, &key, &upload_id, part_number);
+        let mp = MultiPart::new(
+            size as usize,
+            part_number as i64,
+            bucket,
+            key,
+            upload_id,
+            hash,
+            blocks,
+        );
+
+        let enc_mp = Vec::from(&mp);
+
+        try_!(mp_map.insert(storage_key, enc_mp));
+
+        let output = UploadPartOutput {
+            e_tag: Some(e_tag),
+            ..Default::default()
+        };
+        Ok(S3Response::new(output))
+    }
 }
 
 // Add helper function
diff --git a/tests/it_s3.rs b/tests/it_s3.rs
index f6972d0..4582c9a 100644
--- a/tests/it_s3.rs
+++ b/tests/it_s3.rs
@@ -18,8 +18,8 @@ use aws_sdk_s3::Client;
 
 use aws_sdk_s3::types::BucketLocationConstraint;
 //use aws_sdk_s3::types::ChecksumMode;
-//use aws_sdk_s3::types::CompletedMultipartUpload;
-//use aws_sdk_s3::types::CompletedPart;
+use aws_sdk_s3::types::CompletedMultipartUpload;
+use aws_sdk_s3::types::CompletedPart;
 use aws_sdk_s3::types::CreateBucketConfiguration;
 
 use anyhow::Result;
@@ -27,7 +27,7 @@ use once_cell::sync::Lazy;
 use std::convert::TryInto;
 use tokio::sync::Mutex;
 use tokio::sync::MutexGuard;
-use tracing::debug;
+use tracing::{debug, error};
 use uuid::Uuid;
 
 const FS_ROOT: &str = concat!(env!("CARGO_TARGET_TMPDIR"), "/s3s-fs-tests-aws");
@@ -44,6 +44,21 @@ pub fn setup_tracing() {
     tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
 }
 
+macro_rules! log_and_unwrap {
+    ($result:expr) => {
+        match $result {
+            Ok(ans) => {
+                debug!(?ans);
+                ans
+            }
+            Err(err) => {
+                error!(?err);
+                return Err(err.into());
+            }
+        }
+    };
+}
+
 fn config() -> &'static SdkConfig {
     static CONFIG: Lazy<SdkConfig> = Lazy::new(|| {
         setup_tracing();
@@ -162,6 +177,113 @@ async fn test_single_object() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+#[tracing::instrument]
+async fn test_list_buckets() -> Result<()> {
+    let c = Client::new(config());
+    let response1 = log_and_unwrap!(c.list_buckets().send().await);
+    drop(response1);
+
+    let bucket1 = format!("test-list-buckets-1-{}", Uuid::new_v4());
+    let bucket1_str = bucket1.as_str();
+    let bucket2 = format!("test-list-buckets-2-{}", Uuid::new_v4());
+    let bucket2_str = bucket2.as_str();
+
+    create_bucket(&c, bucket1_str).await?;
+    create_bucket(&c, bucket2_str).await?;
+
+    let response2 = log_and_unwrap!(c.list_buckets().send().await);
+    let bucket_names: Vec<_> = response2
+        .buckets()
+        .iter()
+        .filter_map(|bucket| bucket.name())
+        .collect();
+    assert!(bucket_names.contains(&bucket1_str));
+    assert!(bucket_names.contains(&bucket2_str));
+
+    Ok(())
+}
+
+#[tokio::test]
+#[tracing::instrument]
+async fn test_multipart() -> Result<()> {
+    let _guard = serial().await;
+
+    let c = Client::new(config());
+
+    let bucket = format!("test-multipart-{}", Uuid::new_v4());
+    let bucket = bucket.as_str();
+    create_bucket(&c, bucket).await?;
+
+    let key = "sample.txt";
+    let content = "abcdefghijklmnopqrstuvwxyz/0123456789/!@#$%^&*();\n";
+
+    let upload_id = {
+        let ans = c
+            .create_multipart_upload()
+            .bucket(bucket)
+            .key(key)
+            .send()
+            .await?;
+        ans.upload_id.unwrap()
+    };
+    let upload_id = upload_id.as_str();
+
+    let upload_parts = {
+        let body = ByteStream::from_static(content.as_bytes());
+        let part_number = 1;
+
+        let ans = c
+            .upload_part()
+            .bucket(bucket)
+            .key(key)
+            .upload_id(upload_id)
+            .body(body)
+            .part_number(part_number)
+            .send()
+            .await?;
+
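+        // The e_tag returned by upload_part is a quoted md5 hex string; CompletedPart
+        // echoes it back along with the part number for complete_multipart_upload.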
+        let part = CompletedPart::builder()
+            .e_tag(ans.e_tag.unwrap_or_default())
+            .part_number(part_number)
+            .build();
+
+        vec![part]
+    };
+
+    {
+        let upload = CompletedMultipartUpload::builder()
+            .set_parts(Some(upload_parts))
+            .build();
+
+        let _ = c
+            .complete_multipart_upload()
+            .bucket(bucket)
+            .key(key)
+            .multipart_upload(upload)
+            .upload_id(upload_id)
+            .send()
+            .await?;
+    }
+
+    {
+        let ans = c.get_object().bucket(bucket).key(key).send().await?;
+
+        let content_length: usize = ans.content_length().unwrap().try_into().unwrap();
+        let body = ans.body.collect().await?.into_bytes();
+
+        assert_eq!(content_length, content.len());
+        assert_eq!(body.as_ref(), content.as_bytes());
+    }
+
+    {
+        delete_object(&c, bucket, key).await?;
+        delete_bucket(&c, bucket).await?;
+    }
+
+    Ok(())
+}
+
 async fn delete_object(c: &Client, bucket: &str, key: &str) -> Result<()> {
     c.delete_object().bucket(bucket).key(key).send().await?;
     Ok(())