Skip to content

Commit

Permalink
fix: wasm websocket connect
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Jan 23, 2024
1 parent 6304f20 commit 1cf3a05
Show file tree
Hide file tree
Showing 23 changed files with 288 additions and 177 deletions.
78 changes: 66 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 2 additions & 14 deletions libs/client-api-test-util/src/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,12 @@ use dotenv::dotenv;
use lazy_static::lazy_static;
use uuid::Uuid;

#[cfg(not(target_arch = "wasm32"))]
lazy_static! {
pub static ref ADMIN_USER: User = {
dotenv().ok();
User {
email: std::env::var("GOTRUE_ADMIN_EMAIL").unwrap(),
password: std::env::var("GOTRUE_ADMIN_PASSWORD").unwrap(),
}
};
}

#[cfg(target_arch = "wasm32")]
lazy_static! {
pub static ref ADMIN_USER: User = {
dotenv().ok();
User {
email: "[email protected]".to_string(),
password: "password".to_string(),
email: std::env::var("GOTRUE_ADMIN_EMAIL").unwrap_or("[email protected]".to_string()),
password: std::env::var("GOTRUE_ADMIN_PASSWORD").unwrap_or("password".to_string()),
}
};
}
Expand Down
12 changes: 7 additions & 5 deletions libs/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ bytes = "1.5"
uuid = "1.6.1"
futures-util = "0.3.30"
futures-core = "0.3.30"
tokio-retry = "0.3"
parking_lot = "0.12.1"
brotli = "3.4.0"
mime_guess = "2.0.4"
Expand All @@ -45,10 +44,8 @@ database-entity.workspace = true
app-error = { workspace = true, features = ["tokio_error", "bincode_error"] }
scraper = { version = "0.17.1", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"]}
tokio = { workspace = true, features = ["sync"]}

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio-retry = "0.3"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
workspace = true
Expand All @@ -58,6 +55,11 @@ features = ["sync", "net"]
workspace = true
features = ["tungstenite"]

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4.40"
getrandom = { version = "0.2", features = ["js"]}
tokio = { workspace = true, features = ["sync"]}
again = "0.1.2"

[features]
collab-sync = ["collab", "yrs"]
Expand Down
7 changes: 4 additions & 3 deletions libs/client-api/src/collab_sync/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::collab_sync::{SinkConfig, SyncQueue};
use tokio_stream::wrappers::WatchStream;
use tracing::trace;

use crate::platform_spawn;
use crate::ws::{ConnectState, WSConnectStateReceiver};
use yrs::updates::encoder::Encode;

Expand Down Expand Up @@ -63,7 +64,7 @@ where
);

