Skip to content

Commit 25e51eb

Browse files
Web worker comms
1 parent 2339278 commit 25e51eb

File tree

4 files changed

+129
-5
lines changed

4 files changed

+129
-5
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

web/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ clap = { workspace = true }
1818
console_error_panic_hook = { version = "0.1.7", optional = true }
1919
futures = { workspace = true }
2020
libp2p = { workspace = true }
21+
serde_json = "1.0.125"
2122
sp-io = { version = "30", features = ["disable_allocator", "disable_panic_handler"], default-features = false }
2223
tokio = { version = "^1", default-features = false, features = ["sync", "macros", "io-util", "rt"] }
2324
tokio_with_wasm = { version = "0.7.1", features = ["sync", "macros", "rt"] }

web/src/lib.rs

+116-5
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,53 @@
22

33
use std::sync::Arc;
44

5-
use avail_light_core::data;
5+
use avail_light_core::api::configuration::SharedConfig;
6+
use avail_light_core::api::types::{PublishMessage, Request, Topic};
7+
use avail_light_core::api::v2::transactions::Submitter;
68
use avail_light_core::light_client::OutputEvent as LcEvent;
79
use avail_light_core::network::{self, p2p, rpc, Network};
810
use avail_light_core::shutdown::Controller;
911
use avail_light_core::types::{Delay, PeerAddress};
1012
use avail_light_core::utils::spawn_in_span;
13+
use avail_light_core::{api, data};
1114
use avail_rust::kate_recovery::couscous;
1215
use clap::ValueEnum;
1316
use libp2p::Multiaddr;
1417
use std::str::FromStr;
15-
use tokio::sync::{broadcast, mpsc};
18+
use tokio::sync::{
19+
broadcast,
20+
mpsc::{self, UnboundedSender},
21+
};
1622
use tokio_with_wasm::alias as tokio;
1723
use tracing::{error, info, warn};
1824
use wasm_bindgen::prelude::*;
25+
use web_sys::js_sys;
1926
use web_time::Duration;
2027

2128
#[tokio::main(flavor = "current_thread")]
2229
#[wasm_bindgen(start)]
2330
async fn main_js() {}
2431

32+
static mut SENDER: Option<UnboundedSender<String>> = None;
33+
34+
#[wasm_bindgen]
35+
pub fn post_message(message: String) {
36+
unsafe {
37+
if let Some(sender) = &SENDER {
38+
sender.send(message).expect("TODO");
39+
}
40+
}
41+
}
42+
43+
fn send_message_to_browser(message: &str) {
44+
let worker_scope = js_sys::global();
45+
worker_scope
46+
.dyn_ref::<web_sys::DedicatedWorkerGlobalScope>()
47+
.expect("Should be running in a Web Worker")
48+
.post_message(&JsValue::from_str(message))
49+
.expect("Failed to post message");
50+
}
51+
2552
#[wasm_bindgen]
2653
pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>) {
2754
console_error_panic_hook::set_once();
@@ -62,6 +89,7 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
6289

6390
let genesis_hash = &network.genesis_hash().to_string();
6491
let (rpc_event_sender, rpc_event_receiver) = broadcast::channel(1000);
92+
let mut publish_rpc_event_receiver = rpc_event_sender.subscribe();
6593

6694
let (rpc_client, rpc_subscriptions) = rpc::init(
6795
db.clone(),
@@ -75,7 +103,7 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
75103

76104
let (id_keys, _peer_id) = p2p::identity(&cfg_libp2p, db.clone()).unwrap();
77105

78-
let (p2p_client, p2p_event_loop, _p2p_event_receiver) = p2p::init(
106+
let (p2p_client, _p2p_event_loop, _p2p_event_receiver) = p2p::init(
79107
cfg_libp2p.clone(),
80108
Default::default(),
81109
id_keys,
@@ -103,7 +131,7 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
103131
)));
104132

105133
let (lc_sender, mut lc_receiver) = mpsc::unbounded_channel::<LcEvent>();
106-
let (block_tx, _block_rx) =
134+
let (block_tx, mut block_rx) =
107135
broadcast::channel::<avail_light_core::types::BlockVerified>(1 << 7);
108136

109137
let channels = avail_light_core::types::ClientChannels {
@@ -131,7 +159,55 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
131159
lc_sender,
132160
));
133161

