From b8ee27db528778899cc4356563328ea5aad5ab97 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Mon, 1 Jul 2019 11:17:06 +0300 Subject: [PATCH] Implement stream combinators using async_stream_block and generators --- Cargo.toml | 1 + src/future.rs | 34 +++----- src/lib.rs | 2 +- src/stream.rs | 222 +++++++++++++++++++++----------------------------- 4 files changed, 107 insertions(+), 152 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6128728..bd02edf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ maintenance = { status = "experimental" } [dependencies] pin-utils = "=0.1.0-alpha.4" +futures-async-stream = "0.1.0-alpha.1" [dependencies.futures-core] version = "=0.3.0-alpha.19" diff --git a/src/future.rs b/src/future.rs index a63aac2..25003fe 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,5 +1,6 @@ use futures_core::future::Future; use futures_core::stream::Stream; +use futures_async_stream::async_stream_block; use core::pin::Pin; use core::task::{Context, Poll}; @@ -430,22 +431,13 @@ where Fut: Future, St: Stream, { - use crate::stream::next; - crate::stream::unfold((Some(future), None), async move |(future, stream)| { - match (future, stream) { - (Some(future), None) => { - let stream = future.await; - let mut stream = Box::pin(stream); - let item = next(&mut stream).await; - item.map(|item| (item, (None, Some(stream)))) - } - (None, Some(mut stream)) => { - let item = next(&mut stream).await; - item.map(|item| (item, (None, Some(stream)))) - } - _ => unreachable!(), + async_stream_block! { + let stream = future.await; + #[for_await] + for item in stream { + yield item } - }) + } } /// Convert this future into a single element stream. @@ -470,14 +462,10 @@ pub fn into_stream(future: Fut) -> impl Stream where Fut: Future, { - crate::stream::unfold(Some(future), async move |future| { - if let Some(future) = future { - let item = future.await; - Some((item, (None))) - } else { - None - } - }) + async_stream_block! { + let item = future.await; + yield item + } } /// Creates a new future wrapping around a function returning [`Poll`](core::task::Poll). diff --git a/src/lib.rs b/src/lib.rs index f120953..b6c9e91 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(async_closure, gen_future, generators)] +#![feature(gen_future, generators, proc_macro_hygiene)] pub mod future; pub mod stream; diff --git a/src/stream.rs b/src/stream.rs index 2def61c..f160c36 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,5 +1,6 @@ -use core::future::Future; +use futures_core::future::Future; pub use futures_core::stream::Stream; +use futures_async_stream::async_stream_block; use core::iter::IntoIterator; use core::pin::Pin; @@ -97,11 +98,13 @@ where St: Stream, F: FnMut(St::Item) -> U, { - let stream = Box::pin(stream); - unfold((stream, f), async move |(mut stream, mut f)| { - let item = next(&mut stream).await; - item.map(|item| (f(item), (stream, f))) - }) + let mut f = f; + async_stream_block! { + #[for_await] + for item in stream { + yield f(item) + } + } } /// Filters the values produced by this stream according to the provided @@ -136,18 +139,15 @@ where F: FnMut(&St::Item) -> Fut, Fut: Future, { - let stream = Box::pin(stream); - unfold((stream, f), async move |(mut stream, mut f)| { - while let Some(item) = next(&mut stream).await { - let matched = f(&item).await; - if matched { - return Some((item, (stream, f))); - } else { - continue; + let mut f = f; + async_stream_block! { + #[for_await] + for item in stream { + if f(&item).await { + yield item } } - None - }) + } } /// Filters the values produced by this stream while simultaneously mapping @@ -183,17 +183,15 @@ where F: FnMut(St::Item) -> Fut, Fut: Future>, { - let stream = Box::pin(stream); - unfold((stream, f), async move |(mut stream, mut f)| { - while let Some(item) = next(&mut stream).await { + let mut f = f; + async_stream_block! { + #[for_await] + for item in stream { if let Some(item) = f(item).await { - return Some((item, (stream, f))); - } else { - continue; + yield item } } - None - }) + } } /// Converts this stream into a future of `(next_item, tail_of_stream)`. @@ -366,18 +364,18 @@ pub fn take(stream: St, n: u64) -> impl Stream where St: Stream, { - let stream = Box::pin(stream); - unfold((stream, n), async move |(mut stream, n)| { - if n == 0 { - None - } else { - if let Some(item) = next(&mut stream).await { - Some((item, (stream, n - 1))) + let mut n = n; + async_stream_block! { + #[for_await] + for item in stream { + if n == 0 { + break; } else { - None + n = n - 1; + yield item } } - }) + } } /// Create a stream which produces the same item repeatedly. @@ -428,28 +426,15 @@ where SubSt: Stream, St: Stream, { - let stream = Box::pin(stream); - unfold( - (Some(stream), None), - async move |(mut state_stream, mut state_substream)| loop { - if let Some(mut substream) = state_substream.take() { - if let Some(item) = next(&mut substream).await { - return Some((item, (state_stream, Some(substream)))); - } else { - continue; - } - } - if let Some(mut stream) = state_stream.take() { - if let Some(substream) = next(&mut stream).await { - let substream = Box::pin(substream); - state_stream = Some(stream); - state_substream = Some(substream); - continue; - } + async_stream_block! { + #[for_await] + for substream in stream { + #[for_await] + for item in substream { + yield item } - return None; - }, - ) + } + } } /// Computes from this stream's items new items of a different type using @@ -481,16 +466,14 @@ where F: FnMut(St::Item) -> Fut, Fut: Future, { - let stream = Box::pin(stream); - unfold((stream, f), async move |(mut stream, mut f)| { - let item = next(&mut stream).await; - if let Some(item) = item { + let mut f = f; + async_stream_block! { + #[for_await] + for item in stream { let new_item = f(item).await; - Some((new_item, (stream, f))) - } else { - None + yield new_item } - }) + } } /// Creates a new stream which skips `n` items of the underlying stream. @@ -515,22 +498,18 @@ pub fn skip(stream: St, n: u64) -> impl Stream where St: Stream, { - let stream = Box::pin(stream); - unfold((stream, n), async move |(mut stream, mut n)| { - while n != 0 { - if let Some(_) = next(&mut stream).await { + let mut n = n; + async_stream_block! { + #[for_await] + for item in stream { + if n == 0 { + yield item + } else { n = n - 1; continue; - } else { - return None; } } - if let Some(item) = next(&mut stream).await { - Some((item, (stream, 0))) - } else { - None - } - }) + } } /// An adapter for zipping two streams together. @@ -559,16 +538,18 @@ where St1: Stream, St2: Stream, { - let stream = Box::pin(stream); - let other = Box::pin(other); - unfold((stream, other), async move |(mut stream, mut other)| { - let left = next(&mut stream).await; - let right = next(&mut other).await; - match (left, right) { - (Some(left), Some(right)) => Some(((left, right), (stream, other))), - _ => None, + let mut stream = Box::pin(stream); + let mut other = Box::pin(other); + async_stream_block! { + loop { + let left = next(&mut stream).await; + let right = next(&mut other).await; + match (left, right) { + (Some(left), Some(right)) => yield (left, right), + _ => break, + } } - }) + } } /// Adapter for chaining two stream. @@ -598,24 +579,16 @@ pub fn chain(stream: St, other: St) -> impl Stream where St: Stream, { - let stream = Box::pin(stream); - let other = Box::pin(other); - let start_with_first = true; - unfold( - (stream, other, start_with_first), - async move |(mut stream, mut other, start_with_first)| { - if start_with_first { - if let Some(item) = next(&mut stream).await { - return Some((item, (stream, other, start_with_first))); - } - } - if let Some(item) = next(&mut other).await { - Some((item, (stream, other, /* start_with_first */ false))) - } else { - None - } - }, - ) + async_stream_block! { + #[for_await] + for item in stream { + yield item + } + #[for_await] + for item in other { + yield item + } + } } /// Take elements from this stream while the provided asynchronous predicate @@ -644,18 +617,17 @@ where F: FnMut(&St::Item) -> Fut, Fut: Future, { - let stream = Box::pin(stream); - unfold((stream, f), async move |(mut stream, mut f)| { - if let Some(item) = next(&mut stream).await { + let mut f = f; + async_stream_block! { + #[for_await] + for item in stream { if f(&item).await { - Some((item, (stream, f))) + yield item } else { - None + break; } - } else { - None } - }) + } } /// Skip elements on this stream while the provided asynchronous predicate @@ -685,29 +657,23 @@ where F: FnMut(&St::Item) -> Fut, Fut: Future, { - let stream = Box::pin(stream); - let should_skip = true; - unfold( - (stream, f, should_skip), - async move |(mut stream, mut f, should_skip)| { - while should_skip { - if let Some(item) = next(&mut stream).await { - if f(&item).await { - continue; - } else { - return Some((item, (stream, f, /* should_skip */ false))); - } + let mut f = f; + let mut should_skip = true; + async_stream_block! { + #[for_await] + for item in stream { + if should_skip { + if f(&item).await { + continue; } else { - return None; + should_skip = false; + yield item } - } - if let Some(item) = next(&mut stream).await { - Some((item, (stream, f, /* should_skip */ false))) } else { - None + yield item } - }, - ) + } + } } /// Execute an accumulating asynchronous computation over a stream,