Skip to content

Commit b36d343

Browse files
committed
Fix edge case
1 parent 1dac81a commit b36d343

File tree

1 file changed

+36
-1
lines changed

1 file changed

+36
-1
lines changed

relay-threading/src/multiplexing.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,15 +149,26 @@ where
149149
loop {
150150
this.tasks.as_mut().poll_tasks_until_pending(cx);
151151

152+
// If we can't get anymore tasks, and we don't have anything else to process, we report
153+
// ready. Otherwise, if we have something to process, we report pending.
154+
if this.tasks.is_empty() && this.rx.is_terminated() {
155+
return Poll::Ready(());
156+
} else if this.rx.is_terminated() {
157+
return Poll::Pending;
158+
}
159+
160+
// If we could accept tasks, but we don't have space we report pending.
152161
if this.tasks.len() >= *this.max_concurrency {
153162
return Poll::Pending;
154163
}
155164

165+
// At this point, we are free to start driving another future.
156166
match this.rx.as_mut().poll_next(cx) {
157167
Poll::Ready(Some(task)) => this.tasks.push(task),
158168
// The stream is exhausted and there are no remaining tasks.
159169
Poll::Ready(None) if this.tasks.is_empty() => return Poll::Ready(()),
160-
// The stream is exhausted but tasks remain active.
170+
// The stream is exhausted but tasks remain active. Now we need to make sure we
171+
// stop polling the stream and just process tasks.
161172
Poll::Ready(None) => return Poll::Pending,
162173
Poll::Pending => return Poll::Pending,
163174
}
@@ -326,6 +337,30 @@ mod tests {
326337
assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>());
327338
}
328339

340+
#[test]
341+
fn test_multiplexer_with_multiple_concurrency_and_less_multiple_futures() {
342+
let entries = Arc::new(Mutex::new(Vec::new()));
343+
let (tx, rx) = flume::bounded(10);
344+
345+
// We send 3 futures with a concurrency of 5, to make sure that if the stream returns
346+
// `Poll::Ready(None)` the system will stop polling from the stream and continue driving
347+
// the remaining futures.
348+
for i in 0..3 {
349+
let entries_clone = entries.clone();
350+
tx.send(future_with(move || {
351+
entries_clone.lock().unwrap().push(i);
352+
}))
353+
.unwrap();
354+
}
355+
356+
drop(tx);
357+
358+
futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None));
359+
360+
// The order of completion is expected to be the same as the order of submission.
361+
assert_eq!(*entries.lock().unwrap(), (0..3).collect::<Vec<_>>());
362+
}
363+
329364
#[test]
330365
fn test_multiplexer_with_multiple_concurrency_and_multiple_futures_from_multiple_threads() {
331366
let entries = Arc::new(Mutex::new(Vec::new()));

0 commit comments

Comments
 (0)