Skip to content

Commit fb48526

Browse files
committed
async: add rados_async_object_list
1 parent 6e4fc21 commit fb48526

File tree

3 files changed

+143
-3
lines changed

3 files changed

+143
-3
lines changed

src/ceph.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use std::io::{BufRead, Cursor};
4141
use std::net::IpAddr;
4242
use std::time::{Duration, SystemTime, UNIX_EPOCH};
4343

44+
use crate::list_stream::ListStream;
4445
use std::pin::Pin;
4546
use std::sync::Arc;
4647
use std::task::{Context, Poll};
@@ -1094,13 +1095,26 @@ impl IoCtx {
10941095
Ok((psize, (UNIX_EPOCH + Duration::from_secs(time as u64))))
10951096
}
10961097

1098+
pub fn rados_async_object_list(&self) -> RadosResult<ListStream> {
1099+
self.ioctx_guard()?;
1100+
let mut rados_list_ctx: rados_list_ctx_t = ptr::null_mut();
1101+
unsafe {
1102+
let r = rados_nobjects_list_open(self.ioctx, &mut rados_list_ctx);
1103+
if r == 0 {
1104+
Ok(ListStream::new(rados_list_ctx))
1105+
} else {
1106+
Err(r.into())
1107+
}
1108+
}
1109+
}
1110+
10971111
/// Async variant of rados_object_getxattr
10981112
pub async fn rados_async_object_getxattr(
10991113
&self,
11001114
object_name: &str,
11011115
attr_name: &str,
11021116
fill_buffer: &mut [u8],
1103-
) -> RadosResult<i32> {
1117+
) -> RadosResult<u32> {
11041118
self.ioctx_guard()?;
11051119
let object_name_str = CString::new(object_name)?;
11061120
let attr_name_str = CString::new(attr_name)?;
@@ -1124,7 +1138,7 @@ impl IoCtx {
11241138
object_name: &str,
11251139
attr_name: &str,
11261140
attr_value: &[u8],
1127-
) -> RadosResult<i32> {
1141+
) -> RadosResult<u32> {
11281142
self.ioctx_guard()?;
11291143
let object_name_str = CString::new(object_name)?;
11301144
let attr_name_str = CString::new(attr_name)?;
@@ -1147,7 +1161,7 @@ impl IoCtx {
11471161
&self,
11481162
object_name: &str,
11491163
attr_name: &str,
1150-
) -> RadosResult<i32> {
1164+
) -> RadosResult<u32> {
11511165
self.ioctx_guard()?;
11521166
let object_name_str = CString::new(object_name)?;
11531167
let attr_name_str = CString::new(attr_name)?;

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ pub mod utils;
7979
mod ceph_client;
8080
mod ceph_version;
8181
pub(crate) mod completion;
82+
pub(crate) mod list_stream;
8283
mod mon_command;
8384

8485
pub use crate::ceph_client::CephClient;

src/list_stream.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use std::ffi::CStr;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
use futures::executor::ThreadPool;
6+
use futures::task::SpawnExt;
7+
use futures::{Future, Stream};
8+
9+
use crate::ceph::CephObject;
10+
use crate::error::{RadosError, RadosResult};
11+
use crate::rados::{rados_list_ctx_t, rados_nobjects_list_close, rados_nobjects_list_next};
12+
13+
/// Wrap rados_list_ctx_t to make it Send (hold across .await)
14+
#[derive(Copy, Clone)]
15+
struct ListCtxHandle(rados_list_ctx_t);
16+
unsafe impl Send for ListCtxHandle {}
17+
18+
/// A high level Stream interface to the librados 'nobjects_list' functionality.
19+
///
20+
/// librados does not expose asynchronous calls for object listing, so we use
21+
/// a background helper thread.
22+
pub struct ListStream {
23+
ctx: ListCtxHandle,
24+
workers: ThreadPool,
25+
26+
// We only have a single call to nobjects_list_next outstanding at
27+
// any time: rely on underlying librados/Objecter to do
28+
// batching/readahead
29+
next: Option<Pin<Box<dyn Future<Output = Option<RadosResult<CephObject>>>>>>,
30+
}
31+
32+
unsafe impl Send for ListStream {}
33+
34+
impl ListStream {
35+
pub fn new(ctx: rados_list_ctx_t) -> Self {
36+
Self {
37+
ctx: ListCtxHandle(ctx),
38+
workers: ThreadPool::builder()
39+
.pool_size(1)
40+
.create()
41+
.expect("Could not spawn worker thread"),
42+
next: None,
43+
}
44+
}
45+
}
46+
47+
impl Stream for ListStream {
48+
type Item = Result<CephObject, RadosError>;
49+
50+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51+
if self.next.is_none() {
52+
let list_ctx = self.ctx;
53+
self.next = Some(Box::pin(
54+
self.workers
55+
.spawn_with_handle(async move {
56+
let mut entry_ptr: *mut *const ::libc::c_char = std::ptr::null_mut();
57+
let mut key_ptr: *mut *const ::libc::c_char = std::ptr::null_mut();
58+
let mut nspace_ptr: *mut *const ::libc::c_char = std::ptr::null_mut();
59+
unsafe {
60+
let r = rados_nobjects_list_next(
61+
list_ctx.0,
62+
&mut entry_ptr,
63+
&mut key_ptr,
64+
&mut nspace_ptr,
65+
);
66+
67+
if r == -libc::ENOENT {
68+
None
69+
} else if r < 0 {
70+
Some(Err(r.into()))
71+
} else {
72+
let object_name =
73+
CStr::from_ptr(entry_ptr as *const ::libc::c_char);
74+
let mut object_locator = String::new();
75+
let mut namespace = String::new();
76+
if !key_ptr.is_null() {
77+
object_locator.push_str(
78+
&CStr::from_ptr(key_ptr as *const ::libc::c_char)
79+
.to_string_lossy(),
80+
);
81+
}
82+
if !nspace_ptr.is_null() {
83+
namespace.push_str(
84+
&CStr::from_ptr(nspace_ptr as *const ::libc::c_char)
85+
.to_string_lossy(),
86+
);
87+
}
88+
89+
Some(Ok(CephObject {
90+
name: object_name.to_string_lossy().into_owned(),
91+
entry_locator: object_locator,
92+
namespace,
93+
}))
94+
}
95+
}
96+
})
97+
.expect("Could not spawn background task"),
98+
));
99+
}
100+
101+
let result = self.next.as_mut().unwrap().as_mut().poll(cx);
102+
match &result {
103+
Poll::Pending => Poll::Pending,
104+
_ => {
105+
self.next = None;
106+
result
107+
}
108+
}
109+
110+
// match self.next.as_mut().unwrap().as_mut().poll(cx) {
111+
// Poll::Pending => Poll: Pending,
112+
// Poll::Ready(None) => Poll::Ready(None),
113+
// Poll::Ready(Some(Err(rados_error))) => Poll::Ready(Some(Err(rados_error))),
114+
// Poll::Ready(Some(Ok(ceph_object))) => Poll::Ready(Some(Err(rados_error))),
115+
// }
116+
}
117+
}
118+
119+
impl Drop for ListStream {
120+
fn drop(&mut self) {
121+
unsafe {
122+
rados_nobjects_list_close(self.ctx.0);
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)