Skip to content

Commit

Permalink
chore: replace with websocket::message
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy committed Jan 23, 2024
1 parent 9c37490 commit 1b64b71
Show file tree
Hide file tree
Showing 21 changed files with 105 additions and 349 deletions.
178 changes: 35 additions & 143 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions libs/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mime = "0.3.17"
tokio-stream = { version = "0.1.14" }
realtime-entity = { workspace = true }
chrono = "0.4"
websocket = { workspace = true, features = ["native-tls"] }

collab = { version = "0.1.0", optional = true }
collab-entity = { version = "0.1.0" }
Expand All @@ -57,10 +58,6 @@ features = ["sync", "net"]
workspace = true
features = ["tungstenite"]

[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio-tungstenite]
version = "0.20.1"
features = ["native-tls"]

[features]
collab-sync = ["collab", "yrs"]
test_util = ["scraper"]
Expand Down
2 changes: 2 additions & 0 deletions libs/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ if_wasm! {
pub use wasm::ws_wasm::*;
}

pub mod ws;

pub mod error {
pub use shared_entity::response::AppResponseError;
pub use shared_entity::response::ErrorCode;
Expand Down
11 changes: 4 additions & 7 deletions libs/client-api/src/native/http_native.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::http::log_request_id;
use crate::retry::{RefreshTokenAction, RefreshTokenRetryCondition};
use crate::{spawn_blocking_brotli_compress, Client, WSClientHttpSender, WSError};
use crate::ws::{WSClientHttpSender, WSError};
use crate::{spawn_blocking_brotli_compress, Client};
use app_error::AppError;
use async_trait::async_trait;
use database_entity::dto::CollabParams;
Expand All @@ -20,7 +21,7 @@ impl Client {
pub async fn post_realtime_msg(
&self,
device_id: &str,
msg: tokio_tungstenite::tungstenite::Message,
msg: websocket::Message,
) -> Result<(), AppResponseError> {
let device_id = device_id.to_string();
let payload =
Expand Down Expand Up @@ -143,11 +144,7 @@ impl Client {

#[async_trait]
impl WSClientHttpSender for Client {
async fn send_ws_msg(
&self,
device_id: &str,
message: tokio_tungstenite::tungstenite::Message,
) -> Result<(), WSError> {
async fn send_ws_msg(&self, device_id: &str, message: websocket::Message) -> Result<(), WSError> {
self
.post_realtime_msg(device_id, message)
.await
Expand Down
2 changes: 0 additions & 2 deletions libs/client-api/src/native/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod http_native;
pub mod retry;
mod ws;

#[allow(unused_imports)]
pub use http_native::*;
pub use ws::*;
97 changes: 0 additions & 97 deletions libs/client-api/src/native/ws/msg.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@ use std::time::Duration;

use tokio::sync::broadcast::{channel, Receiver, Sender};

use crate::native::ping::ServerFixIntervalPing;
use crate::native::ws::retry::ConnectAction;
use crate::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel};
use crate::ws::ping::ServerFixIntervalPing;
use crate::ws::retry::ConnectAction;
use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel};
use realtime_entity::collab_msg::CollabMessage;
use realtime_entity::message::RealtimeMessage;
use realtime_entity::user::UserMessage;
use tokio::sync::{oneshot, Mutex};
use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Condition, RetryIf};
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::MaybeTlsStream;
use tracing::{debug, error, info, trace, warn};
use websocket::{CloseCode, CloseFrame, Message};

pub struct WSClientConfig {
/// specifies the number of messages that the channel can hold at any given
Expand Down Expand Up @@ -90,11 +87,7 @@ impl WSClient {
}
}

pub async fn connect(
&self,
addr: String,
device_id: &str,
) -> Result<Option<SocketAddr>, WSError> {
pub async fn connect(&self, addr: String, device_id: &str) -> Result<(), WSError> {
self.set_state(ConnectState::Connecting).await;

let (stop_tx, mut stop_rx) = oneshot::channel();
Expand Down Expand Up @@ -137,11 +130,6 @@ impl WSClient {
}

let ws_stream = conn_result?;
let addr = match ws_stream.get_ref() {
MaybeTlsStream::Plain(s) => s.local_addr().ok(),
_ => None,
};

self.set_state(ConnectState::Connected).await;
let (mut sink, mut stream) = ws_stream.split();
let weak_collab_channels = Arc::downgrade(&self.collab_channels);
Expand Down Expand Up @@ -257,7 +245,7 @@ impl WSClient {
}
});

Ok(addr)
Ok(())
}

/// Return a [WebSocketChannel] that can be used to send messages to the websocket. Caller should
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use reqwest::StatusCode;
use tokio_tungstenite::tungstenite::Error;
use websocket::Error;

#[derive(Debug, thiserror::Error)]
pub enum WSError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::task::{Context, Poll};
use tokio::sync::broadcast::{channel, Sender};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_tungstenite::tungstenite::Message;
use tracing::{trace, warn};
use websocket::Message;

pub struct WebSocketChannel<T> {
object_id: String,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
mod client;
mod error;
mod handler;
// mod msg;
pub(crate) mod ping;
mod retry;
mod state;

pub use client::*;
pub use error::*;
pub use handler::*;
// pub use msg::*;
pub use state::*;
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{ConnectState, ConnectStateNotify};
use crate::ws::{ConnectState, ConnectStateNotify};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Message;
use websocket::Message;

pub(crate) struct ServerFixIntervalPing {
duration: Duration,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::future::Future;
use std::pin::Pin;

use crate::WSError;
use tokio::net::TcpStream;
use crate::ws::WSError;
use tokio_retry::Action;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tracing::info;
use websocket::{connect_async, WebSocketStream};

pub(crate) struct ConnectAction {
addr: String,
Expand All @@ -19,15 +18,15 @@ impl ConnectAction {

impl Action for ConnectAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync>>;
type Item = WebSocketStream<MaybeTlsStream<TcpStream>>;
type Item = WebSocketStream;
type Error = WSError;

fn run(&mut self) -> Self::Future {
let cloned_addr = self.addr.clone();
Box::pin(async move {
info!("🔵websocket start connecting");
match connect_async(&cloned_addr).await {
Ok((stream, _response)) => {
Ok(stream) => {
info!("🟢websocket connect success");
Ok(stream)
},
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions libs/realtime-entity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ database-entity.workspace = true
yrs.workspace = true
thiserror = "1.0.56"
realtime-protocol.workspace = true
websocket.workspace = true

[build-dependencies]
protoc-bin-vendored = { version = "3.0" }
Expand Down
6 changes: 1 addition & 5 deletions libs/realtime-entity/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::collab_msg::CollabMessage;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use websocket::Message;

#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(
Expand Down Expand Up @@ -71,10 +72,7 @@ impl TryFrom<Vec<u8>> for RealtimeMessage {
}

use crate::user::UserMessage;
#[cfg(feature = "tungstenite")]
use tokio_tungstenite::tungstenite::Message;

#[cfg(feature = "tungstenite")]
impl TryFrom<&Message> for RealtimeMessage {
type Error = anyhow::Error;

Expand All @@ -86,7 +84,6 @@ impl TryFrom<&Message> for RealtimeMessage {
}
}

#[cfg(feature = "tungstenite")]
impl TryFrom<Message> for RealtimeMessage {
type Error = anyhow::Error;

Expand All @@ -98,7 +95,6 @@ impl TryFrom<Message> for RealtimeMessage {
}
}

#[cfg(feature = "tungstenite")]
impl From<RealtimeMessage> for Message {
fn from(msg: RealtimeMessage) -> Self {
let bytes = bincode::serialize(&msg).unwrap_or_default();
Expand Down
4 changes: 2 additions & 2 deletions libs/websocket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ __rustls-tls = []

[dependencies]
thiserror = "1"
http = "1.0"
http = "0.2"
httparse = "1.8"
futures-util = { version = "0.3", default-features = false, features = [
"sink",
"std",
] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio-tungstenite = "0.21"
tokio-tungstenite = "0.20"
tokio = { version = "1", default-features = false, features = ["net"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
Loading

0 comments on commit 1b64b71

Please sign in to comment.