Skip to content

Commit bd8e435

Browse files
Remove timeout from subscription stream in wasm
1 parent aabdc45 commit bd8e435

File tree

1 file changed

+51
-30
lines changed

1 file changed

+51
-30
lines changed

core/src/network/rpc/client.rs

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,18 @@ use color_eyre::{
1515
Report, Result,
1616
};
1717
use futures::{Stream, TryStreamExt};
18-
use std::{iter::Iterator, pin::Pin, sync::Arc, time::Duration};
18+
#[cfg(not(target_arch = "wasm32"))]
19+
use std::time::Duration;
20+
use std::{iter::Iterator, pin::Pin, sync::Arc};
1921
#[cfg(not(target_arch = "wasm32"))]
2022
use thiserror::Error;
2123
#[cfg(target_arch = "wasm32")]
2224
use thiserror_no_std::Error;
2325
use tokio::sync::{broadcast::Sender, RwLock};
2426
use tokio_retry::Retry;
25-
use tokio_stream::{Elapsed, StreamExt, StreamMap};
27+
#[cfg(not(target_arch = "wasm32"))]
28+
use tokio_stream::Elapsed;
29+
use tokio_stream::{StreamExt, StreamMap};
2630
use tracing::{error, info, warn};
2731

2832
use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof};
@@ -117,9 +121,14 @@ impl GenesisHash {
117121
}
118122
}
119123

124+
#[cfg(not(target_arch = "wasm32"))]
120125
type SubscriptionStream =
121126
Pin<Box<dyn Stream<Item = Result<Result<Subscription, subxt::error::Error>, Elapsed>> + Send>>;
122127

128+
#[cfg(target_arch = "wasm32")]
129+
type SubscriptionStream =
130+
Pin<Box<dyn Stream<Item = Result<Subscription, subxt::error::Error>> + Send>>;
131+
123132
#[derive(Clone)]
124133
pub struct Client<T: Database> {
125134
subxt_client: Arc<RwLock<SDK>>,
@@ -425,10 +434,17 @@ impl<D: Database> Client<D> {
425434
Ok(mut stream) => {
426435
loop {
427436
match stream.next().await {
437+
#[cfg(not(target_arch = "wasm32"))]
428438
Some(Ok(Ok(item))) => {
439+
yield Ok(item);
440+
continue;
441+
},
442+
#[cfg(target_arch = "wasm32")]
443+
Some(Ok(item)) => {
429444
yield Ok(item);
430445
continue;
431446
},
447+
#[cfg(not(target_arch = "wasm32"))]
432448
Some(Ok(Err(error))) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."),
433449
Some(Err(error)) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."),
434450
None => warn!("RPC Subscription Stream exhausted. Creating new connection."),
@@ -452,39 +468,44 @@ impl<D: Database> Client<D> {
452468
}
453469

454470
async fn create_rpc_subscriptions(client: SDK) -> Result<SubscriptionStream> {
471+
// NOTE: current tokio stream implementation doesn't support timeouts on web
472+
#[cfg(not(target_arch = "wasm32"))]
455473
let timeout_in = Duration::from_secs(30);
456474

475+
let headers_stream = client
476+
.api
477+
.backend()
478+
.stream_finalized_block_headers()
479+
.await?
480+
.map_ok(|(header, _)| Subscription::Header(header))
481+
.inspect_ok(|_| info!("Received header on the stream"))
482+
.inspect_err(|error| warn!(%error, "Received error on headers stream"));
483+
457484
// Create fused Avail Header subscription
458-
let headers: SubscriptionStream = Box::pin(
459-
client
460-
.api
461-
.backend()
462-
.stream_finalized_block_headers()
463-
.await?
464-
.map_ok(|(header, _)| Subscription::Header(header))
465-
.inspect_ok(|_| info!("Received header on the stream"))
466-
.inspect_err(|error| warn!(%error, "Received error on headers stream"))
467-
.timeout(timeout_in)
468-
.fuse(),
469-
);
485+
#[cfg(not(target_arch = "wasm32"))]
486+
let headers: SubscriptionStream = Box::pin(headers_stream.timeout(timeout_in).fuse());
487+
#[cfg(target_arch = "wasm32")]
488+
let headers: SubscriptionStream = Box::pin(headers_stream.fuse());
489+
490+
let justifications_stream = client
491+
.rpc
492+
.client
493+
.subscribe(
494+
"grandpa_subscribeJustifications",
495+
rpc_params![],
496+
"grandpa_unsubscribeJustifications",
497+
)
498+
.await?
499+
.map_ok(Subscription::Justification)
500+
.inspect_ok(|_| info!("Received justification on the stream"))
501+
.inspect_err(|error| warn!(%error, "Received error on justifications stream"));
470502

503+
#[cfg(not(target_arch = "wasm32"))]
471504
// Create fused GrandpaJustification subscription
472-
let justifications: SubscriptionStream = Box::pin(
473-
client
474-
.rpc
475-
.client
476-
.subscribe(
477-
"grandpa_subscribeJustifications",
478-
rpc_params![],
479-
"grandpa_unsubscribeJustifications",
480-
)
481-
.await?
482-
.map_ok(Subscription::Justification)
483-
.inspect_ok(|_| info!("Received justification on the stream"))
484-
.inspect_err(|error| warn!(%error, "Received error on justifications stream"))
485-
.timeout(timeout_in)
486-
.fuse(),
487-
);
505+
let justifications: SubscriptionStream =
506+
Box::pin(justifications_stream.timeout(timeout_in).fuse());
507+
#[cfg(target_arch = "wasm32")]
508+
let justifications: SubscriptionStream = Box::pin(justifications_stream.fuse());
488509

489510
let mut last_stream = 0;
490511
let mut per_stream_count = 0;

0 commit comments

Comments
 (0)