Skip to content

Commit aa94d45

Browse files
committed
update stream::fuse
Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent 7b4bb26 commit aa94d45

File tree

2 files changed

+19
-41
lines changed

2 files changed

+19
-41
lines changed

src/stream/stream/fuse.rs

+8-30
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,29 @@
11
use std::pin::Pin;
2+
use std::task::{Context, Poll};
23

3-
use crate::task::{Context, Poll};
4-
5-
/// A `Stream` that is permanently closed
6-
/// once a single call to `poll` results in
7-
/// `Poll::Ready(None)`, returning `Poll::Ready(None)`
8-
/// for all future calls to `poll`.
4+
/// A `Stream` that is permanently closed once a single call to `poll` results in
5+
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
96
#[derive(Clone, Debug)]
107
pub struct Fuse<S> {
11-
stream: S,
12-
done: bool,
8+
pub(crate) stream: S,
9+
pub(crate) done: bool,
1310
}
1411

1512
impl<S: Unpin> Unpin for Fuse<S> {}
1613

17-
impl<S: futures::Stream> Fuse<S> {
14+
impl<S: futures_core::Stream> Fuse<S> {
1815
pin_utils::unsafe_pinned!(stream: S);
1916
pin_utils::unsafe_unpinned!(done: bool);
20-
21-
/// Returns `true` if the underlying stream is fused.
22-
///
23-
/// If this `Stream` is fused, all future calls to
24-
/// `poll` will return `Poll::Ready(None)`.
25-
pub fn is_done(&self) -> bool {
26-
self.done
27-
}
28-
29-
/// Consumes this `Fuse` and returns the inner
30-
/// `Stream`, unfusing it if it had become
31-
/// fused.
32-
pub fn into_inner(self) -> S
33-
where
34-
S: Sized,
35-
{
36-
self.stream
37-
}
3817
}
3918

40-
41-
impl<S: futures::Stream> futures::Stream for Fuse<S> {
19+
impl<S: futures_core::Stream> futures_core::Stream for Fuse<S> {
4220
type Item = S::Item;
4321

4422
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
4523
if self.done {
4624
Poll::Ready(None)
4725
} else {
48-
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
26+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
4927
if next.is_none() {
5028
*self.as_mut().done() = true;
5129
}

src/stream/stream/mod.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -248,29 +248,29 @@ pub trait Stream {
248248
Enumerate::new(self)
249249
}
250250

251-
/// Transforms this `Stream` into a "fused" `Stream`
252-
/// such that after the first time `poll` returns
253-
/// `Poll::Ready(None)`, all future calls to
254-
/// `poll` will also return `Poll::Ready(None)`.
251+
/// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll`
252+
/// returns `Poll::Ready(None)`, all future calls to `poll` will also return
253+
/// `Poll::Ready(None)`.
255254
///
256255
/// # Examples
257256
///
258257
/// ```
259-
/// # #![feature(async_await)]
260258
/// # fn main() { async_std::task::block_on(async {
261259
/// #
262260
/// use async_std::prelude::*;
263261
/// use async_std::stream;
264262
///
265-
/// let mut s = stream::repeat(9).take(3);
266-
///
267-
/// while let Some(v) = s.next().await {
268-
/// assert_eq!(v, 9);
269-
/// }
263+
/// let mut s = stream::once(1).fuse();
264+
/// assert_eq!(s.next().await, Some(1));
265+
/// assert_eq!(s.next().await, None);
266+
/// assert_eq!(s.next().await, None);
270267
/// #
271268
/// # }) }
272269
/// ```
273-
fn fuse(self) -> Fuse<Self> {
270+
fn fuse(self) -> Fuse<Self>
271+
where
272+
Self: Sized,
273+
{
274274
Fuse {
275275
stream: self,
276276
done: false,

0 commit comments

Comments
 (0)