Skip to content

Commit b8ee27d

Browse files
committed
Implement stream combinators using async_stream_block and generators
1 parent e5b89d3 commit b8ee27d

File tree

4 files changed

+107
-152
lines changed

4 files changed

+107
-152
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ maintenance = { status = "experimental" }
1919

2020
[dependencies]
2121
pin-utils = "=0.1.0-alpha.4"
22+
futures-async-stream = "0.1.0-alpha.1"
2223

2324
[dependencies.futures-core]
2425
version = "=0.3.0-alpha.19"

src/future.rs

+11-23
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use futures_core::future::Future;
22
use futures_core::stream::Stream;
3+
use futures_async_stream::async_stream_block;
34

45
use core::pin::Pin;
56
use core::task::{Context, Poll};
@@ -430,22 +431,13 @@ where
430431
Fut: Future<Output = St>,
431432
St: Stream<Item = T>,
432433
{
433-
use crate::stream::next;
434-
crate::stream::unfold((Some(future), None), async move |(future, stream)| {
435-
match (future, stream) {
436-
(Some(future), None) => {
437-
let stream = future.await;
438-
let mut stream = Box::pin(stream);
439-
let item = next(&mut stream).await;
440-
item.map(|item| (item, (None, Some(stream))))
441-
}
442-
(None, Some(mut stream)) => {
443-
let item = next(&mut stream).await;
444-
item.map(|item| (item, (None, Some(stream))))
445-
}
446-
_ => unreachable!(),
434+
async_stream_block! {
435+
let stream = future.await;
436+
#[for_await]
437+
for item in stream {
438+
yield item
447439
}
448-
})
440+
}
449441
}
450442

451443
/// Convert this future into a single element stream.
@@ -470,14 +462,10 @@ pub fn into_stream<Fut>(future: Fut) -> impl Stream<Item = Fut::Output>
470462
where
471463
Fut: Future,
472464
{
473-
crate::stream::unfold(Some(future), async move |future| {
474-
if let Some(future) = future {
475-
let item = future.await;
476-
Some((item, (None)))
477-
} else {
478-
None
479-
}
480-
})
465+
async_stream_block! {
466+
let item = future.await;
467+
yield item
468+
}
481469
}
482470

483471
/// Creates a new future wrapping around a function returning [`Poll`](core::task::Poll).

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![feature(async_closure, gen_future, generators)]
1+
#![feature(gen_future, generators, proc_macro_hygiene)]
22

33
pub mod future;
44
pub mod stream;

src/stream.rs

+94-128
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use core::future::Future;
1+
use futures_core::future::Future;
22
pub use futures_core::stream::Stream;
3+
use futures_async_stream::async_stream_block;
34

45
use core::iter::IntoIterator;
56
use core::pin::Pin;
@@ -97,11 +98,13 @@ where
9798
St: Stream,
9899
F: FnMut(St::Item) -> U,
99100
{
100-
let stream = Box::pin(stream);
101-
unfold((stream, f), async move |(mut stream, mut f)| {
102-
let item = next(&mut stream).await;
103-
item.map(|item| (f(item), (stream, f)))
104-
})
101+
let mut f = f;
102+
async_stream_block! {
103+
#[for_await]
104+
for item in stream {
105+
yield f(item)
106+
}
107+
}
105108
}
106109

107110
/// Filters the values produced by this stream according to the provided
@@ -136,18 +139,15 @@ where
136139
F: FnMut(&St::Item) -> Fut,
137140
Fut: Future<Output = bool>,
138141
{
139-
let stream = Box::pin(stream);
140-
unfold((stream, f), async move |(mut stream, mut f)| {
141-
while let Some(item) = next(&mut stream).await {
142-
let matched = f(&item).await;
143-
if matched {
144-
return Some((item, (stream, f)));
145-
} else {
146-
continue;
142+
let mut f = f;
143+
async_stream_block! {
144+
#[for_await]
145+
for item in stream {
146+
if f(&item).await {
147+
yield item
147148
}
148149
}
149-
None
150-
})
150+
}
151151
}
152152

153153
/// Filters the values produced by this stream while simultaneously mapping
@@ -183,17 +183,15 @@ where
183183
F: FnMut(St::Item) -> Fut,
184184
Fut: Future<Output = Option<U>>,
185185
{
186-
let stream = Box::pin(stream);
187-
unfold((stream, f), async move |(mut stream, mut f)| {
188-
while let Some(item) = next(&mut stream).await {
186+
let mut f = f;
187+
async_stream_block! {
188+
#[for_await]
189+
for item in stream {
189190
if let Some(item) = f(item).await {
190-
return Some((item, (stream, f)));
191-
} else {
192-
continue;
191+
yield item
193192
}
194193
}
195-
None
196-
})
194+
}
197195
}
198196

199197
/// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -366,18 +364,18 @@ pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
366364
where
367365
St: Stream,
368366
{
369-
let stream = Box::pin(stream);
370-
unfold((stream, n), async move |(mut stream, n)| {
371-
if n == 0 {
372-
None
373-
} else {
374-
if let Some(item) = next(&mut stream).await {
375-
Some((item, (stream, n - 1)))
367+
let mut n = n;
368+
async_stream_block! {
369+
#[for_await]
370+
for item in stream {
371+
if n == 0 {
372+
break;
376373
} else {
377-
None
374+
n = n - 1;
375+
yield item
378376
}
379377
}
380-
})
378+
}
381379
}
382380

383381
/// Create a stream which produces the same item repeatedly.
@@ -428,28 +426,15 @@ where
428426
SubSt: Stream<Item = T>,
429427
St: Stream<Item = SubSt>,
430428
{
431-
let stream = Box::pin(stream);
432-
unfold(
433-
(Some(stream), None),
434-
async move |(mut state_stream, mut state_substream)| loop {
435-
if let Some(mut substream) = state_substream.take() {
436-
if let Some(item) = next(&mut substream).await {
437-
return Some((item, (state_stream, Some(substream))));
438-
} else {
439-
continue;
440-
}
441-
}
442-
if let Some(mut stream) = state_stream.take() {
443-
if let Some(substream) = next(&mut stream).await {
444-
let substream = Box::pin(substream);
445-
state_stream = Some(stream);
446-
state_substream = Some(substream);
447-
continue;
448-
}
429+
async_stream_block! {
430+
#[for_await]
431+
for substream in stream {
432+
#[for_await]
433+
for item in substream {
434+
yield item
449435
}
450-
return None;
451-
},
452-
)
436+
}
437+
}
453438
}
454439

455440
/// Computes from this stream's items new items of a different type using
@@ -481,16 +466,14 @@ where
481466
F: FnMut(St::Item) -> Fut,
482467
Fut: Future<Output = St::Item>,
483468
{
484-
let stream = Box::pin(stream);
485-
unfold((stream, f), async move |(mut stream, mut f)| {
486-
let item = next(&mut stream).await;
487-
if let Some(item) = item {
469+
let mut f = f;
470+
async_stream_block! {
471+
#[for_await]
472+
for item in stream {
488473
let new_item = f(item).await;
489-
Some((new_item, (stream, f)))
490-
} else {
491-
None
474+
yield new_item
492475
}
493-
})
476+
}
494477
}
495478

496479
/// Creates a new stream which skips `n` items of the underlying stream.
@@ -515,22 +498,18 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
515498
where
516499
St: Stream,
517500
{
518-
let stream = Box::pin(stream);
519-
unfold((stream, n), async move |(mut stream, mut n)| {
520-
while n != 0 {
521-
if let Some(_) = next(&mut stream).await {
501+
let mut n = n;
502+
async_stream_block! {
503+
#[for_await]
504+
for item in stream {
505+
if n == 0 {
506+
yield item
507+
} else {
522508
n = n - 1;
523509
continue;
524-
} else {
525-
return None;
526510
}
527511
}
528-
if let Some(item) = next(&mut stream).await {
529-
Some((item, (stream, 0)))
530-
} else {
531-
None
532-
}
533-
})
512+
}
534513
}
535514

536515
/// An adapter for zipping two streams together.
@@ -559,16 +538,18 @@ where
559538
St1: Stream,
560539
St2: Stream,
561540
{
562-
let stream = Box::pin(stream);
563-
let other = Box::pin(other);
564-
unfold((stream, other), async move |(mut stream, mut other)| {
565-
let left = next(&mut stream).await;
566-
let right = next(&mut other).await;
567-
match (left, right) {
568-
(Some(left), Some(right)) => Some(((left, right), (stream, other))),
569-
_ => None,
541+
let mut stream = Box::pin(stream);
542+
let mut other = Box::pin(other);
543+
async_stream_block! {
544+
loop {
545+
let left = next(&mut stream).await;
546+
let right = next(&mut other).await;
547+
match (left, right) {
548+
(Some(left), Some(right)) => yield (left, right),
549+
_ => break,
550+
}
570551
}
571-
})
552+
}
572553
}
573554

574555
/// Adapter for chaining two stream.
@@ -598,24 +579,16 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
598579
where
599580
St: Stream,
600581
{
601-
let stream = Box::pin(stream);
602-
let other = Box::pin(other);
603-
let start_with_first = true;
604-
unfold(
605-
(stream, other, start_with_first),
606-
async move |(mut stream, mut other, start_with_first)| {
607-
if start_with_first {
608-
if let Some(item) = next(&mut stream).await {
609-
return Some((item, (stream, other, start_with_first)));
610-
}
611-
}
612-
if let Some(item) = next(&mut other).await {
613-
Some((item, (stream, other, /* start_with_first */ false)))
614-
} else {
615-
None
616-
}
617-
},
618-
)
582+
async_stream_block! {
583+
#[for_await]
584+
for item in stream {
585+
yield item
586+
}
587+
#[for_await]
588+
for item in other {
589+
yield item
590+
}
591+
}
619592
}
620593

621594
/// Take elements from this stream while the provided asynchronous predicate
@@ -644,18 +617,17 @@ where
644617
F: FnMut(&St::Item) -> Fut,
645618
Fut: Future<Output = bool>,
646619
{
647-
let stream = Box::pin(stream);
648-
unfold((stream, f), async move |(mut stream, mut f)| {
649-
if let Some(item) = next(&mut stream).await {
620+
let mut f = f;
621+
async_stream_block! {
622+
#[for_await]
623+
for item in stream {
650624
if f(&item).await {
651-
Some((item, (stream, f)))
625+
yield item
652626
} else {
653-
None
627+
break;
654628
}
655-
} else {
656-
None
657629
}
658-
})
630+
}
659631
}
660632

661633
/// Skip elements on this stream while the provided asynchronous predicate
@@ -685,29 +657,23 @@ where
685657
F: FnMut(&St::Item) -> Fut,
686658
Fut: Future<Output = bool>,
687659
{
688-
let stream = Box::pin(stream);
689-
let should_skip = true;
690-
unfold(
691-
(stream, f, should_skip),
692-
async move |(mut stream, mut f, should_skip)| {
693-
while should_skip {
694-
if let Some(item) = next(&mut stream).await {
695-
if f(&item).await {
696-
continue;
697-
} else {
698-
return Some((item, (stream, f, /* should_skip */ false)));
699-
}
660+
let mut f = f;
661+
let mut should_skip = true;
662+
async_stream_block! {
663+
#[for_await]
664+
for item in stream {
665+
if should_skip {
666+
if f(&item).await {
667+
continue;
700668
} else {
701-
return None;
669+
should_skip = false;
670+
yield item
702671
}
703-
}
704-
if let Some(item) = next(&mut stream).await {
705-
Some((item, (stream, f, /* should_skip */ false)))
706672
} else {
707-
None
673+
yield item
708674
}
709-
},
710-
)
675+
}
676+
}
711677
}
712678

713679
/// Execute an accumulating asynchronous computation over a stream,

0 commit comments

Comments
 (0)