-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathenumerate.rs
67 lines (58 loc) · 1.71 KB
/
enumerate.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use core::pin::Pin;
use core::task::{Context, Poll};
use async_std::task::ready;
use pin_project_lite::pin_project;
use crate::ParallelStream;
pin_project! {
/// A stream that yields the current count and element.
///
/// This `struct` is created by the [`enumerate`] method on [`ParallelStream`]. See its
/// documentation for more.
///
/// [`enumerate`]: trait.ParallelStream.html#method.enumerate
/// [`ParallelStream`]: trait.ParallelStream.html
#[derive(Clone, Debug)]
pub struct Enumerate<S> {
#[pin]
stream: S,
count: usize,
limit: Option<usize>,
}
}
impl<S: ParallelStream> Enumerate<S> {
pub(super) fn new(stream: S) -> Self {
Self {
limit: stream.get_limit(),
count: 0,
stream,
}
}
}
impl<S: ParallelStream> ParallelStream for Enumerate<S> {
type Item = (usize, S::Item);
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
*this.count += 1;
let count = *this.count;
Poll::Ready(next.map(|val| (count, val)))
}
fn limit(mut self, limit: impl Into<Option<usize>>) -> Self {
self.limit = limit.into();
self
}
fn get_limit(&self) -> Option<usize> {
self.limit
}
}
#[async_std::test]
async fn smoke() {
use async_std::prelude::*;
let s = async_std::stream::repeat(5usize).enumerate().take(3);
let mut output = vec![];
let mut stream = crate::from_stream(s);
while let Some(n) = stream.next().await {
output.push(n);
}
assert_eq!(output, vec![(0, 5), (1, 5), (2, 5)]);
}