Skip to content

Commit 8267f73

Browse files
committed
completion: refactor to use references instead of Arc
1 parent c5ada27 commit 8267f73

File tree

2 files changed

+31
-37
lines changed

2 files changed

+31
-37
lines changed

src/ceph.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -960,15 +960,15 @@ impl IoCtx {
960960

961961
/// Async variant of rados_object_write
962962
pub async fn rados_async_object_write(
963-
self: &Arc<Self>,
963+
&self,
964964
object_name: &str,
965965
buffer: &[u8],
966966
offset: u64,
967967
) -> RadosResult<i32> {
968968
self.ioctx_guard()?;
969969
let obj_name_str = CString::new(object_name)?;
970970

971-
with_completion(self.clone(), |c| unsafe {
971+
with_completion(self, |c| unsafe {
972972
rados_aio_write(
973973
self.ioctx,
974974
obj_name_str.as_ptr(),
@@ -977,67 +977,67 @@ impl IoCtx {
977977
buffer.len(),
978978
offset,
979979
)
980-
})
980+
})?
981981
.await
982982
}
983983

984984
/// Async variant of rados_object_append
985985
pub async fn rados_async_object_append(
986-
self: &Arc<Self>,
986+
&self,
987987
object_name: &str,
988988
buffer: &[u8],
989989
) -> RadosResult<i32> {
990990
self.ioctx_guard()?;
991991
let obj_name_str = CString::new(object_name)?;
992992

993-
with_completion(self.clone(), |c| unsafe {
993+
with_completion(self, |c| unsafe {
994994
rados_aio_append(
995995
self.ioctx,
996996
obj_name_str.as_ptr(),
997997
c,
998998
buffer.as_ptr() as *const ::libc::c_char,
999999
buffer.len(),
10001000
)
1001-
})
1001+
})?
10021002
.await
10031003
}
10041004

10051005
/// Async variant of rados_object_write_full
10061006
pub async fn rados_async_object_write_full(
1007-
self: &Arc<Self>,
1007+
&self,
10081008
object_name: &str,
10091009
buffer: &[u8],
10101010
) -> RadosResult<i32> {
10111011
self.ioctx_guard()?;
10121012
let obj_name_str = CString::new(object_name)?;
10131013

1014-
with_completion(self.clone(), |c| unsafe {
1014+
with_completion(self, |c| unsafe {
10151015
rados_aio_write_full(
10161016
self.ioctx,
10171017
obj_name_str.as_ptr(),
10181018
c,
10191019
buffer.as_ptr() as *const ::libc::c_char,
10201020
buffer.len(),
10211021
)
1022-
})
1022+
})?
10231023
.await
10241024
}
10251025

10261026
/// Async variant of rados_object_remove
1027-
pub async fn rados_async_object_remove(self: &Arc<Self>, object_name: &str) -> RadosResult<()> {
1027+
pub async fn rados_async_object_remove(&self, object_name: &str) -> RadosResult<()> {
10281028
self.ioctx_guard()?;
10291029
let object_name_str = CString::new(object_name)?;
10301030

1031-
with_completion(self.clone(), |c| unsafe {
1031+
with_completion(self, |c| unsafe {
10321032
rados_aio_remove(self.ioctx, object_name_str.as_ptr() as *const c_char, c)
1033-
})
1033+
})?
10341034
.await
10351035
.map(|_r| ())
10361036
}
10371037

10381038
/// Async variant of rados_object_read
10391039
pub async fn rados_async_object_read(
1040-
self: &Arc<Self>,
1040+
&self,
10411041
object_name: &str,
10421042
fill_buffer: &mut Vec<u8>,
10431043
read_offset: u64,
@@ -1049,7 +1049,7 @@ impl IoCtx {
10491049
fill_buffer.reserve_exact(DEFAULT_READ_BYTES);
10501050
}
10511051

1052-
with_completion(self.clone(), |c| unsafe {
1052+
with_completion(self, |c| unsafe {
10531053
rados_aio_read(
10541054
self.ioctx,
10551055
obj_name_str.as_ptr(),
@@ -1058,7 +1058,7 @@ impl IoCtx {
10581058
fill_buffer.capacity(),
10591059
read_offset,
10601060
)
1061-
})
1061+
})?
10621062
.await
10631063
}
10641064

src/completion.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
use std::ffi::c_void;
1616
use std::pin::Pin;
17-
use std::sync::{Arc, Mutex};
17+
use std::sync::Mutex;
1818
use std::task::{Context, Poll, Waker};
1919

2020
use crate::ceph::IoCtx;
@@ -25,7 +25,7 @@ use crate::rados::{
2525
rados_completion_t,
2626
};
2727

28-
struct Completion {
28+
pub(crate) struct Completion<'a> {
2929
inner: rados_completion_t,
3030

3131
// Box to provide a stable address for completion_complete callback
@@ -34,15 +34,15 @@ struct Completion {
3434

3535
// A reference to the IOCtx is required to issue a cancel on
3636
// the operation if we are dropped before ready. This needs
37-
// to be an Arc rather than a raw rados_ioctx_t because otherwise
37+
// to be a Rust reference rather than a raw rados_ioctx_t because otherwise
3838
// there would be nothing to stop the rados_ioctx_t being invalidated
3939
// during the lifetime of this Completion.
4040
// (AioCompletionImpl does hold a reference to IoCtxImpl for writes, but
4141
// not for reads.)
42-
ioctx: Arc<IoCtx>,
42+
ioctx: &'a IoCtx,
4343
}
4444

45-
unsafe impl Send for Completion {}
45+
unsafe impl Send for Completion<'_> {}
4646

4747
#[no_mangle]
4848
pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void) -> () {
@@ -58,7 +58,7 @@ pub extern "C" fn completion_complete(_cb: rados_completion_t, arg: *mut c_void)
5858
}
5959
}
6060

61-
impl Drop for Completion {
61+
impl Drop for Completion<'_> {
6262
fn drop(&mut self) {
6363
// Ensure that after dropping the Completion, the AIO callback
6464
// will not be called on our dropped waker Box. Only necessary
@@ -67,7 +67,11 @@ impl Drop for Completion {
6767
let am_complete = unsafe { rados_aio_is_complete(self.inner) } != 0;
6868
if !am_complete {
6969
unsafe {
70-
rados_aio_cancel(self.ioctx.ioctx, self.inner);
70+
let cancel_r = rados_aio_cancel(self.ioctx.ioctx, self.inner);
71+
72+
// It is unsound to proceed if the Objecter op is still in flight
73+
assert!(cancel_r == 0 || cancel_r == -libc::ENOENT);
74+
7175
rados_aio_wait_for_complete_and_cb(self.inner);
7276
}
7377
}
@@ -78,7 +82,7 @@ impl Drop for Completion {
7882
}
7983
}
8084

81-
impl std::future::Future for Completion {
85+
impl std::future::Future for Completion<'_> {
8286
type Output = crate::error::RadosResult<i32>;
8387

8488
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -97,7 +101,10 @@ impl std::future::Future for Completion {
97101
}
98102
}
99103

100-
fn with_completion_impl<F>(ioctx: Arc<IoCtx>, f: F) -> RadosResult<Completion>
104+
/// Completions are only created via this wrapper, in order to ensure
105+
/// that the Completion struct is only constructed around 'armed' rados_completion_t
106+
/// instances (i.e. those that have been used to start an I/O).
107+
pub(crate) fn with_completion<F>(ioctx: &IoCtx, f: F) -> RadosResult<Completion<'_>>
101108
where
102109
F: FnOnce(rados_completion_t) -> libc::c_int,
103110
{
@@ -134,16 +141,3 @@ where
134141
})
135142
}
136143
}
137-
/// Completions are only created via this wrapper, in order to ensure
138-
/// that the Completion struct is only constructed around 'armed' rados_completion_t
139-
/// instances (i.e. those that have been used to start an I/O).
140-
pub async fn with_completion<F>(ioctx: Arc<IoCtx>, f: F) -> RadosResult<i32>
141-
where
142-
F: FnOnce(rados_completion_t) -> libc::c_int,
143-
{
144-
// Hide c_void* temporaries in a non-async function so that the future generated
145-
// by this function isn't encumbered by their non-Send-ness.
146-
let completion = with_completion_impl(ioctx, f)?;
147-
148-
completion.await
149-
}

0 commit comments

Comments
 (0)