Skip to content

Commit c76f843

Browse files
committed
async: add rados_async_write_stream -> WriteSink
This helps writers who would like to stream data into a RADOS object without managing their own backlog of futures.
1 parent 3ad3fca commit c76f843

File tree

4 files changed

+198
-10
lines changed

4 files changed

+198
-10
lines changed

src/ceph.rs

+42-4
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
4343

4444
use crate::list_stream::ListStream;
4545
use crate::read_stream::ReadStream;
46+
pub use crate::write_sink::WriteSink;
4647
use std::pin::Pin;
4748
use std::sync::Arc;
4849
use std::task::{Context, Poll};
@@ -1076,10 +1077,47 @@ impl IoCtx {
10761077
/// Streaming read of a RADOS object. The `ReadStream` object implements `futures::Stream`
10771078
/// for use with Stream-aware code like hyper's Body::wrap_stream.
10781079
///
1079-
/// This will usually issue more read ops than needed if used on a small object: for
1080-
/// small objects `rados_async_object_read` is more appropriate.
1081-
pub fn rados_async_object_read_stream(&self, object_name: &str) -> ReadStream<'_> {
1082-
ReadStream::new(self, object_name, None, None)
1080+
/// Useful for reading large objects incrementally, or anywhere you are using an interface
1081+
/// that expects a stream (such as proxying objects via an HTTP server).
1082+
///
1083+
/// Efficiency: If size_hint is not specified, and this function is used on a small object, it will
1084+
/// issue spurious read-ahead operations beyond the object's size.
1085+
/// If you have an object that you know is small, prefer to use a single `rados_async_object_read`
1086+
/// instead of this streaming variant.
1087+
///
1088+
/// * `buffer_size` - How much data should be read per rados read operation. This is also
1089+
/// how much data is emitted in each Item from the stream.
1090+
/// * `concurrency` - How many RADOS operations should be run in parallel for this stream,
1091+
/// or None to use a default.
1092+
/// * `size_hint` - If you have prior knowledge of the object's size in bytes, pass it here to enable
1093+
/// the stream to issue fewer read-ahead operations than it would by default. This is just
1094+
/// a hint, and does not bound the data returned -- if the object is smaller or larger
1095+
/// than `size_hint` then the actual object size will be reflected in the stream's output.
1096+
pub fn rados_async_object_read_stream(
1097+
&self,
1098+
object_name: &str,
1099+
buffer_size: Option<usize>,
1100+
concurrency: Option<usize>,
1101+
size_hint: Option<u64>,
1102+
) -> ReadStream<'_> {
1103+
ReadStream::new(self, object_name, buffer_size, concurrency, size_hint)
1104+
}
1105+
1106+
/// Streaming write of a RADOS object. The `WriteSink` object implements `futures::Sink`. Combine
1107+
/// it with other stream-aware code, or bring the SinkExt trait into scope to get methods
1108+
/// like send, send_all.
1109+
///
1110+
/// Efficiency: this class does not coalesce writes, so each Item you send into it,
1111+
///
1112+
///
1113+
/// * `concurrency` - How many RADOS operations should be run in parallel for this stream,
1114+
/// or None to use a default.
1115+
pub fn rados_async_object_write_stream(
1116+
&self,
1117+
object_name: &str,
1118+
concurrency: Option<usize>,
1119+
) -> WriteSink<'_> {
1120+
WriteSink::new(self, object_name, concurrency)
10831121
}
10841122

10851123
/// Get object stats (size,SystemTime)

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub(crate) mod completion;
8282
pub(crate) mod list_stream;
8383
mod mon_command;
8484
pub(crate) mod read_stream;
85+
pub(crate) mod write_sink;
8586

8687
pub use crate::ceph_client::CephClient;
8788
pub use crate::ceph_version::CephVersion;

src/read_stream.rs

+32-6
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,18 @@ pub struct ReadStream<'a> {
3636
// Number of concurrent RADOS read ops to issue
3737
concurrency: usize,
3838

39+
// Caller's hint as to the object size (not required to be accurate)
40+
size_hint: Option<u64>,
41+
3942
in_flight: Vec<IOSlot<'a>>,
4043

44+
// Counter for how many bytes we have issued reads for
4145
next: u64,
4246

47+
// Counter for how many bytes we have yielded from poll_next()
48+
// (i.e. the size of the object so far)
49+
yielded: u64,
50+
4351
object_name: String,
4452

4553
// Flag is set when we see a short read - means do not issue any more IOs,
@@ -55,13 +63,16 @@ impl<'a> ReadStream<'a> {
5563
object_name: &str,
5664
buffer_size: Option<usize>,
5765
concurrency: Option<usize>,
66+
size_hint: Option<u64>,
5867
) -> Self {
5968
let mut inst = Self {
6069
ioctx,
6170
buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
6271
concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENCY),
72+
size_hint,
6373
in_flight: Vec::new(),
6474
next: 0,
75+
yielded: 0,
6576
object_name: object_name.to_string(),
6677
done: false,
6778
};
@@ -80,8 +91,20 @@ enum IOSlot<'a> {
8091

