Skip to content

Commit e6880e1

Browse files
Merge #125
125: from/into stream r=yoshuawuyts a=yoshuawuyts This adds `Stream` counterparts to `FromIterator`, `IntoIterator` and `Iterator::collect`, allowing to use the same patterns that are common in streams. Thanks! ## Tasks - [x] `FromStream` - [x] `IntoStream` - [x] `Stream::collect` ## Screenshot ![Screenshot_2019-08-29 async_std stream - Rust](https://user-images.githubusercontent.com/2467194/63928985-ec2bd200-ca50-11e9-868c-9899800e5b83.png) Co-authored-by: Yoshua Wuyts <[email protected]>
2 parents 60a62f9 + 98927a7 commit e6880e1

File tree

7 files changed

+164
-3
lines changed

7 files changed

+164
-3
lines changed

Diff for: src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub mod prelude;
5151
pub mod stream;
5252
pub mod sync;
5353
pub mod task;
54+
mod vec;
5455

5556
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
5657
#[cfg(feature = "unstable")]

Diff for: src/stream/from_stream.rs

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use super::IntoStream;
2+
3+
use std::pin::Pin;
4+
5+
/// Conversion from a `Stream`.
6+
///
7+
/// By implementing `FromStream` for a type, you define how it will be created from a stream.
8+
/// This is common for types which describe a collection of some kind.
9+
///
10+
/// See also: [`IntoStream`].
11+
///
12+
/// [`IntoStream`]: trait.IntoStream.html
13+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
14+
pub trait FromStream<T: Send> {
15+
/// Creates a value from a stream.
16+
///
17+
/// # Examples
18+
///
19+
/// Basic usage:
20+
///
21+
/// ```
22+
/// // use async_std::stream::FromStream;
23+
///
24+
/// // let _five_fives = async_std::stream::repeat(5).take(5);
25+
/// ```
26+
fn from_stream<'a, S: IntoStream<Item = T> + Send + 'a>(
27+
stream: S,
28+
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>;
29+
}

Diff for: src/stream/into_stream.rs

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use futures_core::stream::Stream;
2+
3+
/// Conversion into a `Stream`.
4+
///
5+
/// By implementing `IntoIterator` for a type, you define how it will be
6+
/// converted to an iterator. This is common for types which describe a
7+
/// collection of some kind.
8+
///
9+
/// [`from_stream`]: #tymethod.from_stream
10+
/// [`Stream`]: trait.Stream.html
11+
/// [`collect`]: trait.Stream.html#method.collect
12+
///
13+
/// See also: [`FromStream`].
14+
///
15+
/// [`FromStream`]: trait.FromStream.html
16+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
17+
pub trait IntoStream {
18+
/// The type of the elements being iterated over.
19+
type Item;
20+
21+
/// Which kind of stream are we turning this into?
22+
type IntoStream: Stream<Item = Self::Item> + Send;
23+
24+
/// Creates a stream from a value.
25+
fn into_stream(self) -> Self::IntoStream;
26+
}
27+
28+
impl<I: Stream + Send> IntoStream for I {
29+
type Item = I::Item;
30+
type IntoStream = I;
31+
32+
#[inline]
33+
fn into_stream(self) -> I {
34+
self
35+
}
36+
}

Diff for: src/stream/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323
2424
pub use double_ended_stream::DoubleEndedStream;
2525
pub use empty::{empty, Empty};
26+
pub use from_stream::FromStream;
27+
pub use into_stream::IntoStream;
2628
pub use once::{once, Once};
2729
pub use repeat::{repeat, Repeat};
2830
pub use stream::{Scan, Stream, Take, Zip};
2931

3032
mod double_ended_stream;
3133
mod empty;
34+
mod from_stream;
35+
mod into_stream;
3236
mod once;
3337
mod repeat;
3438
mod stream;

Diff for: src/stream/stream/mod.rs

+60-3
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ use min_by::MinByFuture;
5050
use next::NextFuture;
5151
use nth::NthFuture;
5252

53+
use super::from_stream::FromStream;
5354
use std::cmp::Ordering;
5455
use std::marker::PhantomData;
5556
use std::pin::Pin;
57+
use std::task::{Context, Poll};
5658

5759
use cfg_if::cfg_if;
5860

59-
use crate::task::{Context, Poll};
60-
6161
cfg_if! {
6262
if #[cfg(feature = "docs")] {
6363
#[doc(hidden)]
@@ -80,6 +80,21 @@ cfg_if! {
8080
}
8181
}
8282

83+
cfg_if! {
84+
if #[cfg(feature = "docs")] {
85+
#[doc(hidden)]
86+
pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>);
87+
88+
macro_rules! dyn_ret {
89+
($a:lifetime, $o:ty) => (DynFuture<$a, $o>);
90+
}
91+
} else {
92+
macro_rules! dyn_ret {
93+
($a:lifetime, $o:ty) => (Pin<Box<dyn core::future::Future<Output = $o> + Send + 'a>>)
94+
}
95+
}
96+
}
97+
8398
/// An asynchronous stream of values.
8499
///
85100
/// This trait is an async version of [`std::iter::Iterator`].
@@ -536,7 +551,6 @@ pub trait Stream {
536551
///
537552
/// let mut s = stream::repeat::<u32>(42).take(3);
538553
/// assert!(s.any(|x| x == 42).await);
539-
///
540554
/// #
541555
/// # }) }
542556
/// ```
@@ -652,6 +666,49 @@ pub trait Stream {
652666
{
653667
Zip::new(self, other)
654668
}
669+
670+
/// Transforms a stream into a collection.
671+
///
672+
/// `collect()` can take anything streamable, and turn it into a relevant
673+
/// collection. This is one of the more powerful methods in the async
674+
/// standard library, used in a variety of contexts.
675+
///
676+
/// The most basic pattern in which `collect()` is used is to turn one
677+
/// collection into another. You take a collection, call [`stream`] on it,
678+
/// do a bunch of transformations, and then `collect()` at the end.
679+
///
680+
/// Because `collect()` is so general, it can cause problems with type
681+
/// inference. As such, `collect()` is one of the few times you'll see
682+
/// the syntax affectionately known as the 'turbofish': `::<>`. This
683+
/// helps the inference algorithm understand specifically which collection
684+
/// you're trying to collect into.
685+
///
686+
/// # Examples
687+
///
688+
/// ```
689+
/// # fn main() { async_std::task::block_on(async {
690+
/// #
691+
/// use async_std::prelude::*;
692+
/// use async_std::stream;
693+
///
694+
/// let s = stream::repeat(9u8).take(3);
695+
/// let buf: Vec<u8> = s.collect().await;
696+
///
697+
/// assert_eq!(buf, vec![9; 3]);
698+
/// #
699+
/// # }) }
700+
/// ```
701+
///
702+
/// [`stream`]: trait.Stream.html#tymethod.next
703+
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
704+
fn collect<'a, B>(self) -> dyn_ret!('a, B)
705+
where
706+
Self: futures_core::stream::Stream + Sized + Send + 'a,
707+
<Self as futures_core::stream::Stream>::Item: Send,
708+
B: FromStream<<Self as futures_core::stream::Stream>::Item>,
709+
{
710+
FromStream::from_stream(self)
711+
}
655712
}
656713

657714
impl<T: futures_core::stream::Stream + ?Sized> Stream for T {

Diff for: src/vec/from_stream.rs

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use crate::stream::{FromStream, IntoStream, Stream};
2+
3+
use std::pin::Pin;
4+
5+
impl<T: Send> FromStream<T> for Vec<T> {
6+
#[inline]
7+
fn from_stream<'a, S: IntoStream<Item = T>>(
8+
stream: S,
9+
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
10+
where
11+
<S as IntoStream>::IntoStream: Send + 'a,
12+
{
13+
let stream = stream.into_stream();
14+
15+
Pin::from(Box::new(async move {
16+
pin_utils::pin_mut!(stream);
17+
18+
let mut out = vec![];
19+
while let Some(item) = stream.next().await {
20+
out.push(item);
21+
}
22+
out
23+
}))
24+
}
25+
}

Diff for: src/vec/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
//! The Rust core allocation and collections library
2+
//!
3+
//! This library provides smart pointers and collections for managing
4+
//! heap-allocated values.
5+
6+
mod from_stream;
7+
8+
#[doc(inline)]
9+
pub use std::vec::Vec;

0 commit comments

Comments
 (0)