Skip to content

Commit dda70e1

Browse files
committed
refactor: use Task::stream instead of a channel subscription
1 parent 855ba59 commit dda70e1

File tree

30 files changed

+284
-385
lines changed

30 files changed

+284
-385
lines changed

cosmic-settings/src/app.rs

+7-48
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ use cosmic::app::context_drawer::ContextDrawer;
2525
use cosmic::app::DbusActivationMessage;
2626
#[cfg(feature = "wayland")]
2727
use cosmic::cctk::{sctk::output::OutputInfo, wayland_client::protocol::wl_output::WlOutput};
28-
use cosmic::iced::futures::SinkExt;
29-
use cosmic::iced::{stream, Subscription};
28+
use cosmic::iced::Subscription;
3029
use cosmic::widget::{self, button, row, text_input};
3130
use cosmic::{
3231
app::{Core, Task},
@@ -65,7 +64,6 @@ pub struct SettingsApp {
6564
config: Config,
6665
core: Core,
6766
nav_model: nav_bar::Model,
68-
page_sender: Option<tokio::sync::mpsc::Sender<crate::pages::Message>>,
6967
pages: page::Binder<crate::pages::Message>,
7068
search_active: bool,
7169
search_id: cosmic::widget::Id,
@@ -144,7 +142,6 @@ impl SettingsApp {
144142
#[derive(Clone, Debug)]
145143
pub enum Message {
146144
CloseContextDrawer,
147-
DelayedInit(page::Entity),
148145
#[cfg(feature = "wayland")]
149146
DesktopInfo,
150147
Error(String),
@@ -158,7 +155,6 @@ pub enum Message {
158155
PageMessage(crate::pages::Message),
159156
#[cfg(feature = "wayland")]
160157
PanelConfig(CosmicPanelConfig),
161-
RegisterSubscriptionSender(tokio::sync::mpsc::Sender<pages::Message>),
162158
SearchActivate,
163159
SearchChanged(String),
164160
SearchClear,
@@ -193,7 +189,6 @@ impl cosmic::Application for SettingsApp {
193189
config,
194190
core,
195191
nav_model: nav_bar::Model::default(),
196-
page_sender: None,
197192
pages: page::Binder::default(),
198193
search_active: false,
199194
search_id: cosmic::widget::Id::unique(),
@@ -228,7 +223,8 @@ impl cosmic::Application for SettingsApp {
228223
}
229224
.unwrap_or(desktop_id);
230225

231-
(app, cosmic::task::message(Message::DelayedInit(active_id)))
226+
let task = app.activate_page(active_id);
227+
(app, task)
232228
}
233229

234230
fn nav_model(&self) -> Option<&nav_bar::Model> {
@@ -286,22 +282,6 @@ impl cosmic::Application for SettingsApp {
286282

287283
fn subscription(&self) -> Subscription<Message> {
288284
Subscription::batch(vec![
289-
// Creates a channel that listens to messages from pages.
290-
// The sender is given back to the application so that it may pass it on.
291-
Subscription::run_with_id(
292-
std::any::TypeId::of::<Self>(),
293-
stream::channel(4, move |mut output| async move {
294-
let (tx, mut rx) = tokio::sync::mpsc::channel::<pages::Message>(4);
295-
296-
let _res = output.send(Message::RegisterSubscriptionSender(tx)).await;
297-
298-
while let Some(event) = rx.recv().await {
299-
let _res = output.send(Message::PageMessage(event)).await;
300-
}
301-
302-
futures::future::pending::<()>().await;
303-
}),
304-
),
305285
#[cfg(feature = "ashpd")]
306286
crate::subscription::daytime().map(|daytime| {
307287
Message::PageMessage(pages::Message::Appearance(appearance::Message::Daytime(
@@ -754,19 +734,6 @@ impl cosmic::Application for SettingsApp {
754734
Message::Error(error) => {
755735
tracing::error!(error, "error occurred");
756736
}
757-
758-
Message::RegisterSubscriptionSender(sender) => {
759-
self.page_sender = Some(sender);
760-
}
761-
762-
// It is necessary to delay init to allow time for the page sender to be initialized
763-
Message::DelayedInit(active_id) => {
764-
if self.page_sender.is_none() {
765-
return cosmic::task::message(Message::DelayedInit(active_id));
766-
}
767-
768-
return self.activate_page(active_id);
769-
}
770737
}
771738

772739
Task::none()
@@ -860,17 +827,11 @@ impl SettingsApp {
860827
self.search_clear();
861828
self.search_active = false;
862829
self.activate_navbar(page);
863-
864-
let sender = self
865-
.page_sender
866-
.clone()
867-
.expect("sender should be available");
868-
869830
self.loaded_pages.insert(page);
870831

871832
let page_task = self
872833
.pages
873-
.on_enter(page, sender)
834+
.on_enter(page)
874835
.map(Message::PageMessage)
875836
.map(Into::into);
876837

@@ -1032,11 +993,9 @@ impl SettingsApp {
1032993
}
1033994
}
1034995

1035-
if let Some(ref sender) = self.page_sender {
1036-
for page in load {
1037-
self.loaded_pages.insert(page);
1038-
tasks.push(self.pages.on_enter(page, sender.clone()));
1039-
}
996+
for page in load {
997+
self.loaded_pages.insert(page);
998+
tasks.push(self.pages.on_enter(page));
1040999
}
10411000

10421001
for page in unload {

cosmic-settings/src/pages/accessibility/magnifier.rs

+17-17
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use cosmic_settings_page::{
1414
Entity,
1515
};
1616
use slotmap::SlotMap;
17-
use tokio::sync::mpsc;
1817
use tracing::error;
1918

2019
use super::{wayland, AccessibilityEvent, AccessibilityRequest};
@@ -114,28 +113,29 @@ impl page::Page<crate::pages::Message> for Page {
114113
])
115114
}
116115

117-
fn on_enter(
118-
&mut self,
119-
sender: mpsc::Sender<crate::pages::Message>,
120-
) -> cosmic::Task<crate::pages::Message> {
116+
fn on_enter(&mut self) -> cosmic::Task<crate::pages::Message> {
121117
if self.wayland_thread.is_none() {
122118
match wayland::spawn_wayland_connection() {
123119
Ok((tx, mut rx)) => {
124120
self.wayland_thread = Some(tx);
125-
tokio::task::spawn(async move {
126-
while let Some(event) = rx.recv().await {
127-
let _ = sender
128-
.send(crate::pages::Message::AccessibilityMagnifier(
129-
Message::Event(event),
121+
122+
return cosmic::Task::stream(async_fn_stream::fn_stream(
123+
|emitter| async move {
124+
while let Some(event) = rx.recv().await {
125+
let _ = emitter
126+
.emit(crate::pages::Message::AccessibilityMagnifier(
127+
Message::Event(event),
128+
))
129+
.await;
130+
}
131+
132+
let _ = emitter
133+
.emit(crate::pages::Message::AccessibilityMagnifier(
134+
Message::ProtocolUnavailable,
130135
))
131136
.await;
132-
}
133-
let _ = sender
134-
.send(crate::pages::Message::AccessibilityMagnifier(
135-
Message::ProtocolUnavailable,
136-
))
137-
.await;
138-
});
137+
},
138+
));
139139
}
140140
Err(err) => {
141141
tracing::warn!(

cosmic-settings/src/pages/accessibility/mod.rs

+18-16
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use cosmic_settings_page::{
1111
Insert,
1212
};
1313
use slotmap::SlotMap;
14-
use tokio::sync::mpsc;
1514

1615
pub mod magnifier;
1716
mod wayland;
@@ -53,27 +52,30 @@ impl page::Page<crate::pages::Message> for Page {
5352
Some(vec![sections.insert(vision())])
5453
}
5554

56-
fn on_enter(
57-
&mut self,
58-
sender: mpsc::Sender<crate::pages::Message>,
59-
) -> cosmic::Task<crate::pages::Message> {
55+
fn on_enter(&mut self) -> cosmic::Task<crate::pages::Message> {
6056
if self.wayland_thread.is_none() {
6157
match wayland::spawn_wayland_connection() {
6258
Ok((tx, mut rx)) => {
6359
self.wayland_available = true;
6460
self.wayland_thread = Some(tx);
65-
tokio::task::spawn(async move {
66-
while let Some(event) = rx.recv().await {
67-
let _ = sender
68-
.send(crate::pages::Message::Accessibility(Message::Event(event)))
61+
62+
return cosmic::Task::stream(async_fn_stream::fn_stream(
63+
|emitter| async move {
64+
while let Some(event) = rx.recv().await {
65+
let _ = emitter
66+
.emit(crate::pages::Message::Accessibility(Message::Event(
67+
event,
68+
)))
69+
.await;
70+
}
71+
72+
let _ = emitter
73+
.emit(crate::pages::Message::Accessibility(
74+
Message::ProtocolUnavailable,
75+
))
6976
.await;
70-
}
71-
let _ = sender
72-
.send(crate::pages::Message::Accessibility(
73-
Message::ProtocolUnavailable,
74-
))
75-
.await;
76-
});
77+
},
78+
));
7779
}
7880
Err(err) => {
7981
tracing::warn!(

cosmic-settings/src/pages/bluetooth/mod.rs

+40-24
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use cosmic::{theme, Apply, Element, Task};
88
use cosmic_settings_page::{self as page, section, Section};
99
use cosmic_settings_subscriptions::bluetooth::*;
1010
use futures::channel::oneshot;
11+
use futures::StreamExt;
1112
use slab::Slab;
1213
use slotmap::SlotMap;
1314
use std::collections::{HashMap, HashSet};
@@ -69,14 +70,11 @@ impl page::Page<crate::pages::Message> for Page {
6970
])
7071
}
7172

72-
fn on_enter(
73-
&mut self,
74-
sender: tokio::sync::mpsc::Sender<crate::pages::Message>,
75-
) -> cosmic::Task<crate::pages::Message> {
73+
fn on_enter(&mut self) -> cosmic::Task<crate::pages::Message> {
7674
// TODO start stream for new device
7775
cosmic::task::future(async move {
7876
match zbus::Connection::system().await {
79-
Ok(connection) => Message::DBusConnect(connection, sender),
77+
Ok(connection) => Message::DBusConnect(connection),
8078
Err(why) => Message::DBusError(why.to_string()),
8179
}
8280
})
@@ -148,10 +146,7 @@ impl page::Page<crate::pages::Message> for Page {
148146
pub enum Message {
149147
BluetoothEvent(Event),
150148
ConnectDevice(OwnedObjectPath),
151-
DBusConnect(
152-
zbus::Connection,
153-
tokio::sync::mpsc::Sender<crate::pages::Message>,
154-
),
149+
DBusConnect(zbus::Connection),
155150
DBusError(String),
156151
DisconnectDevice(OwnedObjectPath),
157152
ForgetDevice(OwnedObjectPath),
@@ -378,26 +373,47 @@ impl Page {
378373
tracing::warn!("No DBus connection ready");
379374
}
380375

381-
Message::DBusConnect(connection, sender) => {
376+
Message::DBusConnect(connection) => {
382377
self.connection = Some(connection.clone());
383378

379+
let get_adapters_fut = get_adapters(connection.clone());
380+
384381
if self.subscription.is_none() {
385382
let connection = connection.clone();
386-
self.subscription = Some(crate::utils::forward_event_loop(
387-
sender,
388-
|response| {
389-
crate::pages::Message::Bluetooth(Message::BluetoothEvent(response))
390-
},
391-
move |tx| async move {
392-
_ = futures::join!(
393-
subscription::watch(connection.clone(), tx.clone()),
394-
agent::watch(connection, tx),
395-
);
396-
},
397-
));
398-
}
399383

400-
return cosmic::task::future(get_adapters(connection.clone()));
384+
let (cancellation, task) = crate::utils::forward_event_loop(move |emitter| {
385+
let connection = connection.clone();
386+
387+
async move {
388+
let (tx, mut rx) = futures::channel::mpsc::channel(1);
389+
390+
let watchers = std::pin::pin!(async move {
391+
_ = futures::future::join(
392+
subscription::watch(connection.clone(), tx.clone()),
393+
agent::watch(connection, tx),
394+
)
395+
.await;
396+
});
397+
398+
let forwarder = std::pin::pin!(async move {
399+
while let Some(message) = rx.next().await {
400+
_ = emitter
401+
.emit(crate::pages::Message::Bluetooth(
402+
Message::BluetoothEvent(message),
403+
))
404+
.await;
405+
}
406+
});
407+
408+
futures::future::select(watchers, forwarder).await;
409+
}
410+
});
411+
412+
self.subscription = Some(cancellation);
413+
return cosmic::task::batch(vec![cosmic::task::future(get_adapters_fut), task]);
414+
} else {
415+
return cosmic::task::future(get_adapters_fut);
416+
}
401417
}
402418
Message::PopupDevice(popup) => {
403419
self.popup_device = popup;

cosmic-settings/src/pages/desktop/appearance/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -1562,10 +1562,7 @@ impl page::Page<crate::pages::Message> for Page {
15621562
.description(fl!("appearance", "desc"))
15631563
}
15641564

1565-
fn on_enter(
1566-
&mut self,
1567-
_sender: tokio::sync::mpsc::Sender<crate::pages::Message>,
1568-
) -> Task<crate::pages::Message> {
1565+
fn on_enter(&mut self) -> Task<crate::pages::Message> {
15691566
let (task, handle) = cosmic::task::batch(vec![
15701567
// Load icon themes
15711568
// cosmic::task::future(icon_themes::fetch()).map(crate::pages::Message::Appearance),

cosmic-settings/src/pages/desktop/panel/applets_inner.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,7 @@ impl page::Page<crate::pages::Message> for Page {
143143
})
144144
}
145145

146-
fn on_enter(
147-
&mut self,
148-
_sender: tokio::sync::mpsc::Sender<crate::pages::Message>,
149-
) -> Task<crate::pages::Message> {
146+
fn on_enter(&mut self) -> Task<crate::pages::Message> {
150147
Task::none()
151148
}
152149
}

cosmic-settings/src/pages/desktop/wallpaper/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,7 @@ impl page::Page<crate::pages::Message> for Page {
210210
.description(fl!("wallpaper", "desc"))
211211
}
212212

213-
fn on_enter(
214-
&mut self,
215-
_sender: tokio::sync::mpsc::Sender<crate::pages::Message>,
216-
) -> Task<crate::pages::Message> {
213+
fn on_enter(&mut self) -> Task<crate::pages::Message> {
217214
// Check if the page is already being loaded.
218215
if self.on_enter_handle.is_some() {
219216
return Task::none();

0 commit comments

Comments
 (0)