From 3df85e107033681569d66a5950fe22234ac09701 Mon Sep 17 00:00:00 2001 From: Justin Dennison Date: Mon, 23 Mar 2020 17:19:58 -0400 Subject: [PATCH 1/2] completed implementation of skip --- src/par_stream/mod.rs | 10 ++++++ src/par_stream/skip.rs | 73 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 src/par_stream/skip.rs diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..69dc2a7 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -9,11 +9,13 @@ pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +pub use skip::Skip; mod for_each; mod map; mod next; mod take; +mod skip; /// Parallel version of the standard `Stream` trait. pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { @@ -54,6 +56,14 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { Take::new(self, n) } + /// Creates a stream that skips the first `n` elements. + fn skip(self, n: usize) -> Skip + where + Self: Sized + { + Skip::new(self, n) + } + /// Applies `f` to each item of this stream in parallel. fn for_each(self, f: F) -> ForEach where diff --git a/src/par_stream/skip.rs b/src/par_stream/skip.rs new file mode 100644 index 0000000..eba8760 --- /dev/null +++ b/src/par_stream/skip.rs @@ -0,0 +1,73 @@ +use core::pin::Pin; +use core::task::{Context, Poll}; + +use async_std::task::ready; +use pin_project_lite::pin_project; + +use crate::ParallelStream; + +pin_project! { + /// A stream that skips the first `n` items of another stream. + /// + /// This `struct` is created by the [`skip`] method on [`ParallelStream`]. See its + /// documentation for more. + /// + /// [`skip`]: trait.ParallelStream.html#method.take + /// [`ParallelStream`]: trait.ParallelStream.html + #[derive(Clone, Debug)] + pub struct Skip { + #[pin] + stream: S, + skipped: usize, + limit: Option, + } +} + +impl Skip { + pub(super) fn new(stream: S, skipped: usize) -> Self { + Self { + limit: stream.get_limit(), + skipped, + stream, + } + } +} + +impl ParallelStream for Skip { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + if *this.skipped > 0 { + let next = ready!(this.stream.poll_next(cx)); + match next { + Some(_) => *this.skipped -= 1, + None => *this.skipped = 0, + } + Poll::Ready(next) + } else { + Poll::Ready(None) + } + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + use async_std::prelude::*; + let s = async_std::stream::from_iter(vec![1, 2, 3, 4, 5, 6]).skip(3); + let mut output = vec![]; + let mut stream = crate::from_stream(s); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![4, 5, 6]); +} From 921fed37d0153b6e3d7f5a45fcf2975302dfaad4 Mon Sep 17 00:00:00 2001 From: Justin Dennison Date: Tue, 24 Mar 2020 09:41:58 -0400 Subject: [PATCH 2/2] fixed some errors with typos and misuse of when testing --- src/par_stream/skip.rs | 58 +++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/par_stream/skip.rs b/src/par_stream/skip.rs index eba8760..278e5df 100644 --- a/src/par_stream/skip.rs +++ b/src/par_stream/skip.rs @@ -1,7 +1,7 @@ +use async_std::sync::{self, Receiver}; +use async_std::task; use core::pin::Pin; use core::task::{Context, Poll}; - -use async_std::task::ready; use pin_project_lite::pin_project; use crate::ParallelStream; @@ -12,42 +12,43 @@ pin_project! { /// This `struct` is created by the [`skip`] method on [`ParallelStream`]. See its /// documentation for more. /// - /// [`skip`]: trait.ParallelStream.html#method.take + /// [`skip`]: trait.ParallelStream.html#method.skip /// [`ParallelStream`]: trait.ParallelStream.html #[derive(Clone, Debug)] - pub struct Skip { + pub struct Skip { #[pin] - stream: S, - skipped: usize, + receiver: Receiver, limit: Option, } } -impl Skip { - pub(super) fn new(stream: S, skipped: usize) -> Self { - Self { - limit: stream.get_limit(), - skipped, - stream, - } +impl Skip { + pub(super) fn new(mut stream: S, mut skipped: usize) -> Self + where + S: ParallelStream + { + let limit = stream.get_limit(); + let (sender, receiver) = sync::channel(1); + task::spawn(async move { + while let Some(val) = stream.next().await { + if skipped == 0 { + sender.send(val).await + } else { + skipped -= 1; + } + } + }); + + Skip { limit, receiver } } } -impl ParallelStream for Skip { - type Item = S::Item; +impl ParallelStream for Skip { + type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - if *this.skipped > 0 { - let next = ready!(this.stream.poll_next(cx)); - match next { - Some(_) => *this.skipped -= 1, - None => *this.skipped = 0, - } - Poll::Ready(next) - } else { - Poll::Ready(None) - } + this.receiver.poll_next(cx) } fn limit(mut self, limit: impl Into>) -> Self { @@ -62,10 +63,9 @@ impl ParallelStream for Skip { #[async_std::test] async fn smoke() { - use async_std::prelude::*; - let s = async_std::stream::from_iter(vec![1, 2, 3, 4, 5, 6]).skip(3); + let s = async_std::stream::from_iter(vec![1, 2, 3, 4, 5, 6]); let mut output = vec![]; - let mut stream = crate::from_stream(s); + let mut stream = crate::from_stream(s).skip(3); while let Some(n) = stream.next().await { output.push(n); }