Skip to content

Commit 4f9e7d3

Browse files
bors[bot]Tyler Neelyyoshuawuyts
authored
Merge #40
40: Add initial Fuse implementation for Stream r=yoshuawuyts a=spacejam @matklad does this address your use case? Co-authored-by: Tyler Neely <[email protected]> Co-authored-by: Yoshua Wuyts <[email protected]>
2 parents af6ed7d + aa94d45 commit 4f9e7d3

File tree

3 files changed

+65
-1
lines changed

3 files changed

+65
-1
lines changed

src/stream/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub use from_stream::FromStream;
2727
pub use into_stream::IntoStream;
2828
pub use once::{once, Once};
2929
pub use repeat::{repeat, Repeat};
30-
pub use stream::{Scan, Stream, Take, Zip};
30+
pub use stream::{Fuse, Scan, Stream, Take, Zip};
3131

3232
mod double_ended_stream;
3333
mod empty;

src/stream/stream/fuse.rs

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
4+
/// A `Stream` that is permanently closed once a single call to `poll` results in
5+
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
6+
#[derive(Clone, Debug)]
7+
pub struct Fuse<S> {
8+
pub(crate) stream: S,
9+
pub(crate) done: bool,
10+
}
11+
12+
impl<S: Unpin> Unpin for Fuse<S> {}
13+
14+
impl<S: futures_core::Stream> Fuse<S> {
15+
pin_utils::unsafe_pinned!(stream: S);
16+
pin_utils::unsafe_unpinned!(done: bool);
17+
}
18+
19+
impl<S: futures_core::Stream> futures_core::Stream for Fuse<S> {
20+
type Item = S::Item;
21+
22+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
23+
if self.done {
24+
Poll::Ready(None)
25+
} else {
26+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
27+
if next.is_none() {
28+
*self.as_mut().done() = true;
29+
}
30+
Poll::Ready(next)
31+
}
32+
}
33+
}

src/stream/stream/mod.rs

+31
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ mod filter_map;
2828
mod find;
2929
mod find_map;
3030
mod fold;
31+
mod fuse;
3132
mod min_by;
3233
mod next;
3334
mod nth;
3435
mod scan;
3536
mod take;
3637
mod zip;
3738

39+
pub use fuse::Fuse;
3840
pub use scan::Scan;
3941
pub use take::Take;
4042
pub use zip::Zip;
@@ -246,6 +248,35 @@ pub trait Stream {
246248
Enumerate::new(self)
247249
}
248250

251+
/// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll`
252+
/// returns `Poll::Ready(None)`, all future calls to `poll` will also return
253+
/// `Poll::Ready(None)`.
254+
///
255+
/// # Examples
256+
///
257+
/// ```
258+
/// # fn main() { async_std::task::block_on(async {
259+
/// #
260+
/// use async_std::prelude::*;
261+
/// use async_std::stream;
262+
///
263+
/// let mut s = stream::once(1).fuse();
264+
/// assert_eq!(s.next().await, Some(1));
265+
/// assert_eq!(s.next().await, None);
266+
/// assert_eq!(s.next().await, None);
267+
/// #
268+
/// # }) }
269+
/// ```
270+
fn fuse(self) -> Fuse<Self>
271+
where
272+
Self: Sized,
273+
{
274+
Fuse {
275+
stream: self,
276+
done: false,
277+
}
278+
}
279+
249280
/// Both filters and maps a stream.
250281
///
251282
/// # Examples

0 commit comments

Comments
 (0)