diff --git a/src/par_stream/count.rs b/src/par_stream/count.rs new file mode 100644 index 0000000..99a66b3 --- /dev/null +++ b/src/par_stream/count.rs @@ -0,0 +1,54 @@ +use async_std::prelude::*; +use async_std::task::{self, Context, Poll}; + +use std::pin::Pin; + +use crate::ParallelStream; + +pin_project_lite::pin_project! { + /// Count the number of items of the stream. + /// + /// This `struct` is created by the [`count`] method on [`ParallelStream`]. See its + /// documentation for more. + /// + /// [`count`]: trait.ParallelStream.html#method.count + /// [`ParallelStream`]: trait.ParallelStream.html + #[derive(Clone, Debug)] + pub struct Count { + #[pin] + stream: S, + count: usize, + } +} + +impl Count { + pub(super) fn new(stream: S) -> Self { + Self { stream, count: 0 } + } +} + +impl Future for Count { + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + match task::ready!(this.stream.poll_next(cx)) { + None => Poll::Ready(*this.count), + Some(_) => { + *this.count += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } +} + +#[async_std::test] +async fn smoke() { + let s = async_std::stream::repeat(5usize); + + let cnt = crate::from_stream(s).take(10).count().await; + + assert_eq!(cnt, 10); +} diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..c62d239 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -5,11 +5,13 @@ use std::pin::Pin; use crate::FromParallelStream; +pub use count::Count; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +mod count; mod for_each; mod map; mod next; @@ -29,6 +31,11 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { /// Get the max concurrency limit fn get_limit(&self) -> Option; + /// Counts the number of items of this stream. + fn count(self) -> Count { + Count::new(self) + } + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn map(self, f: F) -> Map