Skip to content

Commit 3ad3fca

Browse files
committed
async: add ReadStream for streaming reads of large objects
1 parent fb48526 commit 3ad3fca

File tree

3 files changed

+194
-0
lines changed

3 files changed

+194
-0
lines changed

src/ceph.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use std::net::IpAddr;
4242
use std::time::{Duration, SystemTime, UNIX_EPOCH};
4343

4444
use crate::list_stream::ListStream;
45+
use crate::read_stream::ReadStream;
4546
use std::pin::Pin;
4647
use std::sync::Arc;
4748
use std::task::{Context, Poll};
@@ -1072,6 +1073,15 @@ impl IoCtx {
10721073
result
10731074
}
10741075

1076+
/// Streaming read of a RADOS object. The `ReadStream` object implements `futures::Stream`
1077+
/// for use with Stream-aware code like hyper's Body::wrap_stream.
1078+
///
1079+
/// This will usually issue more read ops than needed if used on a small object: for
1080+
/// small objects `rados_async_object_read` is more appropriate.
1081+
pub fn rados_async_object_read_stream(&self, object_name: &str) -> ReadStream<'_> {
1082+
ReadStream::new(self, object_name, None, None)
1083+
}
1084+
10751085
/// Get object stats (size,SystemTime)
10761086
pub async fn rados_async_object_stat(
10771087
&self,

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ mod ceph_version;
8181
pub(crate) mod completion;
8282
pub(crate) mod list_stream;
8383
mod mon_command;
84+
pub(crate) mod read_stream;
8485

8586
pub use crate::ceph_client::CephClient;
8687
pub use crate::ceph_version::CephVersion;

src/read_stream.rs

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
// Copyright 2021 John Spray All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use futures::{FutureExt, Stream};
16+
use std::ffi::CString;
17+
use std::future::Future;
18+
use std::os::raw::c_char;
19+
use std::pin::Pin;
20+
use std::task::{Context, Poll};
21+
22+
use crate::ceph::IoCtx;
23+
use crate::completion::with_completion;
24+
use crate::error::RadosResult;
25+
use crate::rados::rados_aio_read;
26+
27+
const DEFAULT_BUFFER_SIZE: usize = 4 * 1024 * 1024;
28+
const DEFAULT_CONCURRENCY: usize = 2;
29+
30+
pub struct ReadStream<'a> {
31+
ioctx: &'a IoCtx,
32+
33+
// Size of each RADOS read op
34+
buffer_size: usize,
35+
36+
// Number of concurrent RADOS read ops to issue
37+
concurrency: usize,
38+
39+
in_flight: Vec<IOSlot<'a>>,
40+
41+
next: u64,
42+
43+
object_name: String,
44+
45+
// Flag is set when we see a short read - means do not issue any more IOs,
46+
// and return Poll::Ready(None) on next poll
47+
done: bool,
48+
}
49+
50+
// NOTE(review): manual `Send` assertion. The boxed futures stored in `in_flight` carry no
// `Send` bound, so the compiler cannot derive `Send` for this type on its own. Presumably the
// underlying librados handles are safe to move between threads — TODO confirm and record
// the safety argument here.
unsafe impl Send for ReadStream<'_> {}
51+
52+
impl<'a> ReadStream<'a> {
53+
pub(crate) fn new(
54+
ioctx: &'a IoCtx,
55+
object_name: &str,
56+
buffer_size: Option<usize>,
57+
concurrency: Option<usize>,
58+
) -> Self {
59+
let mut inst = Self {
60+
ioctx,
61+
buffer_size: buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
62+
concurrency: concurrency.unwrap_or(DEFAULT_CONCURRENCY),
63+
in_flight: Vec::new(),
64+
next: 0,
65+
object_name: object_name.to_string(),
66+
done: false,
67+
};
68+
69+
// Start IOs early, don't wait for the first poll.
70+
inst.maybe_issue();
71+
72+
inst
73+
}
74+
}
75+
76+
enum IOSlot<'a> {
77+
Pending(Pin<Box<dyn Future<Output = (Vec<u8>, RadosResult<u32>)> + 'a>>),
78+
Complete((Vec<u8>, RadosResult<u32>)),
79+
}
80+
81+
impl<'a> ReadStream<'a> {
82+
fn maybe_issue(&mut self) {
83+
// Issue reads
84+
while self.in_flight.len() < self.concurrency {
85+
let read_at = self.next;
86+
self.next += self.buffer_size as u64;
87+
88+
// Inefficient: copying out string to dodge ownership issues for the moment
89+
let object_name_bg = self.object_name.clone();
90+
91+
// Grab items for use inside async{} block to avoid referencing self from in there.
92+
let ioctx = self.ioctx;
93+
let read_size = self.buffer_size;
94+
95+
// Use an async block to tie together the lifetime of a Vec and the Completion that uses it
96+
let fut = async move {
97+
let obj_name_str = CString::new(object_name_bg).expect("CString error");
98+
let mut fill_buffer = Vec::with_capacity(read_size);
99+
let completion = with_completion(ioctx, |c| unsafe {
100+
rados_aio_read(
101+
ioctx.ioctx,
102+
obj_name_str.as_ptr(),
103+
c,
104+
fill_buffer.as_mut_ptr() as *mut c_char,
105+
fill_buffer.capacity(),
106+
read_at,
107+
)
108+
})
109+
.expect("Can't issue read");
110+
111+
let result = completion.await;
112+
if let Ok(rval) = &result {
113+
unsafe {
114+
let len = *rval as usize;
115+
assert!(len <= fill_buffer.capacity());
116+
fill_buffer.set_len(len);
117+
}
118+
}
119+
120+
(fill_buffer, result)
121+
};
122+
123+
let mut fut = Box::pin(fut);
124+
125+
let slot = match fut.as_mut().now_or_never() {
126+
Some(result) => IOSlot::Complete(result),
127+
None => IOSlot::Pending(fut),
128+
};
129+
130+
self.in_flight.push(slot);
131+
}
132+
}
133+
}
134+
135+
impl<'a> Stream for ReadStream<'a> {
136+
type Item = RadosResult<Vec<u8>>;
137+
138+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
139+
if self.done {
140+
// Our last read result was a short one: we know nothing else needs doing.
141+
return Poll::Ready(None);
142+
}
143+
144+
self.maybe_issue();
145+
146+
// Poll next read: maybe return pending if none is ready
147+
let next_op = &mut self.in_flight[0];
148+
let (buffer, result) = match next_op {
149+
IOSlot::Complete(_) => {
150+
let complete = self.in_flight.remove(0);
151+
if let IOSlot::Complete(c) = complete {
152+
c
153+
} else {
154+
panic!("Cannot happen")
155+
}
156+
}
157+
IOSlot::Pending(fut) => match fut.as_mut().poll(cx) {
158+
Poll::Pending => return Poll::Pending,
159+
Poll::Ready(r) => {
160+
self.in_flight.remove(0);
161+
r
162+
}
163+
},
164+
};
165+
166+
self.maybe_issue();
167+
168+
// A result is ready, handle it.
169+
match result {
170+
Ok(length) => {
171+
if (length as usize) < self.buffer_size {
172+
// Cancel outstanding ops
173+
self.in_flight.clear();
174+
175+
// Flag to return Ready(None) on next call to poll.
176+
self.done = true;
177+
}
178+
Poll::Ready(Some(Ok(buffer)))
179+
}
180+
Err(e) => Poll::Ready(Some(Err(e))),
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)