Skip to content

Commit

Permalink
list objects v1
Browse files Browse the repository at this point in the history
  • Loading branch information
iwanbk committed Jan 16, 2025
1 parent c349179 commit 1a44078
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 90 deletions.
76 changes: 4 additions & 72 deletions src/cas/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use s3_server::{
CreateMultipartUploadRequest, GetBucketLocationOutput, GetBucketLocationRequest,
GetObjectOutput, GetObjectRequest, HeadBucketOutput, HeadBucketRequest, HeadObjectOutput,
HeadObjectRequest, ListBucketsOutput, ListBucketsRequest, ListObjectsOutput,
ListObjectsRequest, ListObjectsV2Output, ListObjectsV2Request, Object as S3Object,
UploadPartOutput, UploadPartRequest,
ListObjectsRequest, ListObjectsV2Output, ListObjectsV2Request, UploadPartOutput,
UploadPartRequest,
},
errors::S3StorageResult,
headers::AmzCopySource,
Expand All @@ -48,7 +48,6 @@ const BLOCK_TREE: &str = "_BLOCKS";
const PATH_TREE: &str = "_PATHS";
const MULTIPART_TREE: &str = "_MULTIPART_PARTS";
pub const PTR_SIZE: usize = mem::size_of::<usize>(); // Size of a `usize` in bytes
const MAX_KEYS: i64 = 1000;

