diff --git a/.github/workflows/client_api_check.yml b/.github/workflows/client_api_check.yml index 18c54c3ed..e3551b118 100644 --- a/.github/workflows/client_api_check.yml +++ b/.github/workflows/client_api_check.yml @@ -22,10 +22,17 @@ jobs: - name: Install cargo-tree run: cargo install cargo-tree + - name: Install wasm-pack + run: cargo install wasm-pack + - name: Build ClientAPI working-directory: ./libs/client-api run: cargo build + - name: Build ClientAPI WASM + working-directory: ./libs/client-api + run: wasm-pack build --features="wasm_build" + - name: Check ClientAPI Dependencies run: bash ./build/client_api_deps_check.sh diff --git a/Cargo.lock b/Cargo.lock index 5591480d2..d8ea12783 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,6 +485,7 @@ dependencies = [ "actix-web", "anyhow", "bincode", + "getrandom 0.2.10", "reqwest", "rust-s3", "serde", @@ -1210,6 +1211,7 @@ dependencies = [ "database-entity", "futures-core", "futures-util", + "getrandom 0.2.10", "gotrue", "gotrue-entity", "mime", @@ -2175,6 +2177,7 @@ version = "0.1.0" dependencies = [ "anyhow", "futures-util", + "getrandom 0.2.10", "gotrue-entity", "infra", "reqwest", @@ -4000,7 +4003,6 @@ dependencies = [ "js-sys", "log", "mime", - "mime_guess", "native-tls", "once_cell", "percent-encoding", @@ -6060,6 +6062,7 @@ dependencies = [ "collab-document", "collab-entity", "collab-folder", + "getrandom 0.2.10", "indexmap 2.1.0", "nanoid", "serde", diff --git a/libs/app_error/Cargo.toml b/libs/app_error/Cargo.toml index a978d1ed2..7e5afeb8f 100644 --- a/libs/app_error/Cargo.toml +++ b/libs/app_error/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] thiserror = "1.0.56" @@ -18,7 +20,7 @@ url = { version = "2.5.0"} actix-web = { version = "4.4.1", optional = true } reqwest = { version = "0.11" } serde_json.workspace = true -tokio = { workspace = true, optional = true } +tokio = { workspace = true, optional = true, default-features = false } bincode = { version = "1.3.3", optional = true } [features] @@ -29,4 +31,7 @@ s3_error = ["rust-s3"] actix_web_error = ["actix-web"] tokio_error = ["tokio"] gotrue_error= [] -bincode_error = ["bincode"] \ No newline at end of file +bincode_error = ["bincode"] + +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"]} \ No newline at end of file diff --git a/libs/app_error/src/gotrue.rs b/libs/app_error/src/gotrue.rs index a54af2e70..16099556e 100644 --- a/libs/app_error/src/gotrue.rs +++ b/libs/app_error/src/gotrue.rs @@ -41,6 +41,7 @@ impl GoTrueError { impl From for GoTrueError { fn from(value: reqwest::Error) -> Self { + #[cfg(not(target_arch = "wasm32"))] if value.is_connect() { return GoTrueError::Connect(value.to_string()); } diff --git a/libs/app_error/src/lib.rs b/libs/app_error/src/lib.rs index c19233b7f..88e0ae0c9 100644 --- a/libs/app_error/src/lib.rs +++ b/libs/app_error/src/lib.rs @@ -161,6 +161,7 @@ impl AppError { impl From for AppError { fn from(error: reqwest::Error) -> Self { + #[cfg(not(target_arch = "wasm32"))] if error.is_connect() { return AppError::Connect(error.to_string()); } diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 7cfffe5a3..a1619cf3e 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -4,51 +4,64 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] -reqwest = { version = "0.11.23", default-features = false, features = ["stream","json","multipart"] } +reqwest = { version = "0.11.23", default-features = false, features = ["stream","json"] } anyhow = "1.0.79" -serde_json.workspace = true serde_repr = "0.1.18" gotrue = { path = "../gotrue" } gotrue-entity = { path = "../gotrue-entity" } shared_entity = { path = "../shared-entity" } -database-entity.workspace = true -url = "2.5.0" -tokio-stream = { version = "0.1.14" } -parking_lot = "0.12.1" -mime = "0.3.17" -app-error = { workspace = true, features = ["tokio_error", "bincode_error"] } -brotli = "3.4.0" - -# ws tracing = { version = "0.1" } thiserror = "1.0.56" -serde.workspace = true -tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] } -tokio = { version = "1.35", features = ["full"] } +bytes = "1.5" +uuid = "1.6.1" futures-util = "0.3.30" futures-core = "0.3.30" tokio-retry = "0.3" -bytes = "1.5" -uuid = "1.6.1" -scraper = { version = "0.17.1", optional = true } +parking_lot = "0.12.1" +brotli = "3.4.0" +mime_guess = "2.0.4" +async-trait = { version = "0.1.77" } +prost = "0.12.3" +bincode = "1.3.3" +url = "2.5.0" +mime = "0.3.17" +tokio-stream = { version = "0.1.14" } +realtime-entity = { workspace = true } -# collab sync collab = { version = "0.1.0", optional = true } collab-entity = { version = "0.1.0" } yrs = { workspace = true, optional = true } -realtime-entity = { workspace = true, features = ["tungstenite"] } realtime-protocol = { workspace = true } workspace-template = { workspace = true, optional = true } -mime_guess = "2.0.4" -async-trait = { version = "0.1.77" } -prost = "0.12.3" -bincode = "1.3.3" +serde_json.workspace = true +serde.workspace = true +database-entity.workspace = true +app-error = { workspace = true, features = ["tokio_error", "bincode_error"] } +scraper = { version = "0.17.1", optional = true } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"]} +tokio = { workspace = true, features = ["sync"]} +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] +workspace = true +features = ["sync", "net"] + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.realtime-entity] +workspace = true +features = ["tungstenite"] + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio-tungstenite] +version = "0.20.1" +features = ["native-tls"] + [features] collab-sync = ["collab", "yrs"] test_util = ["scraper"] template = ["workspace-template"] - +wasm_build = [] diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index 84c68bc66..d62e993c3 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -12,10 +12,10 @@ use realtime_protocol::{Message, SyncMessage}; use tokio_stream::StreamExt; use crate::collab_sync::{SinkConfig, SyncQueue}; -use crate::ws::{ConnectState, WSConnectStateReceiver}; use tokio_stream::wrappers::WatchStream; use tracing::trace; +use crate::{ConnectState, WSConnectStateReceiver}; use yrs::updates::encoder::Encode; pub struct SyncPlugin { diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 0b0f21094..5ac9c26cb 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -1,9 +1,7 @@ -use crate::entity::AFBlobRecord; use crate::notify::{ClientToken, TokenStateReceiver}; use anyhow::Context; use brotli::CompressorReader; use gotrue_entity::dto::AuthProvider; -use prost::Message as ProstMessage; use std::io::Read; use app_error::AppError; @@ -11,21 +9,20 @@ use bytes::Bytes; use database_entity::dto::{ AFCollabMember, AFCollabMembers, AFSnapshotMeta, AFSnapshotMetas, AFUserProfile, AFUserWorkspaceInfo, AFWorkspace, AFWorkspaceMember, AFWorkspaces, BatchQueryCollabParams, - BatchQueryCollabResult, CollabMemberIdentify, CollabParams, CreateCollabParams, - DeleteCollabParams, InsertCollabMemberParams, QueryCollab, QueryCollabMembers, QueryCollabParams, + BatchQueryCollabResult, CollabMemberIdentify, CreateCollabParams, DeleteCollabParams, + InsertCollabMemberParams, QueryCollab, QueryCollabMembers, QueryCollabParams, QuerySnapshotParams, SnapshotData, UpdateCollabMemberParams, }; -use futures_util::{stream, StreamExt}; +use futures_util::StreamExt; use gotrue::grant::Grant; use gotrue::grant::PasswordGrant; -use async_trait::async_trait; use gotrue::params::MagicLinkParams; use gotrue::params::{AdminUserParams, GenerateLinkParams}; use mime::Mime; use parking_lot::RwLock; use realtime_entity::EncodedCollab; -use reqwest::{header, Body, StatusCode}; +use reqwest::{header, StatusCode}; use collab_entity::CollabType; use reqwest::header::HeaderValue; @@ -38,20 +35,14 @@ use shared_entity::dto::workspace_dto::{ WorkspaceMembers, WorkspaceSpaceUsage, }; use shared_entity::response::{AppResponse, AppResponseError}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use tokio_retry::strategy::FixedInterval; -use tokio_retry::RetryIf; -use tokio_tungstenite::tungstenite::Message; use tracing::{event, instrument, trace, warn}; use url::Url; -use crate::retry::{RefreshTokenAction, RefreshTokenRetryCondition}; -use crate::ws::{WSClientHttpSender, WSError}; use gotrue_entity::dto::SignUpResponse::{Authenticated, NotAuthenticated}; use gotrue_entity::dto::{GotrueTokenResponse, UpdateGotrueUserParams, User}; -use realtime_entity::realtime_proto::HttpRealtimeMessage; pub const CLIENT_API_VERSION: &str = "0.0.3"; pub const X_COMPRESSION_TYPE: &str = "X-Compression-Type"; @@ -63,10 +54,10 @@ pub struct ClientConfiguration { /// Lower Levels (0-4): Faster compression and decompression speeds but lower compression ratios. Suitable for scenarios where speed is more critical than reducing data size. /// Medium Levels (5-9): A balance between compression ratio and speed. These levels are generally good for a mix of performance and efficiency. /// Higher Levels (10-11): The highest compression ratios, but significantly slower and more resource-intensive. These are typically used in scenarios where reducing data size is paramount and resource usage is a secondary concern, such as for static content compression in web servers. - compression_quality: u32, + pub(crate) compression_quality: u32, /// A larger buffer size means more data is compressed in a single operation, which can lead to better compression ratios /// since Brotli has more data to analyze for patterns and repetitions. - compression_buffer_size: usize, + pub(crate) compression_buffer_size: usize, } impl ClientConfiguration { @@ -113,14 +104,14 @@ pub struct Client { pub(crate) gotrue_client: gotrue::api::Client, pub base_url: String, ws_addr: String, - token: Arc>, - is_refreshing_token: Arc, - refresh_ret_txs: Arc>>, - config: ClientConfiguration, + pub(crate) token: Arc>, + pub(crate) is_refreshing_token: Arc, + pub(crate) refresh_ret_txs: Arc>>, + pub(crate) config: ClientConfiguration, } -type RefreshTokenRet = tokio::sync::oneshot::Receiver>; -type RefreshTokenSender = tokio::sync::oneshot::Sender>; +pub(crate) type RefreshTokenRet = tokio::sync::oneshot::Receiver>; +pub(crate) type RefreshTokenSender = tokio::sync::oneshot::Sender>; /// Hardcoded schema in the frontend application. Do not change this value. const DESKTOP_CALLBACK_URL: &str = "appflowy-flutter://login-callback"; @@ -418,30 +409,6 @@ impl Client { ) } - #[instrument(level = "debug", skip_all, err)] - pub async fn post_realtime_msg( - &self, - device_id: &str, - msg: Message, - ) -> Result<(), AppResponseError> { - let device_id = device_id.to_string(); - let payload = - spawn_blocking_brotli_compress(msg.into_data(), 6, self.config.compression_buffer_size) - .await?; - - let msg = HttpRealtimeMessage { device_id, payload }.encode_to_vec(); - let body = Body::wrap_stream(stream::iter(vec![Ok::<_, reqwest::Error>(msg)])); - let url = format!("{}/api/realtime/post/stream", self.base_url); - let resp = self - .http_client_with_auth_compress(Method::POST, &url) - .await? - .body(body) - .send() - .await?; - log_request_id(&resp); - AppResponse::<()>::from_response(resp).await?.into_error() - } - /// Only expose this method for testing #[cfg(debug_assertions)] pub fn token(&self) -> Arc> { @@ -659,49 +626,6 @@ impl Client { Ok(is_new) } - /// Refreshes the access token using the stored refresh token. - /// - /// This function attempts to refresh the access token by sending a request to the authentication server - /// using the stored refresh token. If successful, it updates the stored access token with the new one - /// received from the server. - #[instrument(level = "debug", skip_all, err)] - pub async fn refresh_token(&self) -> Result { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.refresh_ret_txs.write().push(tx); - - if !self.is_refreshing_token.load(Ordering::SeqCst) { - self.is_refreshing_token.store(true, Ordering::SeqCst); - let txs = std::mem::take(&mut *self.refresh_ret_txs.write()); - let result = self.inner_refresh_token().await; - for tx in txs { - let _ = tx.send(result.clone()); - } - self.is_refreshing_token.store(false, Ordering::SeqCst); - } - Ok(rx) - } - - async fn inner_refresh_token(&self) -> Result<(), AppResponseError> { - let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(4); - let action = RefreshTokenAction::new(self.token.clone(), self.gotrue_client.clone()); - match RetryIf::spawn(retry_strategy, action, RefreshTokenRetryCondition).await { - Ok(_) => { - event!(tracing::Level::INFO, "refresh token success"); - Ok(()) - }, - Err(err) => { - let err = AppError::from(err); - event!(tracing::Level::ERROR, "refresh token failed: {}", err); - - // If the error is an OAuth error, unset the token. - if err.is_oauth_error() { - self.token.write().unset(); - } - Err(err.into()) - }, - } - } - #[instrument(level = "debug", skip_all, err)] pub async fn sign_up(&self, email: &str, password: &str) -> Result<(), AppResponseError> { match self.gotrue_client.sign_up(email, password).await? { @@ -766,73 +690,17 @@ impl Client { ) .await?; - let resp = self + #[allow(unused_mut)] + let mut builder = self .http_client_with_auth_compress(Method::POST, &url) - .await? - .timeout(Duration::from_secs(60)) - .body(compress_bytes) - .send() .await?; - log_request_id(&resp); - AppResponse::<()>::from_response(resp).await?.into_error() - } - #[instrument(level = "debug", skip_all, err)] - pub async fn create_collab_list( - &self, - workspace_id: &str, - params_list: Vec, - ) -> Result<(), AppResponseError> { - let url = format!( - "{}/api/workspace/{}/batch/collab", - self.base_url, workspace_id - ); - - // Parallel compression - let compression_tasks: Vec<_> = params_list - .into_iter() - .map(|params| { - let config = self.config.clone(); - tokio::spawn(async move { - let data = params.to_bytes().map_err(AppError::from)?; - spawn_blocking_brotli_compress( - data, - config.compression_quality, - config.compression_buffer_size, - ) - .await - }) - }) - .collect(); - - let mut framed_data = Vec::new(); - let mut size_count = 0; - for task in compression_tasks { - let compressed = task.await??; - // The length of a u32 in bytes is 4. The server uses a u32 to read the size of each data frame, - // hence the frame size header is always 4 bytes. It's crucial not to alter this size value, - // as the server's logic for frame size reading is based on this fixed 4-byte length. - // note: - // the size of a u32 is a constant 4 bytes across all platforms that Rust supports. - let size = compressed.len() as u32; - framed_data.extend_from_slice(&size.to_be_bytes()); - framed_data.extend_from_slice(&compressed); - size_count += size; + #[cfg(not(target_arch = "wasm32"))] + { + builder = builder.timeout(Duration::from_secs(60)); } - event!( - tracing::Level::INFO, - "create batch collab with size: {}", - size_count - ); - let body = Body::wrap_stream(stream::once(async { Ok::<_, AppError>(framed_data) })); - let resp = self - .http_client_with_auth_compress(Method::POST, &url) - .await? - .timeout(Duration::from_secs(60)) - .body(body) - .send() - .await?; + let resp = builder.body(compress_bytes).send().await?; log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -1119,7 +987,7 @@ impl Client { data: T, mime: &Mime, content_length: usize, - ) -> Result { + ) -> Result { let resp = self .http_client_with_auth(Method::PUT, url) .await? @@ -1129,7 +997,7 @@ impl Client { .send() .await?; log_request_id(&resp); - AppResponse::::from_response(resp) + AppResponse::::from_response(resp) .await? .into_data() } @@ -1268,7 +1136,7 @@ impl Client { } #[instrument(level = "debug", skip_all, err)] - async fn http_client_with_auth_compress( + pub(crate) async fn http_client_with_auth_compress( &self, method: Method, url: &str, @@ -1288,23 +1156,20 @@ impl Client { ) }) } + + pub(crate) fn batch_create_collab_url(&self, workspace_id: &str) -> String { + format!( + "{}/api/workspace/{}/batch/collab", + self.base_url, workspace_id + ) + } } fn url_missing_param(param: &str) -> AppResponseError { AppError::InvalidRequest(format!("Url Missing Parameter:{}", param)).into() } -#[async_trait] -impl WSClientHttpSender for Client { - async fn send_ws_msg(&self, device_id: &str, message: Message) -> Result<(), WSError> { - self - .post_realtime_msg(device_id, message) - .await - .map_err(|err| WSError::Internal(anyhow::Error::from(err))) - } -} - -fn log_request_id(resp: &reqwest::Response) { +pub(crate) fn log_request_id(resp: &reqwest::Response) { if let Some(request_id) = resp.headers().get("x-request-id") { event!(tracing::Level::DEBUG, "request_id: {:?}", request_id); } else { diff --git a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index 2fac5f81a..81d09699d 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -1,13 +1,37 @@ mod http; +pub use http::*; + +macro_rules! if_native { + ($($item:item)*) => {$( + #[cfg(not(target_arch = "wasm32"))] + $item + )*} +} + +macro_rules! if_wasm { + ($($item:item)*) => {$( + #[cfg(target_arch = "wasm32")] + $item + )*} +} #[cfg(feature = "collab-sync")] pub mod collab_sync; pub mod notify; -mod retry; -pub mod ws; -pub use http::*; +if_native! { + mod native; + #[allow(unused_imports)] + pub use native::*; +} + +if_wasm! { + mod wasm; + #[allow(unused_imports)] + pub use wasm::*; + pub use wasm::ws_wasm::*; +} pub mod error { pub use shared_entity::response::AppResponseError; diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs new file mode 100644 index 000000000..57d9b67cc --- /dev/null +++ b/libs/client-api/src/native/http_native.rs @@ -0,0 +1,156 @@ +use crate::http::log_request_id; +use crate::retry::{RefreshTokenAction, RefreshTokenRetryCondition}; +use crate::{spawn_blocking_brotli_compress, Client, WSClientHttpSender, WSError}; +use app_error::AppError; +use async_trait::async_trait; +use database_entity::dto::CollabParams; +use futures_util::stream; +use prost::Message; +use realtime_entity::realtime_proto::HttpRealtimeMessage; +use reqwest::{Body, Method}; +use shared_entity::response::{AppResponse, AppResponseError}; +use std::sync::atomic::Ordering; +use std::time::Duration; +use tokio_retry::strategy::FixedInterval; +use tokio_retry::RetryIf; +use tracing::{event, instrument}; + +impl Client { + #[instrument(level = "debug", skip_all, err)] + pub async fn post_realtime_msg( + &self, + device_id: &str, + msg: tokio_tungstenite::tungstenite::Message, + ) -> Result<(), AppResponseError> { + let device_id = device_id.to_string(); + let payload = + spawn_blocking_brotli_compress(msg.into_data(), 6, self.config.compression_buffer_size) + .await?; + + let msg = HttpRealtimeMessage { device_id, payload }.encode_to_vec(); + let body = Body::wrap_stream(stream::iter(vec![Ok::<_, reqwest::Error>(msg)])); + let url = format!("{}/api/realtime/post/stream", self.base_url); + let resp = self + .http_client_with_auth_compress(Method::POST, &url) + .await? + .body(body) + .send() + .await?; + crate::http::log_request_id(&resp); + AppResponse::<()>::from_response(resp).await?.into_error() + } + + #[instrument(level = "debug", skip_all, err)] + pub async fn create_collab_list( + &self, + workspace_id: &str, + params_list: Vec, + ) -> Result<(), AppResponseError> { + let url = self.batch_create_collab_url(workspace_id); + + // Parallel compression + let compression_tasks: Vec<_> = params_list + .into_iter() + .map(|params| { + let config = self.config.clone(); + tokio::spawn(async move { + let data = params.to_bytes().map_err(AppError::from)?; + spawn_blocking_brotli_compress( + data, + config.compression_quality, + config.compression_buffer_size, + ) + .await + }) + }) + .collect(); + + let mut framed_data = Vec::new(); + let mut size_count = 0; + for task in compression_tasks { + let compressed = task.await??; + // The length of a u32 in bytes is 4. The server uses a u32 to read the size of each data frame, + // hence the frame size header is always 4 bytes. It's crucial not to alter this size value, + // as the server's logic for frame size reading is based on this fixed 4-byte length. + // note: + // the size of a u32 is a constant 4 bytes across all platforms that Rust supports. + let size = compressed.len() as u32; + framed_data.extend_from_slice(&size.to_be_bytes()); + framed_data.extend_from_slice(&compressed); + size_count += size; + } + event!( + tracing::Level::INFO, + "create batch collab with size: {}", + size_count + ); + let body = Body::wrap_stream(stream::once(async { Ok::<_, AppError>(framed_data) })); + let resp = self + .http_client_with_auth_compress(Method::POST, &url) + .await? + .timeout(Duration::from_secs(60)) + .body(body) + .send() + .await?; + + log_request_id(&resp); + AppResponse::<()>::from_response(resp).await?.into_error() + } + + /// Refreshes the access token using the stored refresh token. + /// + /// This function attempts to refresh the access token by sending a request to the authentication server + /// using the stored refresh token. If successful, it updates the stored access token with the new one + /// received from the server. + #[instrument(level = "debug", skip_all, err)] + pub async fn refresh_token(&self) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.refresh_ret_txs.write().push(tx); + + if !self.is_refreshing_token.load(Ordering::SeqCst) { + self.is_refreshing_token.store(true, Ordering::SeqCst); + let txs = std::mem::take(&mut *self.refresh_ret_txs.write()); + let result = self.inner_refresh_token().await; + for tx in txs { + let _ = tx.send(result.clone()); + } + self.is_refreshing_token.store(false, Ordering::SeqCst); + } + Ok(rx) + } + + async fn inner_refresh_token(&self) -> Result<(), AppResponseError> { + let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(4); + let action = RefreshTokenAction::new(self.token.clone(), self.gotrue_client.clone()); + match RetryIf::spawn(retry_strategy, action, RefreshTokenRetryCondition).await { + Ok(_) => { + event!(tracing::Level::INFO, "refresh token success"); + Ok(()) + }, + Err(err) => { + let err = AppError::from(err); + event!(tracing::Level::ERROR, "refresh token failed: {}", err); + + // If the error is an OAuth error, unset the token. + if err.is_oauth_error() { + self.token.write().unset(); + } + Err(err.into()) + }, + } + } +} + +#[async_trait] +impl WSClientHttpSender for Client { + async fn send_ws_msg( + &self, + device_id: &str, + message: tokio_tungstenite::tungstenite::Message, + ) -> Result<(), WSError> { + self + .post_realtime_msg(device_id, message) + .await + .map_err(|err| WSError::Internal(anyhow::Error::from(err))) + } +} diff --git a/libs/client-api/src/native/mod.rs b/libs/client-api/src/native/mod.rs new file mode 100644 index 000000000..16cd51169 --- /dev/null +++ b/libs/client-api/src/native/mod.rs @@ -0,0 +1,7 @@ +mod http_native; +pub mod retry; +mod ws; + +#[allow(unused_imports)] +pub use http_native::*; +pub use ws::*; diff --git a/libs/client-api/src/retry.rs b/libs/client-api/src/native/retry.rs similarity index 100% rename from libs/client-api/src/retry.rs rename to libs/client-api/src/native/retry.rs diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/native/ws/client.rs similarity index 98% rename from libs/client-api/src/ws/client.rs rename to libs/client-api/src/native/ws/client.rs index aa108fb0b..6c71f8016 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/native/ws/client.rs @@ -7,12 +7,11 @@ use std::net::SocketAddr; use std::sync::{Arc, Weak}; use std::time::Duration; -use crate::ws::ping::ServerFixIntervalPing; -use crate::ws::retry::ConnectAction; -use crate::ws::state::{ConnectState, ConnectStateNotify}; -use crate::ws::{WSError, WebSocketChannel}; use tokio::sync::broadcast::{channel, Receiver, Sender}; +use crate::native::ping::ServerFixIntervalPing; +use crate::native::ws::retry::ConnectAction; +use crate::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel}; use realtime_entity::collab_msg::CollabMessage; use realtime_entity::message::RealtimeMessage; use realtime_entity::user::UserMessage; diff --git a/libs/client-api/src/ws/error.rs b/libs/client-api/src/native/ws/error.rs similarity index 100% rename from libs/client-api/src/ws/error.rs rename to libs/client-api/src/native/ws/error.rs diff --git a/libs/client-api/src/ws/handler.rs b/libs/client-api/src/native/ws/handler.rs similarity index 100% rename from libs/client-api/src/ws/handler.rs rename to libs/client-api/src/native/ws/handler.rs diff --git a/libs/client-api/src/ws/mod.rs b/libs/client-api/src/native/ws/mod.rs similarity index 100% rename from libs/client-api/src/ws/mod.rs rename to libs/client-api/src/native/ws/mod.rs diff --git a/libs/client-api/src/ws/msg.rs b/libs/client-api/src/native/ws/msg.rs similarity index 100% rename from libs/client-api/src/ws/msg.rs rename to libs/client-api/src/native/ws/msg.rs diff --git a/libs/client-api/src/ws/ping.rs b/libs/client-api/src/native/ws/ping.rs similarity index 98% rename from libs/client-api/src/ws/ping.rs rename to libs/client-api/src/native/ws/ping.rs index 0f4fdf8ac..29640d8ee 100644 --- a/libs/client-api/src/ws/ping.rs +++ b/libs/client-api/src/native/ws/ping.rs @@ -1,4 +1,4 @@ -use crate::ws::state::{ConnectState, ConnectStateNotify}; +use crate::{ConnectState, ConnectStateNotify}; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast::Sender; diff --git a/libs/client-api/src/ws/retry.rs b/libs/client-api/src/native/ws/retry.rs similarity index 97% rename from libs/client-api/src/ws/retry.rs rename to libs/client-api/src/native/ws/retry.rs index bdc4019cc..7913cf849 100644 --- a/libs/client-api/src/ws/retry.rs +++ b/libs/client-api/src/native/ws/retry.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::pin::Pin; -use crate::ws::WSError; +use crate::WSError; use tokio::net::TcpStream; use tokio_retry::Action; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; diff --git a/libs/client-api/src/ws/state.rs b/libs/client-api/src/native/ws/state.rs similarity index 100% rename from libs/client-api/src/ws/state.rs rename to libs/client-api/src/native/ws/state.rs diff --git a/libs/client-api/src/wasm/http_wasm.rs b/libs/client-api/src/wasm/http_wasm.rs new file mode 100644 index 000000000..825321269 --- /dev/null +++ b/libs/client-api/src/wasm/http_wasm.rs @@ -0,0 +1,60 @@ +use crate::http::RefreshTokenRet; +use crate::Client; +use app_error::gotrue::GoTrueError; +use app_error::AppError; +use async_trait::async_trait; +use database_entity::dto::CollabParams; +use gotrue::grant::{Grant, RefreshTokenGrant}; +use shared_entity::response::AppResponseError; +use std::sync::atomic::Ordering; +use std::time::Duration; +use tokio_retry::strategy::FixedInterval; +use tokio_retry::RetryIf; +use tracing::{event, instrument}; + +impl Client { + pub async fn create_collab_list( + &self, + workspace_id: &str, + params_list: Vec, + ) -> Result<(), AppResponseError> { + let _url = self.batch_create_collab_url(workspace_id); + todo!() + } + + #[instrument(level = "debug", skip_all, err)] + pub async fn refresh_token(&self) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.refresh_ret_txs.write().push(tx); + + if !self.is_refreshing_token.load(Ordering::SeqCst) { + self.is_refreshing_token.store(true, Ordering::SeqCst); + let txs = std::mem::take(&mut *self.refresh_ret_txs.write()); + let result = self.inner_refresh_token().await; + for tx in txs { + let _ = tx.send(result.clone()); + } + self.is_refreshing_token.store(false, Ordering::SeqCst); + } + Ok(rx) + } + + async fn inner_refresh_token(&self) -> Result<(), AppResponseError> { + let refresh_token = self + .token + .read() + .as_ref() + .ok_or(GoTrueError::NotLoggedIn( + "fail to refresh user token".to_owned(), + ))? + .refresh_token + .as_str() + .to_owned(); + let new_token = self + .gotrue_client + .token(&Grant::RefreshToken(RefreshTokenGrant { refresh_token })) + .await?; + self.token.write().set(new_token); + Ok(()) + } +} diff --git a/libs/client-api/src/wasm/mod.rs b/libs/client-api/src/wasm/mod.rs new file mode 100644 index 000000000..a5eb2d79e --- /dev/null +++ b/libs/client-api/src/wasm/mod.rs @@ -0,0 +1,4 @@ +mod http_wasm; +pub mod ws_wasm; + +pub use http_wasm::*; diff --git a/libs/client-api/src/wasm/ws_wasm.rs b/libs/client-api/src/wasm/ws_wasm.rs new file mode 100644 index 000000000..e69de29bb diff --git a/libs/database-entity/Cargo.toml b/libs/database-entity/Cargo.toml index c442918f7..92f1b29d4 100644 --- a/libs/database-entity/Cargo.toml +++ b/libs/database-entity/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] serde.workspace = true diff --git a/libs/gotrue/Cargo.toml b/libs/gotrue/Cargo.toml index 92718bb5f..606ad8c8c 100644 --- a/libs/gotrue/Cargo.toml +++ b/libs/gotrue/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] serde.workspace = true @@ -15,3 +17,6 @@ tokio = { version = "1.35.1", features = ["sync", "macros"] } infra = { path = "../infra" } gotrue-entity = { path = "../gotrue-entity" } tracing = "0.1" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"]} \ No newline at end of file diff --git a/libs/realtime-entity/Cargo.toml b/libs/realtime-entity/Cargo.toml index f0186fe60..16042e34d 100644 --- a/libs/realtime-entity/Cargo.toml +++ b/libs/realtime-entity/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] collab = { version = "0.1.0" } @@ -21,10 +23,10 @@ yrs.workspace = true thiserror = "1.0.56" realtime-protocol.workspace = true -[features] -actix_message = ["actix"] -tungstenite = ["tokio-tungstenite"] - [build-dependencies] protoc-bin-vendored = { version = "3.0" } prost-build = "0.12.3" + +[features] +actix_message = ["actix"] +tungstenite = ["tokio-tungstenite"] diff --git a/libs/realtime-protocol/Cargo.toml b/libs/realtime-protocol/Cargo.toml index a7b4c3f00..a6c9fdd37 100644 --- a/libs/realtime-protocol/Cargo.toml +++ b/libs/realtime-protocol/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] yrs.workspace = true diff --git a/libs/shared-entity/Cargo.toml b/libs/shared-entity/Cargo.toml index 127f651b3..5da8365b5 100644 --- a/libs/shared-entity/Cargo.toml +++ b/libs/shared-entity/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] anyhow = "1.0.79" diff --git a/libs/workspace-template/Cargo.toml b/libs/workspace-template/Cargo.toml index 0753d213c..00c4d81e1 100644 --- a/libs/workspace-template/Cargo.toml +++ b/libs/workspace-template/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] bytes.workspace = true @@ -13,9 +15,12 @@ collab-document = { version = "0.1.0"} collab-entity = { version = "0.1.0"} async-trait = "0.1.77" anyhow.workspace = true -tokio.workspace = true +tokio = { workspace = true, features = ["sync"] } uuid.workspace = true indexmap = "2.1.0" serde_json.workspace = true nanoid = "0.4.0" serde = { version = "1.0.195", features = ["derive"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +getrandom = { version = "0.2", features = ["js"]} diff --git a/tests/main.rs b/tests/main.rs index 6ca0b568a..dd42e372a 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -45,12 +45,3 @@ pub fn localhost_client() -> Client { ClientConfiguration::default(), ) } - -pub const DEV_URL: &str = "https://test.appflowy.cloud"; -pub const DEV_WS: &str = "wss://test.appflowy.cloud/ws"; -pub const DEV_GOTRUE: &str = "https://test.appflowy.cloud/gotrue"; - -#[allow(dead_code)] -pub fn test_appflowy_cloud_client() -> Client { - Client::new(DEV_URL, DEV_WS, DEV_GOTRUE, ClientConfiguration::default()) -} diff --git a/tests/user/sign_up.rs b/tests/user/sign_up.rs index 573c6fab8..5965af174 100644 --- a/tests/user/sign_up.rs +++ b/tests/user/sign_up.rs @@ -2,7 +2,7 @@ use app_error::ErrorCode; use gotrue_entity::dto::AuthProvider; use crate::{ - localhost_client, test_appflowy_cloud_client, + localhost_client, user::utils::{generate_unique_email, generate_unique_registered_user_client}, }; @@ -63,23 +63,3 @@ async fn sign_up_oauth_not_available() { ErrorCode::InvalidOAuthProvider ); } - -#[tokio::test] -async fn sign_up_with_google_oauth() { - let c = localhost_client(); - let url = c - .generate_oauth_url_with_provider(&AuthProvider::Google) - .await - .unwrap(); - assert!(!url.is_empty()); - - let c = test_appflowy_cloud_client(); - let url = c - .generate_oauth_url_with_provider(&AuthProvider::Google) - .await - .unwrap(); - assert!(!url.is_empty()); - - // let a = r#"appflowy-flutter://#access_token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2OTY0MzExODAsImlhdCI6MTY5NjQyNzU4MCwic3ViIjoiZWQ4Y2RhMzUtM2Q5MC00YTdjLWI3NTEtMzA2OWQ5Nzk4ZTZiIiwiZW1haWwiOiJuYXRoYW5AYXBwZmxvd3kuaW8iLCJwaG9uZSI6IiIsImFwcF9tZXRhZGF0YSI6eyJwcm92aWRlciI6Imdvb2dsZSIsInByb3ZpZGVycyI6WyJnb29nbGUiXX0sInVzZXJfbWV0YWRhdGEiOnsiYXZhdGFyX3VybCI6Imh0dHBzOi8vbGgzLmdvb2dsZXVzZXJjb250ZW50LmNvbS9hL0FDZzhvY0lHb2tMeFE2U2dWY2F3UERwcmYxY05abVV3MU5yXzF0djR5bXlTc2VvaT1zOTYtYyIsImN1c3RvbV9jbGFpbXMiOnsiaGQiOiJhcHBmbG93eS5pbyJ9LCJlbWFpbCI6Im5hdGhhbkBhcHBmbG93eS5pbyIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJmdWxsX25hbWUiOiJOYXRoYW4gRm9vIiwiaXNzIjoiaHR0cHM6Ly9hY2NvdW50cy5nb29nbGUuY29tIiwibmFtZSI6Ik5hdGhhbiBGb28iLCJwaWN0dXJlIjoiaHR0cHM6Ly9saDMuZ29vZ2xldXNlcmNvbnRlbnQuY29tL2EvQUNnOG9jSUdva0x4UTZTZ1ZjYXdQRHByZjFjTlptVXcxTnJfMXR2NHlteVNzZW9pPXM5Ni1jIiwicHJvdmlkZXJfaWQiOiIxMDk0ODEzOTczMjQ4MjM2Mzk0MzUiLCJzdWIiOiIxMDk0ODEzOTczMjQ4MjM2Mzk0MzUifSwicm9sZSI6IiIsImFhbCI6ImFhbDEiLCJhbXIiOlt7Im1ldGhvZCI6Im9hdXRoIiwidGltZXN0YW1wIjoxNjk2NDI3NTgwfV0sInNlc3Npb25faWQiOiIyNzkwOGExNS02MDIxLTQ4MjctOTNhOS0wZGU3Y2EwYjg3MjgifQ.UNCSfcIVqFRRRtTkhGipEXBOleHQt35lhbMaIYLZuv4&expires_at=1696431180&expires_in=3600&provider_token=ya29.a0AfB_byDtFDX9UfiXw3IKzGTrZeebaQCxheWpqVg3tZi5jCWdKmZRBFsh7p7k0svqxaaX8rqN0lQsFeBbdGtd7KOSYtjsfcOkpHMH0d1fMSxrlyl_KkuvlkJe9q_X4SvpsJmx0VsVZ1CMypszLd4nzZitB0KotQPMzFMaCgYKAcASARESFQGOcNnC8cf9LzY-ZeirJHIwfL9r8w0170&refresh_token=tXCWxsp3cQF8U2307mGuMQ&token_type=bearer"#; - // c.sign_in_with_url(a).await.unwrap(); -} diff --git a/tests/user/update.rs b/tests/user/update.rs index e5c730fb7..892f35401 100644 --- a/tests/user/update.rs +++ b/tests/user/update.rs @@ -1,7 +1,7 @@ use crate::localhost_client; use crate::user::utils::generate_unique_registered_user_client; use app_error::ErrorCode; -use client_api::ws::{WSClient, WSClientConfig}; +use client_api::{WSClient, WSClientConfig}; use serde_json::json; use shared_entity::dto::auth_dto::{UpdateUserParams, UserMetaData}; use std::time::Duration; diff --git a/tests/util/test_client.rs b/tests/util/test_client.rs index f1a7c6905..6fbbb6f99 100644 --- a/tests/util/test_client.rs +++ b/tests/util/test_client.rs @@ -3,7 +3,7 @@ use assert_json_diff::{ }; use bytes::Bytes; use client_api::collab_sync::{SinkConfig, SyncObject, SyncPlugin}; -use client_api::ws::{WSClient, WSClientConfig}; +use client_api::{WSClient, WSClientConfig}; use collab::core::collab::MutexCollab; use collab::core::collab_plugin::EncodedCollab; use collab::core::collab_state::SyncState; diff --git a/tests/websocket/connect.rs b/tests/websocket/connect.rs index d9c157cca..942a908cf 100644 --- a/tests/websocket/connect.rs +++ b/tests/websocket/connect.rs @@ -1,5 +1,5 @@ use crate::user::utils::generate_unique_registered_user_client; -use client_api::ws::{ConnectState, WSClient, WSClientConfig}; +use client_api::{ConnectState, WSClient, WSClientConfig}; #[tokio::test] async fn realtime_connect_test() {