Skip to content

Commit f37d068

Browse files
committed
Rewrite websocket example using sipper
1 parent 05618ea commit f37d068

File tree

4 files changed

+43
-68
lines changed

4 files changed

+43
-68
lines changed

examples/websocket/src/echo.rs

Lines changed: 38 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,63 @@
11
pub mod server;
22

33
use iced::futures;
4-
use iced::stream;
4+
use iced::task::{sipper, Never, Sipper};
55
use iced::widget::text;
66

77
use futures::channel::mpsc;
88
use futures::sink::SinkExt;
9-
use futures::stream::{Stream, StreamExt};
9+
use futures::stream::StreamExt;
1010

1111
use async_tungstenite::tungstenite;
1212
use std::fmt;
1313

14-
pub fn connect() -> impl Stream<Item = Event> {
15-
stream::channel(100, |mut output| async move {
16-
let mut state = State::Disconnected;
17-
14+
pub fn connect() -> impl Sipper<Never, Event> {
15+
sipper(|mut output| async move {
1816
loop {
19-
match &mut state {
20-
State::Disconnected => {
21-
const ECHO_SERVER: &str = "ws://127.0.0.1:3030";
17+
const ECHO_SERVER: &str = "ws://127.0.0.1:3030";
2218

23-
match async_tungstenite::tokio::connect_async(ECHO_SERVER)
24-
.await
25-
{
26-
Ok((websocket, _)) => {
27-
let (sender, receiver) = mpsc::channel(100);
19+
let (mut websocket, mut input) =
20+
match async_tungstenite::tokio::connect_async(ECHO_SERVER).await
21+
{
22+
Ok((websocket, _)) => {
23+
let (sender, receiver) = mpsc::channel(100);
2824

29-
let _ = output
30-
.send(Event::Connected(Connection(sender)))
31-
.await;
25+
let _ = output
26+
.send(Event::Connected(Connection(sender)))
27+
.await;
3228

33-
state = State::Connected(websocket, receiver);
34-
}
35-
Err(_) => {
36-
tokio::time::sleep(
37-
tokio::time::Duration::from_secs(1),
38-
)
29+
(websocket.fuse(), receiver)
30+
}
31+
Err(_) => {
32+
tokio::time::sleep(tokio::time::Duration::from_secs(1))
3933
.await;
4034

41-
let _ = output.send(Event::Disconnected).await;
42-
}
35+
let _ = output.send(Event::Disconnected).await;
36+
37+
continue;
4338
}
44-
}
45-
State::Connected(websocket, input) => {
46-
let mut fused_websocket = websocket.by_ref().fuse();
47-
48-
futures::select! {
49-
received = fused_websocket.select_next_some() => {
50-
match received {
51-
Ok(tungstenite::Message::Text(message)) => {
52-
let _ = output.send(Event::MessageReceived(Message::User(message))).await;
53-
}
54-
Err(_) => {
55-
let _ = output.send(Event::Disconnected).await;
56-
57-
state = State::Disconnected;
58-
}
59-
Ok(_) => continue,
39+
};
40+
41+
loop {
42+
futures::select! {
43+
received = websocket.select_next_some() => {
44+
match received {
45+
Ok(tungstenite::Message::Text(message)) => {
46+
let _ = output.send(Event::MessageReceived(Message::User(message))).await;
47+
}
48+
Err(_) => {
49+
let _ = output.send(Event::Disconnected).await;
50+
break;
6051
}
52+
Ok(_) => continue,
6153
}
54+
}
6255

63-
message = input.select_next_some() => {
64-
let result = websocket.send(tungstenite::Message::Text(message.to_string())).await;
56+
message = input.select_next_some() => {
57+
let result = websocket.send(tungstenite::Message::Text(message.to_string())).await;
6558

66-
if result.is_err() {
67-
let _ = output.send(Event::Disconnected).await;
68-
69-
state = State::Disconnected;
70-
}
59+
if result.is_err() {
60+
let _ = output.send(Event::Disconnected).await;
7161
}
7262
}
7363
}
@@ -76,18 +66,6 @@ pub fn connect() -> impl Stream<Item = Event> {
7666
})
7767
}
7868

79-
#[derive(Debug)]
80-
#[allow(clippy::large_enum_variant)]
81-
enum State {
82-
Disconnected,
83-
Connected(
84-
async_tungstenite::WebSocketStream<
85-
async_tungstenite::tokio::ConnectStream,
86-
>,
87-
mpsc::Receiver<Message>,
88-
),
89-
}
90-
9169
#[derive(Debug, Clone)]
9270
pub enum Event {
9371
Connected(Connection),

futures/src/backend/native/tokio.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@ impl crate::Executor for Executor {
2323
pub mod time {
2424
//! Listen and react to time.
2525
use crate::core::time::{Duration, Instant};
26-
use crate::stream;
2726
use crate::subscription::Subscription;
2827
use crate::MaybeSend;
2928

30-
use futures::SinkExt;
29+
use futures::stream;
3130
use std::future::Future;
3231

3332
/// Returns a [`Subscription`] that produces messages at a set interval.
@@ -66,12 +65,12 @@ pub mod time {
6665
let f = *f;
6766
let interval = *interval;
6867

69-
stream::channel(1, move |mut output| async move {
70-
loop {
71-
let _ = output.send(f().await).await;
72-
68+
stream::unfold(0, move |i| async move {
69+
if i > 0 {
7370
tokio::time::sleep(interval).await;
7471
}
72+
73+
Some((f().await, i + 1))
7574
})
7675
})
7776
}

futures/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ pub mod backend;
1515
pub mod event;
1616
pub mod executor;
1717
pub mod keyboard;
18-
pub mod stream;
1918
pub mod subscription;
2019

2120
pub use executor::Executor;

src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,6 @@ use iced_winit::core;
478478
use iced_winit::runtime;
479479

480480
pub use iced_futures::futures;
481-
pub use iced_futures::stream;
482481

483482
#[cfg(feature = "highlighter")]
484483
pub use iced_highlighter as highlighter;

0 commit comments

Comments
 (0)