#[derive(Debug)]
pub struct CasFS {
Expand Down Expand Up @@ -549,76 +548,9 @@ impl S3Storage for CasFS {

async fn list_objects(
&self,
input: ListObjectsRequest,
_input: ListObjectsRequest,
) -> S3StorageResult<ListObjectsOutput, s3_server::dto::ListObjectsError> {
let ListObjectsRequest {
bucket,
delimiter,
prefix,
encoding_type,
marker,
max_keys,
..
} = input;

let key_count = max_keys
.map(|mk| if mk > MAX_KEYS { MAX_KEYS } else { mk })
.unwrap_or(MAX_KEYS);

let b = trace_try!(self.bucket(&bucket));

let start_bytes = if let Some(ref marker) = marker {
marker.as_bytes()
} else if let Some(ref prefix) = prefix {
prefix.as_bytes()
} else {
&[]
};
let prefix_bytes = prefix.as_deref().or(Some("")).unwrap().as_bytes();

let mut objects = b
.range(start_bytes..)
.filter_map(|read_result| match read_result {
Err(_) => None,
Ok((k, v)) => Some((k, v)),
})
.take_while(|(raw_key, _)| raw_key.starts_with(prefix_bytes))
.map(|(raw_key, raw_value)| {
// SAFETY: we only insert valid utf8 strings
let key = unsafe { String::from_utf8_unchecked(raw_key.to_vec()) };
// unwrap is fine as it would mean either a coding error or a corrupt DB
let obj = Object::try_from(&*raw_value).unwrap();

S3Object {
key: Some(key),
e_tag: Some(obj.format_e_tag()),
last_modified: Some(obj.format_ctime()),
owner: None,
size: Some(obj.size() as i64),
storage_class: None,
}
})
.take((key_count + 1) as usize)
.collect::<Vec<_>>();

let mut next_marker = None;
let truncated = objects.len() == key_count as usize + 1;
if truncated {
next_marker = Some(objects.pop().unwrap().key.unwrap())
}

Ok(ListObjectsOutput {
contents: Some(objects),
delimiter,
encoding_type,
name: Some(bucket),
common_prefixes: None,
is_truncated: Some(truncated),
next_marker: if marker.is_some() { next_marker } else { None },
marker,
max_keys: Some(key_count),
prefix,
})
Err(code_error!(NotImplemented, "Not Implemented").into())
}

async fn list_objects_v2(
Expand Down
82 changes: 80 additions & 2 deletions src/s3fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use s3s::dto::{
DeleteBucketOutput, DeleteObjectInput, DeleteObjectOutput, DeleteObjectsInput,
DeleteObjectsOutput, DeletedObject, GetBucketLocationInput, GetBucketLocationOutput,
GetObjectInput, GetObjectOutput, HeadBucketInput, HeadBucketOutput, HeadObjectInput,
HeadObjectOutput, ListBucketsInput, ListBucketsOutput, ListObjectsV2Input, ListObjectsV2Output,
PutObjectInput, PutObjectOutput, UploadPartInput, UploadPartOutput,
HeadObjectOutput, ListBucketsInput, ListBucketsOutput, ListObjectsInput, ListObjectsOutput,
ListObjectsV2Input, ListObjectsV2Output, PutObjectInput, PutObjectOutput, UploadPartInput,
UploadPartOutput,
};
use s3s::s3_error;
use s3s::S3Result;
Expand Down Expand Up @@ -429,6 +430,83 @@ impl S3 for S3FS {
Ok(S3Response::new(output))
}

/// ListObjects (v1) handler: scans the bucket's key index in lexicographic
/// order, starting at `marker` (or, failing that, `prefix`), and returns up
/// to `max_keys` entries whose keys share the requested prefix.
///
/// Pagination: one extra entry beyond the page size is fetched; if present it
/// proves the listing is truncated and its key becomes `next_marker`.
async fn list_objects(
    &self,
    req: S3Request<ListObjectsInput>,
) -> S3Result<S3Response<ListObjectsOutput>> {
    let ListObjectsInput {
        bucket,
        delimiter,
        prefix,
        encoding_type,
        marker,
        max_keys,
        ..
    } = req.input;

    // Clamp the requested page size to the server-wide cap (MAX_KEYS),
    // defaulting to the cap when the client sends none.
    let key_count = max_keys
        .map(|mk| if mk > MAX_KEYS { MAX_KEYS } else { mk })
        .unwrap_or(MAX_KEYS);

    let b = try_!(self.casfs.bucket(&bucket));

    // Resume from the marker when one is given, otherwise seek to the prefix.
    // NOTE(review): the range below is *inclusive* of `start_bytes`. That is
    // self-consistent with the `next_marker` produced further down (the popped
    // sentinel key becomes the first entry of the next page), but it deviates
    // from the S3 spec's "keys strictly after marker" semantics for an
    // arbitrary client-chosen marker — confirm this is acceptable.
    let start_bytes = if let Some(ref marker) = marker {
        marker.as_bytes()
    } else if let Some(ref prefix) = prefix {
        prefix.as_bytes()
    } else {
        &[]
    };
    let prefix_bytes = prefix.as_deref().unwrap_or("").as_bytes();

    // Collect up to key_count + 1 entries: the extra sentinel tells us whether
    // the listing is truncated without a second lookup. Unreadable entries are
    // silently skipped (best-effort, matching the other listing paths).
    let mut objects = b
        .range(start_bytes..)
        .filter_map(|read_result| read_result.ok())
        .take_while(|(raw_key, _)| raw_key.starts_with(prefix_bytes))
        .map(|(raw_key, raw_value)| {
            // SAFETY: we only insert valid utf8 strings
            let key = unsafe { String::from_utf8_unchecked(raw_key.to_vec()) };
            // unwrap is fine as it would mean either a coding error or a corrupt DB
            let obj = Object::try_from(&*raw_value).unwrap();

            s3s::dto::Object {
                key: Some(key),
                e_tag: Some(obj.format_e_tag()),
                last_modified: Some(obj.last_modified().into()),
                owner: None,
                size: Some(obj.size() as i64),
                storage_class: None,
                ..Default::default()
            }
        })
        .take((key_count + 1) as usize)
        .collect::<Vec<_>>();

    // If the sentinel entry is present, pop it: it is not part of this page,
    // and its key is where the next page starts.
    let mut next_marker = None;
    let truncated = objects.len() == key_count as usize + 1;
    if truncated {
        next_marker = Some(objects.pop().unwrap().key.unwrap())
    }

    let output = ListObjectsOutput {
        contents: Some(objects),
        delimiter,
        encoding_type,
        name: Some(bucket),
        is_truncated: Some(truncated),
        // Only echo next_marker when the client was already paginating with a
        // marker; first-page requests get none even when truncated.
        next_marker: if marker.is_some() { next_marker } else { None },
        marker,
        max_keys: Some(key_count),
        prefix,
        ..Default::default()
    };
    Ok(S3Response::new(output))
}

async fn list_objects_v2(
&self,
req: S3Request<ListObjectsV2Input>,
Expand Down
56 changes: 40 additions & 16 deletions tests/it_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,47 @@ async fn test_list_objects_v2() -> Result<()> {
.await?;
}

let result = c
.list_objects_v2()
.bucket(bucket_str)
.prefix(test_prefix)
.send()
.await;

let response = log_and_unwrap!(result);
{
// list objects v1
let result = c
.list_objects()
.bucket(bucket_str)
.prefix(test_prefix)
.send()
.await;

let response = log_and_unwrap!(result);

let contents: Vec<_> = response
.contents()
.iter()
.filter_map(|obj| obj.key())
.collect();
assert!(!contents.is_empty());
assert!(contents.contains(&key1));
assert!(contents.contains(&key2));
}

let contents: Vec<_> = response
.contents()
.iter()
.filter_map(|obj| obj.key())
.collect();
assert!(!contents.is_empty());
assert!(contents.contains(&key1));
assert!(contents.contains(&key2));
{
// list objects v2
let result = c
.list_objects_v2()
.bucket(bucket_str)
.prefix(test_prefix)
.send()
.await;

let response = log_and_unwrap!(result);

let contents: Vec<_> = response
.contents()
.iter()
.filter_map(|obj| obj.key())
.collect();
assert!(!contents.is_empty());
assert!(contents.contains(&key1));
assert!(contents.contains(&key2));
}

Ok(())
}
Expand Down

0 comments on commit 1a44078

Please sign in to comment.