134-
tokio::task::spawn(p2p_event_loop.run());
162+
let topic = Topic::HeaderVerified;
163+
spawn_in_span(shutdown.with_cancel(async move {
164+
loop {
165+
let message = match publish_rpc_event_receiver.recv().await {
166+
Ok(value) => value,
167+
Err(error) => {
168+
error!(?topic, "Cannot receive message: {error}");
169+
return;
170+
},
171+
};
172+
let message: Option<PublishMessage> = match message.try_into() {
173+
Ok(Some(message)) => Some(message),
174+
Ok(None) => continue, // Silently skip
175+
Err(error) => {
176+
error!(?topic, "Cannot create message: {error}");
177+
continue;
178+
},
179+
};
180+
181+
let message = serde_json::to_string(&message).unwrap();
182+
send_message_to_browser(&message)
183+
}
184+
}));
185+
186+
let topic = Topic::ConfidenceAchieved;
187+
spawn_in_span(shutdown.with_cancel(async move {
188+
loop {
189+
let message = match block_rx.recv().await {
190+
Ok(value) => value,
191+
Err(error) => {
192+
error!(?topic, "Cannot receive message: {error}");
193+
return;
194+
},
195+
};
196+
let message: Option<PublishMessage> = match message.try_into() {
197+
Ok(Some(message)) => Some(message),
198+
Ok(None) => continue, // Silently skip
199+
Err(error) => {
200+
error!(?topic, "Cannot create message: {error}");
201+
continue;
202+
},
203+
};
204+
205+
let message = serde_json::to_string(&message).unwrap();
206+
send_message_to_browser(&message)
207+
}
208+
}));
209+
210+
// tokio::task::spawn(_p2p_event_loop.run());
135211

136212
let bootstraps = cfg_libp2p.bootstraps.clone();
137213
let bootstrap_p2p_client = p2p_client.clone();
@@ -151,6 +227,41 @@ pub async fn run(network_param: Option<String>, bootstrap_param: Option<String>)
151227
}
152228
}));
153229

230+
let (sender, mut receiver) = mpsc::unbounded_channel::<String>();
231+
unsafe {
232+
SENDER = Some(sender);
233+
}
234+
235+
let config = SharedConfig::default();
236+
237+
spawn_in_span(async move {
238+
loop {
239+
if let Some(message) = receiver.recv().await {
240+
info!("Received message: {message}");
241+
let request: Request = match serde_json::from_str(&message) {
242+
Ok(request) => request,
243+
Err(error) => {
244+
error!("Failed to parse request: {error}");
245+
continue;
246+
},
247+
};
248+
249+
let Ok(response) = api::v2::messages::handle_request(
250+
request,
251+
version,
252+
&config,
253+
None::<Arc<Submitter<data::DB>>>,
254+
db.clone(),
255+
)
256+
.await
257+
else {
258+
continue;
259+
};
260+
send_message_to_browser(&serde_json::to_string(&response).unwrap());
261+
}
262+
}
263+
});
264+
154265
if let Err(error) = light_client_handle.await {
155266
error!("Error running light client: {error}")
156267
};

web/www/index.html

+11
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,19 @@
77
</head>
88
<body>
99
<h1>Avail Light Client Web</h1>
10+
<!-- <script type="module" src="./avail-light.js"></script> -->
1011
<script type="module">
1112
const worker = new Worker('./avail-light.js', { type: 'module' });
13+
worker.onmessage = (event) => {
14+
console.log(event.data);
15+
let { message, topic } = JSON.parse(event.data);
16+
if (topic == "header-verified") {
17+
document.getElementById("text").innerHTML = message.block_number;
18+
};
19+
};
20+
window.worker = worker;
21+
// worker.postMessage({"type":"version","request_id":"{uuid}"});
1222
</script>
23+
<span id="text" />
1324
</body>
1425
</html>

0 commit comments

Comments
 (0)