8192
impl<'a> ReadStream<'a> {
8293
fn maybe_issue(&mut self) {
83-
// Issue reads
84-
while self.in_flight.len() < self.concurrency {
94+
// Issue reads if any of these are true:
95+
// - Nothing is in flight
96+
// - No size bound, and in flight < concurrency
97+
// - A size bound, and we're within it, and in flight < concurrency
98+
// - A size bound, and it has been disproved, and in flight < concurrency
99+
100+
while !self.done
101+
&& (self.in_flight.is_empty()
102+
|| (((self.size_hint.is_some()
103+
&& (self.next < self.size_hint.unwrap()
104+
|| self.yielded > self.size_hint.unwrap()))
105+
|| self.size_hint.is_none())
106+
&& (self.in_flight.len() < self.concurrency)))
107+
{
85108
let read_at = self.next;
86109
self.next += self.buffer_size as u64;
87110

@@ -163,10 +186,8 @@ impl<'a> Stream for ReadStream<'a> {
163186
},
164187
};
165188

166-
self.maybe_issue();
167-
168189
// A result is ready, handle it.
169-
match result {
190+
let r = match result {
170191
Ok(length) => {
171192
if (length as usize) < self.buffer_size {
172193
// Cancel outstanding ops
@@ -175,9 +196,14 @@ impl<'a> Stream for ReadStream<'a> {
175196
// Flag to return Ready(None) on next call to poll.
176197
self.done = true;
177198
}
199+
self.yielded += buffer.len() as u64;
178200
Poll::Ready(Some(Ok(buffer)))
179201
}
180202
Err(e) => Poll::Ready(Some(Err(e))),
181-
}
203+
};
204+
205+
self.maybe_issue();
206+
207+
r
182208
}
183209
}

src/write_sink.rs

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
use futures::{FutureExt, Sink, Stream};
2+
use std::future::Future;
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
use crate::ceph::IoCtx;
7+
use crate::completion::with_completion;
8+
use crate::error::{RadosError, RadosResult};
9+
use crate::rados::rados_aio_write;
10+
use futures::stream::FuturesUnordered;
11+
use std::ffi::CString;
12+
use std::os::raw::c_char;
13+
14+
const DEFAULT_CONCURRENCY: usize = 2;
15+
16+
pub struct WriteSink<'a> {
17+
ioctx: &'a IoCtx,
18+
in_flight: Pin<Box<FuturesUnordered<Pin<Box<dyn Future<Output = RadosResult<u32>> + 'a>>>>>,
19+
object_name: String,
20+
21+
// Offset into object where the next write will land
22+
next: u64,
23+
24+
// How many RADOS ops in flight at same time?
25+
concurrency: usize,
26+
}
27+
28+
unsafe impl Send for WriteSink<'_> {}
29+
30+
impl<'a> WriteSink<'a> {
31+
pub fn new(ioctx: &'a IoCtx, object_name: &str, concurrency: Option<usize>) -> Self {
32+
let concurrency = concurrency.unwrap_or(DEFAULT_CONCURRENCY);
33+
assert!(concurrency > 0);
34+
35+
Self {
36+
ioctx,
37+
in_flight: Box::pin(FuturesUnordered::new()),
38+
object_name: object_name.to_string(),
39+
next: 0,
40+
concurrency,
41+
}
42+
}
43+
44+
fn trim_in_flight(
45+
mut self: Pin<&mut Self>,
46+
cx: &mut Context<'_>,
47+
target_len: usize,
48+
) -> Poll<Result<(), <Self as Sink<Vec<u8>>>::Error>> {
49+
while self.in_flight.len() > target_len {
50+
match self.in_flight.as_mut().poll_next(cx) {
51+
Poll::Pending => return Poll::Pending,
52+
Poll::Ready(None) => {
53+
// (because we check for in_flight size first)
54+
unreachable!()
55+
}
56+
Poll::Ready(Some(result)) => match result {
57+
Err(e) => return Poll::Ready(Err(e)),
58+
Ok(sz) => {
59+
debug!("trim_in_flight: IO completed with r={}", sz);
60+
}
61+
},
62+
};
63+
}
64+
65+
// Nothing left in flight, we're done
66+
Poll::Ready(Ok(()))
67+
}
68+
}
69+
70+
impl<'a> Sink<Vec<u8>> for WriteSink<'a> {
71+
type Error = RadosError;
72+
73+
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
74+
// If we have fewer than 1 slots available, this will try to wait on some outstanding futures
75+
let target = self.as_ref().concurrency - 1;
76+
if self.in_flight.len() > target {
77+
self.trim_in_flight(cx, target)
78+
} else {
79+
Poll::Ready(Ok(()))
80+
}
81+
}
82+
83+
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
84+
let ioctx = self.ioctx;
85+
let obj_name_str = CString::new(self.object_name.clone()).expect("CString error");
86+
let write_at = self.next;
87+
self.next += item.len() as u64;
88+
89+
let mut fut = Box::pin(async move {
90+
let c = with_completion(ioctx, |c| unsafe {
91+
rados_aio_write(
92+
ioctx.ioctx,
93+
obj_name_str.as_ptr(),
94+
c,
95+
item.as_ptr() as *mut c_char,
96+
item.len(),
97+
write_at,
98+
)
99+
})?;
100+
101+
c.await
102+
});
103+
104+
// Kick the async{} future to get the RADOS op sent
105+
match fut.as_mut().now_or_never() {
106+
Some(Ok(_)) => Ok(()),
107+
Some(Err(e)) => return Err(e),
108+
None => {
109+
self.in_flight.push(fut);
110+
Ok(())
111+
}
112+
}
113+
}
114+
115+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
116+
self.trim_in_flight(cx, 0)
117+
}
118+
119+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
120+
// There is no special work to be done on close
121+
self.poll_flush(cx)
122+
}
123+
}

0 commit comments

Comments
 (0)