Skip to content

Commit 14ed012

Browse files
committed
fix: spawn tasks in Stream::buffered and Stream::buffer_unordered to max concurrency
1 parent de9274e commit 14ed012

File tree

4 files changed

+109
-69
lines changed

4 files changed

+109
-69
lines changed

futures-util/src/stream/stream/buffer_unordered.rs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
2+
use alloc::vec::Vec;
23
use core::fmt;
34
use core::num::NonZeroUsize;
45
use core::pin::Pin;
@@ -13,20 +14,23 @@ pin_project! {
1314
/// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
1415
/// method.
1516
#[must_use = "streams do nothing unless polled"]
16-
pub struct BufferUnordered<St>
17+
pub struct BufferUnordered<St, F>
1718
where
18-
St: Stream,
19+
St: Stream<Item = F>,
20+
F: Future,
1921
{
2022
#[pin]
2123
stream: Fuse<St>,
2224
in_progress_queue: FuturesUnordered<St::Item>,
25+
ready_queue: Vec<F::Output>,
2326
max: Option<NonZeroUsize>,
2427
}
2528
}
2629

27-
impl<St> fmt::Debug for BufferUnordered<St>
30+
impl<St, F> fmt::Debug for BufferUnordered<St, F>
2831
where
29-
St: Stream + fmt::Debug,
32+
St: Stream<Item = F> + fmt::Debug,
33+
F: Future,
3034
{
3135
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3236
f.debug_struct("BufferUnordered")
@@ -37,26 +41,27 @@ where
3741
}
3842
}
3943

40-
impl<St> BufferUnordered<St>
44+
impl<St, F> BufferUnordered<St, F>
4145
where
42-
St: Stream,
43-
St::Item: Future,
46+
St: Stream<Item = F>,
47+
F: Future,
4448
{
4549
pub(super) fn new(stream: St, n: Option<usize>) -> Self {
4650
Self {
4751
stream: super::Fuse::new(stream),
4852
in_progress_queue: FuturesUnordered::new(),
53+
ready_queue: Vec::new(),
4954
max: n.and_then(NonZeroUsize::new),
5055
}
5156
}
5257

5358
delegate_access_inner!(stream, St, (.));
5459
}
5560

56-
impl<St> Stream for BufferUnordered<St>
61+
impl<St, F> Stream for BufferUnordered<St, F>
5762
where
58-
St: Stream,
59-
St::Item: Future,
63+
St: Stream<Item = F>,
64+
F: Future,
6065
{
6166
type Item = <St::Item as Future>::Output;
6267

@@ -72,14 +77,28 @@ where
7277
}
7378
}
7479

