From f37d068af52700570f863960c3432df85a244a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A9ctor=20Ram=C3=B3n=20Jim=C3=A9nez?= Date: Tue, 11 Feb 2025 01:27:51 +0100 Subject: [PATCH] Rewrite `websocket` example using `sipper` --- examples/websocket/src/echo.rs | 98 +++++++++++------------------ futures/src/backend/native/tokio.rs | 11 ++-- futures/src/lib.rs | 1 - src/lib.rs | 1 - 4 files changed, 43 insertions(+), 68 deletions(-) diff --git a/examples/websocket/src/echo.rs b/examples/websocket/src/echo.rs index 1465293643..6fa4c54a0f 100644 --- a/examples/websocket/src/echo.rs +++ b/examples/websocket/src/echo.rs @@ -1,73 +1,63 @@ pub mod server; use iced::futures; -use iced::stream; +use iced::task::{sipper, Never, Sipper}; use iced::widget::text; use futures::channel::mpsc; use futures::sink::SinkExt; -use futures::stream::{Stream, StreamExt}; +use futures::stream::StreamExt; use async_tungstenite::tungstenite; use std::fmt; -pub fn connect() -> impl Stream { - stream::channel(100, |mut output| async move { - let mut state = State::Disconnected; - +pub fn connect() -> impl Sipper { + sipper(|mut output| async move { loop { - match &mut state { - State::Disconnected => { - const ECHO_SERVER: &str = "ws://127.0.0.1:3030"; + const ECHO_SERVER: &str = "ws://127.0.0.1:3030"; - match async_tungstenite::tokio::connect_async(ECHO_SERVER) - .await - { - Ok((websocket, _)) => { - let (sender, receiver) = mpsc::channel(100); + let (mut websocket, mut input) = + match async_tungstenite::tokio::connect_async(ECHO_SERVER).await + { + Ok((websocket, _)) => { + let (sender, receiver) = mpsc::channel(100); - let _ = output - .send(Event::Connected(Connection(sender))) - .await; + let _ = output + .send(Event::Connected(Connection(sender))) + .await; - state = State::Connected(websocket, receiver); - } - Err(_) => { - tokio::time::sleep( - tokio::time::Duration::from_secs(1), - ) + (websocket.fuse(), receiver) + } + Err(_) => { + tokio::time::sleep(tokio::time::Duration::from_secs(1)) .await; - let _ = output.send(Event::Disconnected).await; - } + let _ = output.send(Event::Disconnected).await; + + continue; } - } - State::Connected(websocket, input) => { - let mut fused_websocket = websocket.by_ref().fuse(); - - futures::select! { - received = fused_websocket.select_next_some() => { - match received { - Ok(tungstenite::Message::Text(message)) => { - let _ = output.send(Event::MessageReceived(Message::User(message))).await; - } - Err(_) => { - let _ = output.send(Event::Disconnected).await; - - state = State::Disconnected; - } - Ok(_) => continue, + }; + + loop { + futures::select! { + received = websocket.select_next_some() => { + match received { + Ok(tungstenite::Message::Text(message)) => { + let _ = output.send(Event::MessageReceived(Message::User(message))).await; + } + Err(_) => { + let _ = output.send(Event::Disconnected).await; + break; } + Ok(_) => continue, } + } - message = input.select_next_some() => { - let result = websocket.send(tungstenite::Message::Text(message.to_string())).await; + message = input.select_next_some() => { + let result = websocket.send(tungstenite::Message::Text(message.to_string())).await; - if result.is_err() { - let _ = output.send(Event::Disconnected).await; - - state = State::Disconnected; - } + if result.is_err() { + let _ = output.send(Event::Disconnected).await; } } } @@ -76,18 +66,6 @@ pub fn connect() -> impl Stream { }) } -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -enum State { - Disconnected, - Connected( - async_tungstenite::WebSocketStream< - async_tungstenite::tokio::ConnectStream, - >, - mpsc::Receiver, - ), -} - #[derive(Debug, Clone)] pub enum Event { Connected(Connection), diff --git a/futures/src/backend/native/tokio.rs b/futures/src/backend/native/tokio.rs index e0be83a63a..c38ef566f4 100644 --- a/futures/src/backend/native/tokio.rs +++ b/futures/src/backend/native/tokio.rs @@ -23,11 +23,10 @@ impl crate::Executor for Executor { pub mod time { //! Listen and react to time. use crate::core::time::{Duration, Instant}; - use crate::stream; use crate::subscription::Subscription; use crate::MaybeSend; - use futures::SinkExt; + use futures::stream; use std::future::Future; /// Returns a [`Subscription`] that produces messages at a set interval. @@ -66,12 +65,12 @@ pub mod time { let f = *f; let interval = *interval; - stream::channel(1, move |mut output| async move { - loop { - let _ = output.send(f().await).await; - + stream::unfold(0, move |i| async move { + if i > 0 { tokio::time::sleep(interval).await; } + + Some((f().await, i + 1)) }) }) } diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 317388234d..a874a6184c 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -15,7 +15,6 @@ pub mod backend; pub mod event; pub mod executor; pub mod keyboard; -pub mod stream; pub mod subscription; pub use executor::Executor; diff --git a/src/lib.rs b/src/lib.rs index e4649938af..79992a3e04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -478,7 +478,6 @@ use iced_winit::core; use iced_winit::runtime; pub use iced_futures::futures; -pub use iced_futures::stream; #[cfg(feature = "highlighter")] pub use iced_highlighter as highlighter;