From cd90b82943b006ca0e5401d7615de510e063994c Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Wed, 21 Aug 2019 11:10:31 +0300 Subject: [PATCH] Remove async_await feature gate --- .travis.yml | 4 +- README.md | 2 +- examples/future.rs | 8 +- examples/stream.rs | 6 +- src/future.rs | 109 +++++++-------- src/lib.rs | 2 +- src/stream.rs | 325 ++++++++++++++++++++++++++------------------- 7 files changed, 247 insertions(+), 209 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3f4341c..669ca72 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: rust sudo: false rust: - - nightly-2019-08-12 + - nightly-2019-08-21 os: - linux @@ -12,7 +12,7 @@ script: matrix: include: - os: linux - rust: nightly-2019-08-12 + rust: nightly-2019-08-21 sudo: required name: coverage addons: # needed for `cargo install cargo-travis` diff --git a/README.md b/README.md index ea3198a..ca293f8 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ like `FutureExt::map`, `TryFutureExt::and_then`... # Requirements -Rust nightly-2019-08-12 for async_await. +Rust nightly-2019-08-21 for async_await. # State diff --git a/examples/future.rs b/examples/future.rs index cfb628c..bdffa09 100644 --- a/examples/future.rs +++ b/examples/future.rs @@ -1,13 +1,13 @@ -#![feature(async_await)] - -use futures_async_combinators::future::*; use futures::executor; +use futures_async_combinators::future::*; fn main() { executor::block_on(async { let future = ready(Ok::(1)); let future = and_then(future, |x| ready(Ok::(x + 3))); - let future = inspect(future, |x| { dbg!(x); }); + let future = inspect(future, |x| { + dbg!(x); + }); assert_eq!(future.await, Ok(4)); }); } diff --git a/examples/stream.rs b/examples/stream.rs index 35fd5ad..23fc9ec 100644 --- a/examples/stream.rs +++ b/examples/stream.rs @@ -1,7 +1,5 @@ -#![feature(async_await)] - -use futures_async_combinators::stream::*; use futures::executor; +use futures_async_combinators::stream::*; fn main() { let stream = iter(1..=3); @@ -9,7 +7,7 @@ fn main() { let stream = map(stream, |x| x * 2); let collect_future = collect(stream); - let collection : Vec<_> = executor::block_on(collect_future); + let collection: Vec<_> = executor::block_on(collect_future); assert_eq!(vec![4, 6, 8], collection); } diff --git a/src/future.rs b/src/future.rs index 72a1b9c..e5b53b8 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,14 +1,13 @@ use futures::future::Future; use futures::stream::Stream; -use core::task::{Poll, Context}; +use core::task::{Context, Poll}; /// Create a future that is immediately ready with a value. /// /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future; /// @@ -34,7 +33,6 @@ pub async fn ready(value: T) -> T { /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future::{ready, map}; /// @@ -44,8 +42,9 @@ pub async fn ready(value: T) -> T { /// # }); /// ``` pub async fn map(future: Fut, f: F) -> U - where F: FnOnce(Fut::Output) -> U, - Fut: Future, +where + F: FnOnce(Fut::Output) -> U, + Fut: Future, { f(future.await) } @@ -66,7 +65,6 @@ pub async fn map(future: Fut, f: F) -> U /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future::{ready, then}; /// @@ -76,9 +74,10 @@ pub async fn map(future: Fut, f: F) -> U /// # }); /// ``` pub async fn then(future: FutA, f: F) -> FutB::Output - where F: FnOnce(FutA::Output) -> FutB, - FutA: Future, - FutB: Future, +where + F: FnOnce(FutA::Output) -> FutB, + FutA: Future, + FutB: Future, { let new_future = f(future.await); new_future.await @@ -99,7 +98,6 @@ pub async fn then(future: FutA, f: F) -> FutB::Output /// # Examples /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, and_then}; /// /// # futures::executor::block_on(async { @@ -113,7 +111,6 @@ pub async fn then(future: FutA, f: F) -> FutB::Output /// effect: /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, and_then}; /// /// # futures::executor::block_on(async { @@ -123,15 +120,16 @@ pub async fn then(future: FutA, f: F) -> FutB::Output /// # }); /// ``` pub async fn and_then(future: FutA, f: F) -> Result - where F: FnOnce(T) -> FutB, - FutA: Future>, - FutB: Future>, +where + F: FnOnce(T) -> FutB, + FutA: Future>, + FutB: Future>, { match future.await { Ok(ok) => { let new_future = f(ok); new_future.await - }, + } Err(err) => Err(err), } } @@ -151,7 +149,6 @@ pub async fn and_then(future: FutA, f: F) -> Result(future: FutA, f: F) -> Result(future: FutA, f: F) -> Result(future: FutA, f: F) -> Result - where F: FnOnce(E) -> FutB, - FutA: Future>, - FutB: Future>, +where + F: FnOnce(E) -> FutB, + FutA: Future>, + FutB: Future>, { match future.await { Ok(ok) => Ok(ok), Err(err) => { let new_future = f(err); new_future.await - }, + } } } @@ -205,7 +202,6 @@ pub async fn or_else(future: FutA, f: F) -> Result /// # Examples /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, map_ok}; /// /// # futures::executor::block_on(async { @@ -219,7 +215,6 @@ pub async fn or_else(future: FutA, f: F) -> Result /// effect: /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, map_ok}; /// /// # futures::executor::block_on(async { @@ -229,8 +224,9 @@ pub async fn or_else(future: FutA, f: F) -> Result /// # }); /// ``` pub async fn map_ok(future: Fut, f: F) -> Result - where F: FnOnce(T) -> U, - Fut: Future>, +where + F: FnOnce(T) -> U, + Fut: Future>, { future.await.map(f) } @@ -253,7 +249,6 @@ pub async fn map_ok(future: Fut, f: F) -> Result /// # Examples /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, map_err}; /// /// # futures::executor::block_on(async { @@ -267,7 +262,6 @@ pub async fn map_ok(future: Fut, f: F) -> Result /// no effect: /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, map_err}; /// /// # futures::executor::block_on(async { @@ -277,8 +271,9 @@ pub async fn map_ok(future: Fut, f: F) -> Result /// # }); /// ``` pub async fn map_err(future: Fut, f: F) -> Result - where F: FnOnce(E) -> U, - Fut: Future>, +where + F: FnOnce(E) -> U, + Fut: Future>, { future.await.map_err(f) } @@ -300,7 +295,6 @@ pub async fn map_err(future: Fut, f: F) -> Result /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future::{ready, flatten}; /// @@ -310,8 +304,9 @@ pub async fn map_err(future: Fut, f: F) -> Result /// # }); /// ``` pub async fn flatten(future: FutA) -> FutB::Output - where FutA: Future, - FutB: Future, +where + FutA: Future, + FutB: Future, { let nested_future = future.await; nested_future.await @@ -327,7 +322,6 @@ pub async fn flatten(future: FutA) -> FutB::Output /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future::{ready, inspect}; /// @@ -337,8 +331,9 @@ pub async fn flatten(future: FutA) -> FutB::Output /// # }); /// ``` pub async fn inspect(future: Fut, f: F) -> Fut::Output - where Fut: Future, - F: FnOnce(&Fut::Output), +where + Fut: Future, + F: FnOnce(&Fut::Output), { let future_result = future.await; f(&future_result); @@ -360,7 +355,6 @@ pub async fn inspect(future: Fut, f: F) -> Fut::Output /// # Examples /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, err_into}; /// /// # futures::executor::block_on(async { @@ -368,9 +362,10 @@ pub async fn inspect(future: Fut, f: F) -> Fut::Output /// let future_err_i32 = err_into::(future_err_u8); /// # }); /// ``` -pub async fn err_into(future: Fut) -> Result - where Fut: Future>, - E: Into, +pub async fn err_into(future: Fut) -> Result +where + Fut: Future>, + E: Into, { future.await.map_err(Into::into) } @@ -388,7 +383,6 @@ pub async fn err_into(future: Fut) -> Result /// # Examples /// /// ``` -/// #![feature(async_await)] /// use futures_async_combinators::future::{ready, unwrap_or_else}; /// /// # futures::executor::block_on(async { @@ -398,8 +392,9 @@ pub async fn err_into(future: Fut) -> Result /// # }); /// ``` pub async fn unwrap_or_else(future: Fut, f: F) -> T - where Fut: Future>, - F: FnOnce(E) -> T, +where + Fut: Future>, + F: FnOnce(E) -> T, { future.await.unwrap_or_else(f) } @@ -417,7 +412,6 @@ pub async fn unwrap_or_else(future: Fut, f: F) -> T /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future::{ready, flatten_stream}; /// use futures_async_combinators::stream::{collect, iter}; @@ -431,23 +425,24 @@ pub async fn unwrap_or_else(future: Fut, f: F) -> T /// # }); /// ``` pub fn flatten_stream(future: Fut) -> impl Stream - where Fut: Future, - St: Stream, +where + Fut: Future, + St: Stream, { use crate::stream::next; - futures::stream::unfold((Some(future), None), async move | (future, stream)| { + futures::stream::unfold((Some(future), None), async move |(future, stream)| { match (future, stream) { (Some(future), None) => { let stream = future.await; let mut stream = Box::pin(stream); let item = next(&mut stream).await; item.map(|item| (item, (None, Some(stream)))) - }, + } (None, Some(mut stream)) => { let item = next(&mut stream).await; item.map(|item| (item, (None, Some(stream)))) - }, - _ => unreachable!() + } + _ => unreachable!(), } }) } @@ -460,7 +455,6 @@ pub fn flatten_stream(future: Fut) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future::{ready, into_stream}; /// use futures_async_combinators::stream::collect; @@ -472,7 +466,8 @@ pub fn flatten_stream(future: Fut) -> impl Stream /// # }); /// ``` pub fn into_stream(future: Fut) -> impl Stream - where Fut: Future, +where + Fut: Future, { futures::stream::unfold(Some(future), async move |future| { if let Some(future) = future { @@ -491,7 +486,6 @@ pub fn into_stream(future: Fut) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::future::poll_fn; /// use core::task::{Poll, Context}; @@ -505,7 +499,8 @@ pub fn into_stream(future: Fut) -> impl Stream /// # }); /// ``` pub fn poll_fn(f: F) -> impl Future - where F: FnMut(&mut Context) -> Poll, +where + F: FnMut(&mut Context) -> Poll, { use std::future::from_generator; use std::future::get_task_context; @@ -523,8 +518,8 @@ pub fn poll_fn(f: F) -> impl Future #[cfg(test)] mod tests { - use futures::executor; use crate::future::*; + use futures::executor; #[test] fn test_ready() { @@ -661,13 +656,11 @@ mod tests { #[test] fn test_poll_fn() { executor::block_on(async { - let read_line = |_context: &mut Context| -> Poll { - Poll::Ready("Hello, World!".into()) - }; + let read_line = + |_context: &mut Context| -> Poll { Poll::Ready("Hello, World!".into()) }; let read_future = poll_fn(read_line); assert_eq!(read_future.await, "Hello, World!".to_owned()); }); } - } diff --git a/src/lib.rs b/src/lib.rs index aa29d3d..f120953 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![feature(async_await, async_closure, gen_future, generators)] +#![feature(async_closure, gen_future, generators)] pub mod future; pub mod stream; diff --git a/src/stream.rs b/src/stream.rs index 954070f..ed29689 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,8 +1,8 @@ -pub use futures::stream::Stream; use futures::future::Future; +pub use futures::stream::Stream; -use core::pin::Pin; use core::iter::IntoIterator; +use core::pin::Pin; use pin_utils::pin_mut; @@ -18,7 +18,6 @@ use pin_utils::pin_mut; /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, next}; /// @@ -31,7 +30,8 @@ use pin_utils::pin_mut; /// # }); /// ``` pub async fn next(stream: &mut St) -> Option - where St: Stream + Unpin, +where + St: Stream + Unpin, { use crate::future::poll_fn; let future_next = poll_fn(|context| Pin::new(&mut *stream).poll_next(context)); @@ -46,7 +46,6 @@ pub async fn next(stream: &mut St) -> Option /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, collect}; /// @@ -57,8 +56,9 @@ pub async fn next(stream: &mut St) -> Option /// # }); /// ``` pub async fn collect(stream: St) -> C - where St: Stream, - C: Default + Extend +where + St: Stream, + C: Default + Extend, { pin_mut!(stream); let mut collection = C::default(); @@ -82,7 +82,6 @@ pub async fn collect(stream: St) -> C /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, map, collect}; /// @@ -94,11 +93,12 @@ pub async fn collect(stream: St) -> C /// # }); /// ``` pub fn map(stream: St, f: F) -> impl Stream - where St: Stream, - F: FnMut(St::Item) -> U, +where + St: Stream, + F: FnMut(St::Item) -> U, { let stream = Box::pin(stream); - unfold((stream, f), async move | (mut stream, mut f)| { + unfold((stream, f), async move |(mut stream, mut f)| { let item = next(&mut stream).await; item.map(|item| (f(item), (stream, f))) }) @@ -120,7 +120,6 @@ pub fn map(stream: St, f: F) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::{future::ready, stream::{iter, filter, collect}}; /// @@ -132,20 +131,21 @@ pub fn map(stream: St, f: F) -> impl Stream /// # }); /// ``` pub fn filter(stream: St, f: F) -> impl Stream - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, { let stream = Box::pin(stream); - unfold((stream, f), async move | (mut stream, mut f)| { + unfold((stream, f), async move |(mut stream, mut f)| { while let Some(item) = next(&mut stream).await { let matched = f(&item).await; if matched { - return Some((item, (stream, f))) + return Some((item, (stream, f))); } else { continue; } - }; + } None }) } @@ -164,7 +164,6 @@ pub fn filter(stream: St, f: F) -> impl Stream /// /// # Examples /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::{future::ready, stream::{iter, filter_map, collect}}; /// @@ -179,19 +178,20 @@ pub fn filter(stream: St, f: F) -> impl Stream /// # }); /// ``` pub fn filter_map(stream: St, f: F) -> impl Stream - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future> +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future>, { let stream = Box::pin(stream); - unfold((stream, f), async move | (mut stream, mut f)| { + unfold((stream, f), async move |(mut stream, mut f)| { while let Some(item) = next(&mut stream).await { if let Some(item) = f(item).await { - return Some((item, (stream, f))) + return Some((item, (stream, f))); } else { continue; } - }; + } None }) } @@ -212,7 +212,6 @@ pub fn filter_map(stream: St, f: F) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, into_future}; /// @@ -229,7 +228,8 @@ pub fn filter_map(stream: St, f: F) -> impl Stream /// # }); /// ``` pub async fn into_future(stream: St) -> (Option, impl Stream) - where St: Stream + Unpin, +where + St: Stream + Unpin, { let mut stream = stream; let next_item = next(&mut stream).await; @@ -243,7 +243,6 @@ pub async fn into_future(stream: St) -> (Option, impl Stream(stream: St) -> (Option, impl Stream(i: I) -> impl Stream - where I: IntoIterator, +where + I: IntoIterator, { use core::task::Poll; let mut iter = i.into_iter(); - futures::stream::poll_fn(move |_| -> Poll> { - Poll::Ready(iter.next()) - }) + futures::stream::poll_fn(move |_| -> Poll> { Poll::Ready(iter.next()) }) } /// Concatenate all items of a stream into a single extendable @@ -275,7 +273,6 @@ pub fn iter(i: I) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, concat}; /// @@ -286,10 +283,11 @@ pub fn iter(i: I) -> impl Stream /// # }); /// ``` pub async fn concat(stream: St) -> St::Item - where St: Stream, - St::Item: Extend<::Item>, - St::Item: IntoIterator, - St::Item: Default, +where + St: Stream, + St::Item: Extend<::Item>, + St::Item: IntoIterator, + St::Item: Default, { pin_mut!(stream); let mut collection = ::default(); @@ -315,7 +313,6 @@ pub async fn concat(stream: St) -> St::Item /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures::future; /// use futures_async_combinators::{future::ready, stream::{repeat, take, for_each}}; @@ -336,7 +333,8 @@ pub async fn concat(stream: St) -> St::Item /// # }); /// ``` pub async fn for_each(stream: St, f: F) -> () - where St: Stream, +where + St: Stream, F: FnMut(St::Item) -> Fut, Fut: Future, { @@ -347,7 +345,6 @@ pub async fn for_each(stream: St, f: F) -> () } } - /// Creates a new stream of at most `n` items of the underlying stream. /// /// Once `n` items have been yielded from this stream then it will always @@ -356,7 +353,6 @@ pub async fn for_each(stream: St, f: F) -> () /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, take, collect}; /// @@ -368,10 +364,11 @@ pub async fn for_each(stream: St, f: F) -> () /// # }); /// ``` pub fn take(stream: St, n: u64) -> impl Stream - where St: Stream, +where + St: Stream, { let stream = Box::pin(stream); - unfold((stream, n), async move | (mut stream, n)| { + unfold((stream, n), async move |(mut stream, n)| { if n == 0 { None } else { @@ -391,7 +388,6 @@ pub fn take(stream: St, n: u64) -> impl Stream /// available memory as it tries to just fill up all RAM. /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{repeat, take, collect}; /// @@ -403,12 +399,11 @@ pub fn take(stream: St, n: u64) -> impl Stream /// # }); /// ``` pub fn repeat(item: T) -> impl Stream - where T: Clone, +where + T: Clone, { use core::task::Poll; - futures::stream::poll_fn(move |_| -> Poll> { - Poll::Ready(Some(item.clone())) - }) + futures::stream::poll_fn(move |_| -> Poll> { Poll::Ready(Some(item.clone())) }) } /// Flattens a stream of streams into just one continuous stream. @@ -416,7 +411,6 @@ pub fn repeat(item: T) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, flatten, collect}; /// @@ -432,15 +426,17 @@ pub fn repeat(item: T) -> impl Stream /// # }); /// ``` pub fn flatten(stream: St) -> impl Stream - where SubSt: Stream, - St: Stream, +where + SubSt: Stream, + St: Stream, { let stream = Box::pin(stream); - unfold((Some(stream), None), async move | (mut state_stream, mut state_substream)| { - loop { + unfold( + (Some(stream), None), + async move |(mut state_stream, mut state_substream)| loop { if let Some(mut substream) = state_substream.take() { if let Some(item) = next(&mut substream).await { - return Some((item, (state_stream, Some(substream)))) + return Some((item, (state_stream, Some(substream)))); } else { continue; } @@ -454,8 +450,8 @@ pub fn flatten(stream: St) -> impl Stream } } return None; - } - }) + }, + ) } /// Computes from this stream's items new items of a different type using @@ -471,7 +467,6 @@ pub fn flatten(stream: St) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::{future::ready, stream::{iter, then, collect}}; /// @@ -483,12 +478,13 @@ pub fn flatten(stream: St) -> impl Stream /// # }); /// ``` pub fn then(stream: St, f: F) -> impl Stream - where St: Stream, - F: FnMut(St::Item) -> Fut, - Fut: Future +where + St: Stream, + F: FnMut(St::Item) -> Fut, + Fut: Future, { let stream = Box::pin(stream); - unfold((stream, f), async move | (mut stream, mut f)| { + unfold((stream, f), async move |(mut stream, mut f)| { let item = next(&mut stream).await; if let Some(item) = item { let new_item = f(item).await; @@ -507,7 +503,6 @@ pub fn then(stream: St, f: F) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, skip, collect}; /// @@ -519,16 +514,17 @@ pub fn then(stream: St, f: F) -> impl Stream /// # }); /// ``` pub fn skip(stream: St, n: u64) -> impl Stream - where St: Stream, +where + St: Stream, { let stream = Box::pin(stream); - unfold((stream, n), async move | (mut stream, mut n)| { + unfold((stream, n), async move |(mut stream, mut n)| { while n != 0 { if let Some(_) = next(&mut stream).await { n = n - 1; - continue + continue; } else { - return None + return None; } } if let Some(item) = next(&mut stream).await { @@ -548,7 +544,6 @@ pub fn skip(stream: St, n: u64) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::stream::{iter, zip, collect}; /// @@ -562,17 +557,18 @@ pub fn skip(stream: St, n: u64) -> impl Stream /// ``` /// pub fn zip(stream: St1, other: St2) -> impl Stream - where St1: Stream, - St2: Stream, +where + St1: Stream, + St2: Stream, { let stream = Box::pin(stream); let other = Box::pin(other); - unfold((stream, other), async move | (mut stream, mut other)| { + unfold((stream, other), async move |(mut stream, mut other)| { let left = next(&mut stream).await; let right = next(&mut other).await; match (left, right) { (Some(left), Some(right)) => Some(((left, right), (stream, other))), - _ => None + _ => None, } }) } @@ -583,7 +579,6 @@ pub fn zip(stream: St1, other: St2) -> impl Stream(stream: St1, other: St2) -> impl Stream(stream: St, other: St) -> impl Stream - where St: Stream, +where + St: Stream, { let stream = Box::pin(stream); let other = Box::pin(other); let start_with_first = true; - unfold((stream, other, start_with_first), async move | (mut stream, mut other, start_with_first)| { - if start_with_first { - if let Some(item) = next(&mut stream).await { - return Some((item, (stream, other, start_with_first))) + unfold( + (stream, other, start_with_first), + async move |(mut stream, mut other, start_with_first)| { + if start_with_first { + if let Some(item) = next(&mut stream).await { + return Some((item, (stream, other, start_with_first))); + } } - } - if let Some(item) = next(&mut other).await { - Some((item, (stream, other, /* start_with_first */ false))) - } else { - None - } - }) + if let Some(item) = next(&mut other).await { + Some((item, (stream, other, /* start_with_first */ false))) + } else { + None + } + }, + ) } /// Take elements from this stream while the provided asynchronous predicate @@ -631,7 +630,6 @@ pub fn chain(stream: St, other: St) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::{future::ready, stream::{iter, take_while, collect}}; /// @@ -643,12 +641,13 @@ pub fn chain(stream: St, other: St) -> impl Stream /// # }); /// ``` pub fn take_while(stream: St, f: F) -> impl Stream - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, { let stream = Box::pin(stream); - unfold((stream, f), async move | (mut stream, mut f)| { + unfold((stream, f), async move |(mut stream, mut f)| { if let Some(item) = next(&mut stream).await { if f(&item).await { Some((item, (stream, f))) @@ -672,7 +671,6 @@ pub fn take_while(stream: St, f: F) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::{future::ready, stream::{iter, skip_while, collect}}; /// @@ -684,30 +682,34 @@ pub fn take_while(stream: St, f: F) -> impl Stream /// # }); /// ``` pub fn skip_while(stream: St, f: F) -> impl Stream - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, { let stream = Box::pin(stream); let should_skip = true; - unfold((stream, f, should_skip), async move | (mut stream, mut f, should_skip)| { - while should_skip { - if let Some(item) = next(&mut stream).await { - if f(&item).await { - continue; + unfold( + (stream, f, should_skip), + async move |(mut stream, mut f, should_skip)| { + while should_skip { + if let Some(item) = next(&mut stream).await { + if f(&item).await { + continue; + } else { + return Some((item, (stream, f, /* should_skip */ false))); + } } else { - return Some((item, (stream, f, /* should_skip */ false))) + return None; } + } + if let Some(item) = next(&mut stream).await { + Some((item, (stream, f, /* should_skip */ false))) } else { - return None + None } - } - if let Some(item) = next(&mut stream).await { - Some((item, (stream, f, /* should_skip */ false))) - } else { - None - } - }) + }, + ) } /// Execute an accumulating asynchronous computation over a stream, @@ -722,7 +724,6 @@ pub fn skip_while(stream: St, f: F) -> impl Stream /// # Examples /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::{future::ready, stream::{iter, fold}}; /// @@ -732,9 +733,10 @@ pub fn skip_while(stream: St, f: F) -> impl Stream /// # }); /// ``` pub async fn fold(stream: St, init: T, f: F) -> T - where St: Stream, - F: FnMut(T, St::Item) -> Fut, - Fut: Future, +where + St: Stream, + F: FnMut(T, St::Item) -> Fut, + Fut: Future, { pin_mut!(stream); let mut f = f; @@ -771,7 +773,6 @@ pub async fn fold(stream: St, init: T, f: F) -> T /// # Example /// /// ``` -/// #![feature(async_await)] /// # futures::executor::block_on(async { /// use futures_async_combinators::{future::ready, stream::{unfold, collect}}; /// @@ -790,8 +791,9 @@ pub async fn fold(stream: St, init: T, f: F) -> T /// # }); /// ``` pub fn unfold(init: T, mut f: F) -> impl Stream - where F: FnMut(T) -> Fut, - Fut: Future>, +where + F: FnMut(T) -> Fut, + Fut: Future>, { use core::task::Poll; enum State { @@ -799,7 +801,7 @@ pub fn unfold(init: T, mut f: F) -> impl Stream Running(Pin>), } let mut state = Some(State::Paused(init)); - futures::stream::poll_fn(move|waker| -> Poll> { + futures::stream::poll_fn(move |waker| -> Poll> { let mut future = match state.take() { Some(State::Running(fut)) => fut, Some(State::Paused(st)) => Box::pin(f(st)), @@ -809,21 +811,21 @@ pub fn unfold(init: T, mut f: F) -> impl Stream Poll::Pending => { state = Some(State::Running(future)); Poll::Pending - }, + } Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some((item, new_state))) => { state = Some(State::Paused(new_state)); Poll::Ready(Some(item)) - }, + } } }) } #[cfg(test)] mod tests { - use futures::executor; - use crate::stream::*; use crate::future::ready; + use crate::stream::*; + use futures::executor; #[test] fn test_next() { @@ -839,7 +841,7 @@ mod tests { fn test_collect() { let stream = iter(1..=5); - let collection : Vec = executor::block_on(collect(stream)); + let collection: Vec = executor::block_on(collect(stream)); assert_eq!(collection, vec![1, 2, 3, 4, 5]); } @@ -848,7 +850,10 @@ mod tests { let stream = iter(1..=3); let stream = map(stream, |x| x * 2); - assert_eq!(vec![2, 4, 6], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![2, 4, 6], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -856,7 +861,10 @@ mod tests { let stream = iter(1..=10); let evens = filter(stream, |x| ready(x % 2 == 0)); - assert_eq!(vec![2, 4, 6, 8, 10], executor::block_on(collect::<_, Vec<_>>(evens))); + assert_eq!( + vec![2, 4, 6, 8, 10], + executor::block_on(collect::<_, Vec<_>>(evens)) + ); } #[test] @@ -867,7 +875,10 @@ mod tests { ready(ret) }); - assert_eq!(vec![3, 5, 7, 9, 11], executor::block_on(collect::<_, Vec<_>>(evens))); + assert_eq!( + vec![3, 5, 7, 9, 11], + executor::block_on(collect::<_, Vec<_>>(evens)) + ); } #[test] @@ -888,7 +899,7 @@ mod tests { fn test_iter() { let stream = iter(1..=5); - let collection : Vec = executor::block_on(collect(stream)); + let collection: Vec = executor::block_on(collect(stream)); assert_eq!(collection, vec![1, 2, 3, 4, 5]); } @@ -896,7 +907,7 @@ mod tests { fn test_concat() { let stream = iter(vec![vec![1, 2], vec![3], vec![4, 5]]); - let collection : Vec = executor::block_on(concat(stream)); + let collection: Vec = executor::block_on(concat(stream)); assert_eq!(collection, vec![1, 2, 3, 4, 5]); } @@ -921,7 +932,10 @@ mod tests { let stream = iter(1..=10); let stream = take(stream, 3); - assert_eq!(vec![1, 2, 3], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![1, 2, 3], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -929,7 +943,10 @@ mod tests { let stream = iter(1..=3); let stream = take(stream, 10); - assert_eq!(vec![1, 2, 3], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![1, 2, 3], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -937,7 +954,10 @@ mod tests { let stream = repeat(9); let stream = take(stream, 3); - assert_eq!(vec![9, 9, 9], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![9, 9, 9], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -949,15 +969,21 @@ mod tests { let stream = iter(vec![stream0, stream1, stream2, stream3]); let stream = flatten(stream); - assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8, 9], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![1, 2, 3, 4, 5, 6, 7, 8, 9], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] fn test_then() { let stream = iter(1..=3); - let stream = then(stream, |x| ready(x+3)); + let stream = then(stream, |x| ready(x + 3)); - assert_eq!(vec![4, 5, 6], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![4, 5, 6], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -965,7 +991,10 @@ mod tests { let stream = iter(1..=10); let stream = skip(stream, 5); - assert_eq!(vec![6, 7, 8, 9, 10], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![6, 7, 8, 9, 10], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -982,7 +1011,10 @@ mod tests { let stream2 = iter(5..=10); let stream = zip(stream1, stream2); - assert_eq!(vec![(1, 5), (2, 6), (3, 7)], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![(1, 5), (2, 6), (3, 7)], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -991,7 +1023,10 @@ mod tests { let stream2 = iter(3..=4); let stream = chain(stream1, stream2); - assert_eq!(vec![1, 2, 3, 4], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![1, 2, 3, 4], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -999,7 +1034,10 @@ mod tests { let stream = iter(1..=10); let stream = take_while(stream, |x| ready(*x <= 5)); - assert_eq!(vec![1, 2, 3, 4, 5], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![1, 2, 3, 4, 5], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -1007,7 +1045,10 @@ mod tests { let stream = iter(1..=3); let stream = take_while(stream, |x| ready(*x <= 5)); - assert_eq!(vec![1, 2, 3], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![1, 2, 3], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -1015,7 +1056,10 @@ mod tests { let stream = iter(1..=10); let stream = skip_while(stream, |x| ready(*x <= 5)); - assert_eq!(vec![6, 7, 8, 9, 10], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![6, 7, 8, 9, 10], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } #[test] @@ -1039,12 +1083,15 @@ mod tests { let stream = unfold(0, |state| { if state <= 2 { let next_state = state + 1; - let yielded = state * 2; + let yielded = state * 2; ready(Some((yielded, next_state))) } else { ready(None) } }); - assert_eq!(vec![0, 2, 4], executor::block_on(collect::<_, Vec<_>>(stream))); + assert_eq!( + vec![0, 2, 4], + executor::block_on(collect::<_, Vec<_>>(stream)) + ); } }