75-
// Attempt to pull the next value from the in_progress_queue
76-
match this.in_progress_queue.poll_next_unpin(cx) {
77-
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
78-
Poll::Ready(None) => {}
80+
// Try to poll all ready futures in the in_progress_queue.
81+
loop {
82+
match this.in_progress_queue.poll_next_unpin(cx) {
83+
Poll::Ready(Some(output)) => {
84+
this.ready_queue.push(output);
85+
}
86+
Poll::Ready(None) => break,
87+
Poll::Pending => break,
88+
}
89+
}
90+
91+
// If we have any ready outputs, return the first one.
92+
if let Some(output) = this.ready_queue.pop() {
93+
// If there are still ready outputs, wake the task to poll again.
94+
if !this.ready_queue.is_empty() {
95+
cx.waker().wake_by_ref();
96+
}
97+
98+
return Poll::Ready(Some(output));
7999
}
80100

81-
// If more values are still coming from the stream, we're not done yet
82-
if this.stream.is_done() {
101+
if this.stream.is_done() && this.in_progress_queue.is_empty() {
83102
Poll::Ready(None)
84103
} else {
85104
Poll::Pending
@@ -98,10 +117,10 @@ where
98117
}
99118
}
100119

101-
impl<St> FusedStream for BufferUnordered<St>
120+
impl<St, F> FusedStream for BufferUnordered<St, F>
102121
where
103-
St: Stream,
104-
St::Item: Future,
122+
St: Stream<Item = F>,
123+
F: Future,
105124
{
106125
fn is_terminated(&self) -> bool {
107126
self.in_progress_queue.is_terminated() && self.stream.is_terminated()
@@ -110,10 +129,10 @@ where
110129

111130
// Forwarding impl of Sink from the underlying stream
112131
#[cfg(feature = "sink")]
113-
impl<S, Item> Sink<Item> for BufferUnordered<S>
132+
impl<S, F, Item> Sink<Item> for BufferUnordered<S, F>
114133
where
115-
S: Stream + Sink<Item>,
116-
S::Item: Future,
134+
S: Stream<Item = F> + Sink<Item>,
135+
F: Future,
117136
{
118137
type Error = S::Error;
119138

futures-util/src/stream/stream/buffered.rs

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt};
2+
use alloc::collections::VecDeque;
23
use core::fmt;
34
use core::num::NonZeroUsize;
45
use core::pin::Pin;
56
use futures_core::future::Future;
6-
use futures_core::ready;
77
use futures_core::stream::Stream;
88
use futures_core::task::{Context, Poll};
99
#[cfg(feature = "sink")]
@@ -13,22 +13,23 @@ use pin_project_lite::pin_project;
1313
pin_project! {
1414
/// Stream for the [`buffered`](super::StreamExt::buffered) method.
1515
#[must_use = "streams do nothing unless polled"]
16-
pub struct Buffered<St>
16+
pub struct Buffered<St, F>
1717
where
18-
St: Stream,
19-
St::Item: Future,
18+
St: Stream<Item = F>,
19+
F: Future,
2020
{
2121
#[pin]
2222
stream: Fuse<St>,
2323
in_progress_queue: FuturesOrdered<St::Item>,
24+
ready_queue: VecDeque<F::Output>,
2425
max: Option<NonZeroUsize>,
2526
}
2627
}
2728

28-
impl<St> fmt::Debug for Buffered<St>
29+
impl<St, F> fmt::Debug for Buffered<St, F>
2930
where
30-
St: Stream + fmt::Debug,
31-
St::Item: Future,
31+
St: Stream<Item = F> + fmt::Debug,
32+
F: Future,
3233
{
3334
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3435
f.debug_struct("Buffered")
@@ -39,26 +40,27 @@ where
3940
}
4041
}
4142

42-
impl<St> Buffered<St>
43+
impl<St, F> Buffered<St, F>
4344
where
44-
St: Stream,
45-
St::Item: Future,
45+
St: Stream<Item = F>,
46+
F: Future,
4647
{
4748
pub(super) fn new(stream: St, n: Option<usize>) -> Self {
4849
Self {
4950
stream: super::Fuse::new(stream),
5051
in_progress_queue: FuturesOrdered::new(),
52+
ready_queue: VecDeque::new(),
5153
max: n.and_then(NonZeroUsize::new),
5254
}
5355
}
5456

5557
delegate_access_inner!(stream, St, (.));
5658
}
5759

58-
impl<St> Stream for Buffered<St>
60+
impl<St, F> Stream for Buffered<St, F>
5961
where
60-
St: Stream,
61-
St::Item: Future,
62+
St: Stream<Item = F>,
63+
F: Future,
6264
{
6365
type Item = <St::Item as Future>::Output;
6466

@@ -74,14 +76,28 @@ where
7476
}
7577
}
7678

77-
// Attempt to pull the next value from the in_progress_queue
78-
let res = this.in_progress_queue.poll_next_unpin(cx);
79-
if let Some(val) = ready!(res) {
80-
return Poll::Ready(Some(val));
79+
// Try to poll all ready futures in the in_progress_queue.
80+
loop {
81+
match this.in_progress_queue.poll_next_unpin(cx) {
82+
Poll::Ready(Some(output)) => {
83+
this.ready_queue.push_back(output);
84+
}
85+
Poll::Ready(None) => break,
86+
Poll::Pending => break,
87+
}
88+
}
89+
90+
// If we have any ready outputs, return the first one.
91+
if let Some(output) = this.ready_queue.pop_front() {
92+
// If there are still ready outputs, wake the task to poll again.
93+
if !this.ready_queue.is_empty() {
94+
cx.waker().wake_by_ref();
95+
}
96+
97+
return Poll::Ready(Some(output));
8198
}
8299

83-
// If more values are still coming from the stream, we're not done yet
84-
if this.stream.is_done() {
100+
if this.stream.is_done() && this.in_progress_queue.is_empty() {
85101
Poll::Ready(None)
86102
} else {
87103
Poll::Pending
@@ -100,10 +116,10 @@ where
100116
}
101117
}
102118

103-
impl<St> FusedStream for Buffered<St>
119+
impl<St, F> FusedStream for Buffered<St, F>
104120
where
105-
St: Stream,
106-
St::Item: Future,
121+
St: Stream<Item = F>,
122+
F: Future,
107123
{
108124
fn is_terminated(&self) -> bool {
109125
self.stream.is_done() && self.in_progress_queue.is_terminated()
@@ -112,10 +128,10 @@ where
112128

113129
// Forwarding impl of Sink from the underlying stream
114130
#[cfg(feature = "sink")]
115-
impl<S, Item> Sink<Item> for Buffered<S>
131+
impl<S, F, Item> Sink<Item> for Buffered<S, F>
116132
where
117-
S: Stream + Sink<Item>,
118-
S::Item: Future,
133+
S: Stream<Item = F> + Sink<Item>,
134+
F: Future,
119135
{
120136
type Error = S::Error;
121137

futures-util/src/stream/stream/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1487,10 +1487,11 @@ pub trait StreamExt: Stream {
14871487
/// library is activated, and it is activated by default.
14881488
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
14891489
#[cfg(feature = "alloc")]
1490-
fn buffered(self, n: impl Into<Option<usize>>) -> Buffered<Self>
1490+
fn buffered<F>(self, n: impl Into<Option<usize>>) -> Buffered<Self, F>
14911491
where
1492-
Self::Item: Future,
14931492
Self: Sized,
1493+
Self: Stream<Item = F>,
1494+
F: Future,
14941495
{
14951496
assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n.into()))
14961497
}
@@ -1536,10 +1537,11 @@ pub trait StreamExt: Stream {
15361537
/// ```
15371538
#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
15381539
#[cfg(feature = "alloc")]
1539-
fn buffer_unordered(self, n: impl Into<Option<usize>>) -> BufferUnordered<Self>
1540+
fn buffer_unordered<F>(self, n: impl Into<Option<usize>>) -> BufferUnordered<Self, F>
15401541
where
1541-
Self::Item: Future,
15421542
Self: Sized,
1543+
Self: Stream<Item = F>,
1544+
F: Future,
15431545
{
15441546
assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n.into()))
15451547
}

futures/tests/auto_traits.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1112,24 +1112,27 @@ mod stream {
11121112
assert_not_impl!(AndThen<PhantomPinned, (), ()>: Unpin);
11131113
assert_not_impl!(AndThen<(), PhantomPinned, ()>: Unpin);
11141114

1115-
assert_impl!(BufferUnordered<SendStream<()>>: Send);
1116-
assert_not_impl!(BufferUnordered<SendStream>: Send);
1117-
assert_not_impl!(BufferUnordered<LocalStream>: Send);
1118-
assert_impl!(BufferUnordered<SyncStream<()>>: Sync);
1119-
assert_not_impl!(BufferUnordered<SyncStream>: Sync);
1120-
assert_not_impl!(BufferUnordered<LocalStream>: Sync);
1121-
assert_impl!(BufferUnordered<UnpinStream>: Unpin);
1122-
assert_not_impl!(BufferUnordered<PinnedStream>: Unpin);
1123-
1124-
assert_impl!(Buffered<SendStream<SendFuture<()>>>: Send);
1125-
assert_not_impl!(Buffered<SendStream<SendFuture>>: Send);
1126-
assert_not_impl!(Buffered<SendStream<LocalFuture>>: Send);
1127-
assert_not_impl!(Buffered<LocalStream<SendFuture<()>>>: Send);
1128-
assert_impl!(Buffered<SyncStream<SendSyncFuture<()>>>: Sync);
1129-
assert_not_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync);
1130-
assert_not_impl!(Buffered<LocalStream<SendSyncFuture<()>>>: Sync);
1131-
assert_impl!(Buffered<UnpinStream<PinnedFuture>>: Unpin);
1132-
assert_not_impl!(Buffered<PinnedStream<PinnedFuture>>: Unpin);
1115+
assert_impl!(BufferUnordered<SendStream<SendFuture<()>>, SendFuture<()>>: Send);
1116+
assert_not_impl!(BufferUnordered<SendStream<SendFuture>, SendFuture>: Send);
1117+
assert_not_impl!(BufferUnordered<SendStream<LocalFuture>, LocalFuture>: Send);
1118+
assert_not_impl!(BufferUnordered<LocalStream<LocalFuture>, LocalFuture>: Send);
1119+
assert_impl!(BufferUnordered<SyncStream<SendSyncFuture<()>>, SendSyncFuture<()>>: Sync);
1120+
assert_not_impl!(BufferUnordered<SyncStream<SyncFuture<()>>, SyncFuture<()>>: Sync);
1121+
assert_not_impl!(BufferUnordered<SyncStream<LocalFuture>, LocalFuture>: Sync);
1122+
assert_not_impl!(BufferUnordered<LocalStream<LocalFuture>, LocalFuture>: Sync);
1123+
assert_impl!(BufferUnordered<UnpinStream<UnpinFuture>, UnpinFuture>: Unpin);
1124+
assert_not_impl!(BufferUnordered<PinnedStream<PinnedFuture>, PinnedFuture>: Unpin);
1125+
1126+
assert_impl!(Buffered<SendStream<SendFuture<()>>, SendFuture<()>>: Send);
1127+
assert_not_impl!(Buffered<SendStream<SendFuture>, SendFuture>: Send);
1128+
assert_not_impl!(Buffered<SendStream<LocalFuture>, LocalFuture>: Send);
1129+
assert_not_impl!(Buffered<LocalStream<LocalFuture>, LocalFuture>: Send);
1130+
assert_impl!(Buffered<SyncStream<SendSyncFuture<()>>, SendSyncFuture<()>>: Sync);
1131+
assert_not_impl!(Buffered<SyncStream<SyncFuture<()>>, SyncFuture<()>>: Sync);
1132+
assert_not_impl!(Buffered<SyncStream<LocalFuture>, LocalFuture>: Sync);
1133+
assert_not_impl!(Buffered<LocalStream<LocalFuture>, LocalFuture>: Sync);
1134+
assert_impl!(Buffered<UnpinStream<UnpinFuture>, UnpinFuture>: Unpin);
1135+
assert_not_impl!(Buffered<PinnedStream<PinnedFuture>, PinnedFuture>: Unpin);
11331136

11341137
assert_impl!(CatchUnwind<SendStream>: Send);
11351138
assert_not_impl!(CatchUnwind<LocalStream>: Send);

0 commit comments

Comments
 (0)