let mut sync_state_stream = WatchStream::new(sync_queue.subscribe_sync_state());
tokio::spawn(async move {
platform_spawn(async move {
while let Some(new_state) = sync_state_stream.next().await {
if let Some(local_collab) = weak_local_collab.upgrade() {
if let Some(local_collab) = local_collab.try_lock() {
Expand All @@ -76,7 +77,7 @@ where
let sync_queue = Arc::new(sync_queue);
let weak_local_collab = collab;
let weak_sync_queue = Arc::downgrade(&sync_queue);
tokio::spawn(async move {
platform_spawn(async move {
while let Ok(connect_state) = ws_connect_state.recv().await {
match connect_state {
ConnectState::Connected => {
Expand Down Expand Up @@ -132,7 +133,7 @@ where
let object_id = self.object.object_id.clone();
let cloned_origin = origin.clone();

tokio::spawn(async move {
platform_spawn(async move {
if let Some(sync_queue) = weak_sync_queue.upgrade() {
let payload = Message::Sync(SyncMessage::Update(update)).encode_v1();
sync_queue
Expand Down
6 changes: 3 additions & 3 deletions libs/client-api/src/collab_sync/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::collab_sync::pending_msg::{MessageState, PendingMsgQueue};
use crate::collab_sync::{SyncError, SyncObject, DEFAULT_SYNC_TIMEOUT};
use futures_util::SinkExt;

use crate::platform_spawn;
use realtime_entity::collab_msg::{CollabSinkMessage, MsgId};
use tokio::spawn;
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use tokio::time::{interval, Instant, Interval};
use tracing::{debug, error, event, trace, warn};
Expand Down Expand Up @@ -96,7 +96,7 @@ where
let weak_notifier = Arc::downgrade(&notifier);
let (tx, rx) = mpsc::channel(1);
interval_runner_stop_tx = Some(tx);
spawn(IntervalRunner::new(*duration).run(weak_notifier, rx));
platform_spawn(IntervalRunner::new(*duration).run(weak_notifier, rx));
}
Self {
uid,
Expand Down Expand Up @@ -364,7 +364,7 @@ where
}

fn retry_later(weak_notifier: Weak<watch::Sender<bool>>) {
spawn(async move {
platform_spawn(async move {
interval(Duration::from_millis(100)).tick().await;
if let Some(notifier) = weak_notifier.upgrade() {
let _ = notifier.send(false);
Expand Down
8 changes: 4 additions & 4 deletions libs/client-api/src/collab_sync/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::collab_sync::{
CollabSink, CollabSinkRunner, SinkConfig, SinkState, SyncError, SyncObject,
};
use crate::platform_spawn;
use bytes::Bytes;
use collab::core::awareness::Awareness;
use collab::core::collab::MutexCollab;
Expand All @@ -13,7 +14,6 @@ use realtime_protocol::{Message, MessageReader, SyncMessage};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, Weak};
use tokio::spawn;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{error, event, trace, warn, Level};
Expand Down Expand Up @@ -75,7 +75,7 @@ where
pause,
));

spawn(CollabSinkRunner::run(Arc::downgrade(&sink), notifier_rx));
platform_spawn(CollabSinkRunner::run(Arc::downgrade(&sink), notifier_rx));
let cloned_protocol = protocol.clone();
let object_id = object.object_id.clone();
let stream = SyncStream::new(
Expand All @@ -90,7 +90,7 @@ where
let weak_sync_state = Arc::downgrade(&sync_state);
let mut sink_state_stream = WatchStream::new(sink_state_rx);
// Subscribe the sink state stream and update the sync state in the background.
spawn(async move {
platform_spawn(async move {
while let Some(collab_state) = sink_state_stream.next().await {
if let Some(sync_state) = weak_sync_state.upgrade() {
match collab_state {
Expand Down Expand Up @@ -209,7 +209,7 @@ where
P: CollabSyncProtocol + Send + Sync + 'static,
{
let cloned_weak_collab = weak_collab.clone();
spawn(SyncStream::<Sink, Stream>::spawn_doc_stream::<P>(
platform_spawn(SyncStream::<Sink, Stream>::spawn_doc_stream::<P>(
origin,
object_id.clone(),
stream,
Expand Down
6 changes: 3 additions & 3 deletions libs/client-api/src/native/http_native.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::http::log_request_id;
use crate::retry::{RefreshTokenAction, RefreshTokenRetryCondition};
use crate::ws::{WSClientHttpSender, WSError};
use crate::{spawn_blocking_brotli_compress, Client};
use crate::{RefreshTokenAction, RefreshTokenRetryCondition};
use app_error::AppError;
use async_trait::async_trait;
use database_entity::dto::CollabParams;
Expand Down Expand Up @@ -55,7 +55,7 @@ impl Client {
.into_iter()
.map(|params| {
let config = self.config.clone();
tokio::spawn(async move {
platform_spawn(async move {
let data = params.to_bytes().map_err(AppError::from)?;
spawn_blocking_brotli_compress(
data,
Expand Down Expand Up @@ -153,7 +153,7 @@ impl WSClientHttpSender for Client {
}
}

pub fn spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
pub fn platform_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
Expand Down
Loading

0 comments on commit 1cf3a05

Please sign in to comment.