Skip to content

Commit a456f01

Browse files
committed
Implement stream combinators using async_stream_block and generators
1 parent 6a93397 commit a456f01

File tree

4 files changed

+109
-153
lines changed

4 files changed

+109
-153
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ maintenance = { status = "experimental" }
1919

2020
[dependencies]
2121
pin-utils = "=0.1.0-alpha.4"
22+
futures-async-stream = "0.1.0-alpha.1"
2223

2324
[dependencies.futures]
2425
version = "=0.3.0-alpha.18"

src/future.rs

+12-24
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use futures::future::Future;
1+
use core::future::Future;
22
use futures::stream::Stream;
3+
use futures_async_stream::async_stream_block;
34

45
use core::task::{Context, Poll};
56

@@ -429,22 +430,13 @@ where
429430
Fut: Future<Output = St>,
430431
St: Stream<Item = T>,
431432
{
432-
use crate::stream::next;
433-
futures::stream::unfold((Some(future), None), async move |(future, stream)| {
434-
match (future, stream) {
435-
(Some(future), None) => {
436-
let stream = future.await;
437-
let mut stream = Box::pin(stream);
438-
let item = next(&mut stream).await;
439-
item.map(|item| (item, (None, Some(stream))))
440-
}
441-
(None, Some(mut stream)) => {
442-
let item = next(&mut stream).await;
443-
item.map(|item| (item, (None, Some(stream))))
444-
}
445-
_ => unreachable!(),
433+
async_stream_block! {
434+
let stream = future.await;
435+
#[for_await]
436+
for item in stream {
437+
yield item
446438
}
447-
})
439+
}
448440
}
449441

450442
/// Convert this future into a single element stream.
@@ -469,14 +461,10 @@ pub fn into_stream<Fut>(future: Fut) -> impl Stream<Item = Fut::Output>
469461
where
470462
Fut: Future,
471463
{
472-
futures::stream::unfold(Some(future), async move |future| {
473-
if let Some(future) = future {
474-
let item = future.await;
475-
Some((item, (None)))
476-
} else {
477-
None
478-
}
479-
})
464+
async_stream_block! {
465+
let item = future.await;
466+
yield item
467+
}
480468
}
481469

482470
/// Creates a new future wrapping around a function returning [`Poll`](core::task::Poll).

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#![feature(async_closure, gen_future, generators)]
1+
#![feature(gen_future, generators, proc_macro_hygiene)]
22

33
pub mod future;
44
pub mod stream;

src/stream.rs

+95-128
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use futures::future::Future;
1+
use core::future::Future;
22
pub use futures::stream::Stream;
33

4+
use futures_async_stream::async_stream_block;
5+
46
use core::iter::IntoIterator;
57
use core::pin::Pin;
68

@@ -97,11 +99,13 @@ where
9799
St: Stream,
98100
F: FnMut(St::Item) -> U,
99101
{
100-
let stream = Box::pin(stream);
101-
unfold((stream, f), async move |(mut stream, mut f)| {
102-
let item = next(&mut stream).await;
103-
item.map(|item| (f(item), (stream, f)))
104-
})
102+
let mut f = f;
103+
async_stream_block! {
104+
#[for_await]
105+
for item in stream {
106+
yield f(item)
107+
}
108+
}
105109
}
106110

107111
/// Filters the values produced by this stream according to the provided
@@ -136,18 +140,15 @@ where
136140
F: FnMut(&St::Item) -> Fut,
137141
Fut: Future<Output = bool>,
138142
{
139-
let stream = Box::pin(stream);
140-
unfold((stream, f), async move |(mut stream, mut f)| {
141-
while let Some(item) = next(&mut stream).await {
142-
let matched = f(&item).await;
143-
if matched {
144-
return Some((item, (stream, f)));
145-
} else {
146-
continue;
143+
let mut f = f;
144+
async_stream_block! {
145+
#[for_await]
146+
for item in stream {
147+
if f(&item).await {
148+
yield item
147149
}
148150
}
149-
None
150-
})
151+
}
151152
}
152153

153154
/// Filters the values produced by this stream while simultaneously mapping
@@ -183,17 +184,15 @@ where
183184
F: FnMut(St::Item) -> Fut,
184185
Fut: Future<Output = Option<U>>,
185186
{
186-
let stream = Box::pin(stream);
187-
unfold((stream, f), async move |(mut stream, mut f)| {
188-
while let Some(item) = next(&mut stream).await {
187+
let mut f = f;
188+
async_stream_block! {
189+
#[for_await]
190+
for item in stream {
189191
if let Some(item) = f(item).await {
190-
return Some((item, (stream, f)));
191-
} else {
192-
continue;
192+
yield item
193193
}
194194
}
195-
None
196-
})
195+
}
197196
}
198197

199198
/// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -367,18 +366,18 @@ pub fn take<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
367366
where
368367
St: Stream,
369368
{
370-
let stream = Box::pin(stream);
371-
unfold((stream, n), async move |(mut stream, n)| {
372-
if n == 0 {
373-
None
374-
} else {
375-
if let Some(item) = next(&mut stream).await {
376-
Some((item, (stream, n - 1)))
369+
let mut n = n;
370+
async_stream_block! {
371+
#[for_await]
372+
for item in stream {
373+
if n == 0 {
374+
break;
377375
} else {
378-
None
376+
n = n - 1;
377+
yield item
379378
}
380379
}
381-
})
380+
}
382381
}
383382

384383
/// Create a stream which produces the same item repeatedly.
@@ -430,28 +429,15 @@ where
430429
SubSt: Stream<Item = T>,
431430
St: Stream<Item = SubSt>,
432431
{
433-
let stream = Box::pin(stream);
434-
unfold(
435-
(Some(stream), None),
436-
async move |(mut state_stream, mut state_substream)| loop {
437-
if let Some(mut substream) = state_substream.take() {
438-
if let Some(item) = next(&mut substream).await {
439-
return Some((item, (state_stream, Some(substream))));
440-
} else {
441-
continue;
442-
}
443-
}
444-
if let Some(mut stream) = state_stream.take() {
445-
if let Some(substream) = next(&mut stream).await {
446-
let substream = Box::pin(substream);
447-
state_stream = Some(stream);
448-
state_substream = Some(substream);
449-
continue;
450-
}
432+
async_stream_block! {
433+
#[for_await]
434+
for substream in stream {
435+
#[for_await]
436+
for item in substream {
437+
yield item
451438
}
452-
return None;
453-
},
454-
)
439+
}
440+
}
455441
}
456442

457443
/// Computes from this stream's items new items of a different type using
@@ -483,16 +469,14 @@ where
483469
F: FnMut(St::Item) -> Fut,
484470
Fut: Future<Output = St::Item>,
485471
{
486-
let stream = Box::pin(stream);
487-
unfold((stream, f), async move |(mut stream, mut f)| {
488-
let item = next(&mut stream).await;
489-
if let Some(item) = item {
472+
let mut f = f;
473+
async_stream_block! {
474+
#[for_await]
475+
for item in stream {
490476
let new_item = f(item).await;
491-
Some((new_item, (stream, f)))
492-
} else {
493-
None
477+
yield new_item
494478
}
495-
})
479+
}
496480
}
497481

498482
/// Creates a new stream which skips `n` items of the underlying stream.
@@ -517,22 +501,18 @@ pub fn skip<St>(stream: St, n: u64) -> impl Stream<Item = St::Item>
517501
where
518502
St: Stream,
519503
{
520-
let stream = Box::pin(stream);
521-
unfold((stream, n), async move |(mut stream, mut n)| {
522-
while n != 0 {
523-
if let Some(_) = next(&mut stream).await {
504+
let mut n = n;
505+
async_stream_block! {
506+
#[for_await]
507+
for item in stream {
508+
if n == 0 {
509+
yield item
510+
} else {
524511
n = n - 1;
525512
continue;
526-
} else {
527-
return None;
528513
}
529514
}
530-
if let Some(item) = next(&mut stream).await {
531-
Some((item, (stream, 0)))
532-
} else {
533-
None
534-
}
535-
})
515+
}
536516
}
537517

538518
/// An adapter for zipping two streams together.
@@ -561,16 +541,18 @@ where
561541
St1: Stream,
562542
St2: Stream,
563543
{
564-
let stream = Box::pin(stream);
565-
let other = Box::pin(other);
566-
unfold((stream, other), async move |(mut stream, mut other)| {
567-
let left = next(&mut stream).await;
568-
let right = next(&mut other).await;
569-
match (left, right) {
570-
(Some(left), Some(right)) => Some(((left, right), (stream, other))),
571-
_ => None,
544+
let mut stream = Box::pin(stream);
545+
let mut other = Box::pin(other);
546+
async_stream_block! {
547+
loop {
548+
let left = next(&mut stream).await;
549+
let right = next(&mut other).await;
550+
match (left, right) {
551+
(Some(left), Some(right)) => yield (left, right),
552+
_ => break,
553+
}
572554
}
573-
})
555+
}
574556
}
575557

576558
/// Adapter for chaining two stream.
@@ -600,24 +582,16 @@ pub fn chain<St>(stream: St, other: St) -> impl Stream<Item = St::Item>
600582
where
601583
St: Stream,
602584
{
603-
let stream = Box::pin(stream);
604-
let other = Box::pin(other);
605-
let start_with_first = true;
606-
unfold(
607-
(stream, other, start_with_first),
608-
async move |(mut stream, mut other, start_with_first)| {
609-
if start_with_first {
610-
if let Some(item) = next(&mut stream).await {
611-
return Some((item, (stream, other, start_with_first)));
612-
}
613-
}
614-
if let Some(item) = next(&mut other).await {
615-
Some((item, (stream, other, /* start_with_first */ false)))
616-
} else {
617-
None
618-
}
619-
},
620-
)
585+
async_stream_block! {
586+
#[for_await]
587+
for item in stream {
588+
yield item
589+
}
590+
#[for_await]
591+
for item in other {
592+
yield item
593+
}
594+
}
621595
}
622596

623597
/// Take elements from this stream while the provided asynchronous predicate
@@ -646,18 +620,17 @@ where
646620
F: FnMut(&St::Item) -> Fut,
647621
Fut: Future<Output = bool>,
648622
{
649-
let stream = Box::pin(stream);
650-
unfold((stream, f), async move |(mut stream, mut f)| {
651-
if let Some(item) = next(&mut stream).await {
623+
let mut f = f;
624+
async_stream_block! {
625+
#[for_await]
626+
for item in stream {
652627
if f(&item).await {
653-
Some((item, (stream, f)))
628+
yield item
654629
} else {
655-
None
630+
break;
656631
}
657-
} else {
658-
None
659632
}
660-
})
633+
}
661634
}
662635

663636
/// Skip elements on this stream while the provided asynchronous predicate
@@ -687,29 +660,23 @@ where
687660
F: FnMut(&St::Item) -> Fut,
688661
Fut: Future<Output = bool>,
689662
{
690-
let stream = Box::pin(stream);
691-
let should_skip = true;
692-
unfold(
693-
(stream, f, should_skip),
694-
async move |(mut stream, mut f, should_skip)| {
695-
while should_skip {
696-
if let Some(item) = next(&mut stream).await {
697-
if f(&item).await {
698-
continue;
699-
} else {
700-
return Some((item, (stream, f, /* should_skip */ false)));
701-
}
663+
let mut f = f;
664+
let mut should_skip = true;
665+
async_stream_block! {
666+
#[for_await]
667+
for item in stream {
668+
if should_skip {
669+
if f(&item).await {
670+
continue;
702671
} else {
703-
return None;
672+
should_skip = false;
673+
yield item
704674
}
705-
}
706-
if let Some(item) = next(&mut stream).await {
707-
Some((item, (stream, f, /* should_skip */ false)))
708675
} else {
709-
None
676+
yield item
710677
}
711-
},
712-
)
678+
}
679+
}
713680
}
714681

715682
/// Execute an accumulating asynchronous computation over a stream,

0 commit comments

Comments
 (0)