Skip to content

Commit ad05101

Browse files
committed
Added the ability to collect a stream of results
1 parent e6880e1 commit ad05101

File tree

4 files changed

+69
-0
lines changed

4 files changed

+69
-0
lines changed

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub mod stream;
5252
pub mod sync;
5353
pub mod task;
5454
mod vec;
55+
mod result;
5556

5657
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
5758
#[cfg(feature = "unstable")]

src/result/from_stream.rs

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use crate::stream::{FromStream, IntoStream, Stream};
2+
3+
use std::pin::Pin;
4+
5+
impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
6+
where V: FromStream<T> {
7+
/// Takes each element in the stream: if it is an `Err`, no further
8+
/// elements are taken, and the `Err` is returned. Should no `Err`
9+
/// occur, a container with the values of each `Result` is returned.
10+
#[inline]
11+
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
12+
stream: S,
13+
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
14+
where
15+
<S as IntoStream>::IntoStream: Send + 'a,
16+
{
17+
let stream = stream.into_stream();
18+
19+
Pin::from(Box::new(async move {
20+
pin_utils::pin_mut!(stream);
21+
22+
// Using `scan` here because it is able to stop the stream early
23+
// if a failure occurs
24+
let mut found_error = None;
25+
let out: V = stream.scan((), |_, elem| {
26+
match elem {
27+
Ok(elem) => Some(elem),
28+
Err(err) => {
29+
found_error = Some(err);
30+
// Stop processing the stream on error
31+
None
32+
},
33+
}
34+
}).collect().await;
35+
36+
match found_error {
37+
Some(err) => Err(err),
38+
None => Ok(out),
39+
}
40+
}))
41+
}
42+
}
43+

src/result/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
//! The Rust core error handling type
2+
//!
3+
//! This module provides the `Result<T, E>` type for returning and
4+
//! propagating errors.
5+
6+
mod from_stream;
7+
8+
#[doc(inline)]
9+
pub use std::result::Result;

src/stream/stream/mod.rs

+16
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,22 @@ pub trait Stream {
695695
/// let buf: Vec<u8> = s.collect().await;
696696
///
697697
/// assert_eq!(buf, vec![9; 3]);
698+
///
699+
/// // You can also collect streams of Result values
700+
/// // into any collection that implements FromStream
701+
/// let s = stream::repeat(Ok(9)).take(3);
702+
/// // We are using Vec here, but other collections
703+
/// // are supported as well
704+
/// let buf: Result<Vec<u8>, ()> = s.collect().await;
705+
///
706+
/// assert_eq!(buf, Ok(vec![9; 3]));
707+
///
708+
/// // The stream will stop on the first Err and
709+
/// // return that instead
710+
/// let s = stream::repeat(Err(5)).take(3);
711+
/// let buf: Result<Vec<u8>, u8> = s.collect().await;
712+
///
713+
/// assert_eq!(buf, Err(5));
698714
/// #
699715
/// # }) }
700716
/// ```

0 commit comments

Comments
 (0)