From 71f458e27ced535c79e6237e8dd64aa8996e0e78 Mon Sep 17 00:00:00 2001 From: khorshuheng Date: Sat, 18 Jan 2025 16:53:51 +0800 Subject: [PATCH] chore: remove history modules --- libs/client-api/src/http_history.rs | 52 ---- libs/client-api/src/lib.rs | 1 - libs/tonic-proto/proto/history.proto | 34 +-- .../appflowy-history_deprecated/Cargo.toml | 53 ---- .../appflowy-history_deprecated/Dockerfile | 41 --- .../appflowy-history_deprecated/deploy.env | 3 - .../appflowy-history_deprecated/src/api.rs | 57 ---- .../src/application.rs | 135 --------- .../src/biz/history.rs | 196 ------------- .../src/biz/mod.rs | 3 - .../src/biz/persistence.rs | 55 ---- .../src/biz/snapshot.rs | 274 ------------------ .../appflowy-history_deprecated/src/config.rs | 112 ------- .../src/core/manager.rs | 257 ---------------- .../src/core/mod.rs | 2 - .../src/core/open_handle.rs | 268 ----------------- .../appflowy-history_deprecated/src/error.rs | 55 ---- .../appflowy-history_deprecated/src/lib.rs | 8 - .../appflowy-history_deprecated/src/main.rs | 11 - .../appflowy-history_deprecated/src/models.rs | 1 - .../src/response.rs | 62 ---- .../tests/edit_test/mock.rs | 77 ----- .../tests/edit_test/mod.rs | 2 - .../tests/edit_test/recv_update_test.rs | 123 -------- .../appflowy-history_deprecated/tests/main.rs | 3 - .../tests/stream_test/control_stream_test.rs | 204 ------------- .../tests/stream_test/encode_test.rs | 139 --------- .../tests/stream_test/mod.rs | 3 - .../tests/stream_test/update_stream_test.rs | 90 ------ .../appflowy-history_deprecated/tests/util.rs | 176 ----------- src/api/history.rs | 110 ------- src/api/mod.rs | 1 - src/application.rs | 15 - src/config/config.rs | 9 - src/state.rs | 3 - tests/sql_test/history_test.rs | 19 +- 36 files changed, 9 insertions(+), 2645 deletions(-) delete mode 100644 libs/client-api/src/http_history.rs delete mode 100644 services/appflowy-history_deprecated/Cargo.toml delete mode 100644 services/appflowy-history_deprecated/Dockerfile delete mode 100644 services/appflowy-history_deprecated/deploy.env delete mode 100644 services/appflowy-history_deprecated/src/api.rs delete mode 100644 services/appflowy-history_deprecated/src/application.rs delete mode 100644 services/appflowy-history_deprecated/src/biz/history.rs delete mode 100644 services/appflowy-history_deprecated/src/biz/mod.rs delete mode 100644 services/appflowy-history_deprecated/src/biz/persistence.rs delete mode 100644 services/appflowy-history_deprecated/src/biz/snapshot.rs delete mode 100644 services/appflowy-history_deprecated/src/config.rs delete mode 100644 services/appflowy-history_deprecated/src/core/manager.rs delete mode 100644 services/appflowy-history_deprecated/src/core/mod.rs delete mode 100644 services/appflowy-history_deprecated/src/core/open_handle.rs delete mode 100644 services/appflowy-history_deprecated/src/error.rs delete mode 100644 services/appflowy-history_deprecated/src/lib.rs delete mode 100644 services/appflowy-history_deprecated/src/main.rs delete mode 100644 services/appflowy-history_deprecated/src/models.rs delete mode 100644 services/appflowy-history_deprecated/src/response.rs delete mode 100644 services/appflowy-history_deprecated/tests/edit_test/mock.rs delete mode 100644 services/appflowy-history_deprecated/tests/edit_test/mod.rs delete mode 100644 services/appflowy-history_deprecated/tests/edit_test/recv_update_test.rs delete mode 100644 services/appflowy-history_deprecated/tests/main.rs delete mode 100644 services/appflowy-history_deprecated/tests/stream_test/control_stream_test.rs delete mode 100644 services/appflowy-history_deprecated/tests/stream_test/encode_test.rs delete mode 100644 services/appflowy-history_deprecated/tests/stream_test/mod.rs delete mode 100644 services/appflowy-history_deprecated/tests/stream_test/update_stream_test.rs delete mode 100644 services/appflowy-history_deprecated/tests/util.rs delete mode 100644 src/api/history.rs diff --git a/libs/client-api/src/http_history.rs b/libs/client-api/src/http_history.rs deleted file mode 100644 index 4c3b50372..000000000 --- a/libs/client-api/src/http_history.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::http::log_request_id; -use crate::Client; -use client_api_entity::CollabType; -use reqwest::Method; -use shared_entity::dto::history_dto::{RepeatedSnapshotMeta, SnapshotInfo}; -use shared_entity::response::{AppResponse, AppResponseError}; - -impl Client { - pub async fn get_snapshots( - &self, - workspace_id: &str, - object_id: &str, - collab_type: CollabType, - ) -> Result { - let collab_type = collab_type.value(); - let url = format!( - "{}/api/history/{workspace_id}/{object_id}/{collab_type}", - self.base_url, - ); - let resp = self - .http_client_with_auth(Method::GET, &url) - .await? - .send() - .await?; - log_request_id(&resp); - AppResponse::::from_response(resp) - .await? - .into_data() - } - - pub async fn get_latest_history( - &self, - workspace_id: &str, - object_id: &str, - collab_type: CollabType, - ) -> Result { - let collab_type = collab_type.value(); - let url = format!( - "{}/api/history/{workspace_id}/{object_id}/{collab_type}/latest", - self.base_url, - ); - let resp = self - .http_client_with_auth(Method::GET, &url) - .await? - .send() - .await?; - log_request_id(&resp); - AppResponse::::from_response(resp) - .await? - .into_data() - } -} diff --git a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index 849343978..a5fd1cb66 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -5,7 +5,6 @@ mod http_billing; mod http_access_request; mod http_blob; mod http_collab; -mod http_history; mod http_member; mod http_publish; mod http_quick_note; diff --git a/libs/tonic-proto/proto/history.proto b/libs/tonic-proto/proto/history.proto index 6ba055104..4d9c75514 100644 --- a/libs/tonic-proto/proto/history.proto +++ b/libs/tonic-proto/proto/history.proto @@ -5,19 +5,6 @@ import "google/protobuf/wrappers.proto"; // For optional string support package history; -service History { - rpc GetSnapshots (SnapshotRequestPb) returns (RepeatedSnapshotMetaPb); - rpc GetNumOfSnapshot (SnapshotRequestPb) returns (RepeatedSnapshotInfoPb); - rpc GetLatestSnapshot (SnapshotRequestPb) returns (SingleSnapshotInfoPb); -} - -message SnapshotRequestPb { - string workspace_id = 1; - string object_id = 2; - int32 collab_type = 3; - int32 num_snapshot = 4; -} - // SnapshotMetaPB represents metadata for a snapshot. message SnapshotMetaPb { // The unique identifier for the snapshot. @@ -32,32 +19,13 @@ message SnapshotMetaPb { int64 created_at = 4; } -// A container for repeated instances of SnapshotMetaPB. -message RepeatedSnapshotMetaPb { - repeated SnapshotMetaPb items = 1; // List of SnapshotMetaPB items -} - - -// SnapshotStatePB represents the state of a snapshot, including optional dependency IDs. -message SnapshotStatePb { - string oid = 1; // Unique identifier for the snapshot - bytes doc_state = 2; // The document state as raw binary data - int32 doc_state_version = 3; // Version of the document state format - google.protobuf.StringValue deps_snapshot_id = 4; // Optional dependency snapshot ID -} - message SingleSnapshotInfoPb { HistoryStatePb history_state = 1; SnapshotMetaPb snapshot_meta = 2; } -message RepeatedSnapshotInfoPb { - HistoryStatePb history_state = 1; - repeated SnapshotMetaPb snapshots = 2; -} - message HistoryStatePb { string object_id = 1; // Unique identifier for the object bytes doc_state = 2; // The document state as raw binary data int32 doc_state_version = 3; // Version of the document state format, with decoding instructions based on version -} \ No newline at end of file +} diff --git a/services/appflowy-history_deprecated/Cargo.toml b/services/appflowy-history_deprecated/Cargo.toml deleted file mode 100644 index b7d5d65bc..000000000 --- a/services/appflowy-history_deprecated/Cargo.toml +++ /dev/null @@ -1,53 +0,0 @@ -[package] -name = "appflowy-history" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[[bin]] -name = "appflowy_history" -path = "src/main.rs" - -[lib] -path = "src/lib.rs" - -[dependencies] -collab.workspace = true -collab-entity.workspace = true -tracing.workspace = true -serde.workspace = true -serde_json.workspace = true -anyhow.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net"] } -tokio-stream = { version = "0.1", features = ["net"] } -redis = { workspace = true, features = ["aio", "tokio-comp", "connection-manager"] } -dotenvy = "0.15.0" -axum = "0.7.4" -thiserror = "1.0.58" -tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } -tower-service = "0.3" -tower-http = { version = "0.5", features = ["cors", "fs"] } -tower = "0.4" -serde_repr = "0.1.18" -collab-stream.workspace = true -chrono = "0.4.37" -uuid = { workspace = true, features = ["v4"] } -sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono", "migrate"] } -dashmap = "5.5.3" -infra.workspace = true -database.workspace = true -bincode.workspace = true -tonic.workspace = true -tonic-proto.workspace = true -futures = "0.3.30" -log = "0.4.20" -prost.workspace = true -arc-swap = "1.7.1" - -[dev-dependencies] -assert-json-diff = "2.0.2" -rand = "0.8.5" -serial_test = "3.0.0" - -[features] -verbose_log = [] \ No newline at end of file diff --git a/services/appflowy-history_deprecated/Dockerfile b/services/appflowy-history_deprecated/Dockerfile deleted file mode 100644 index 901494439..000000000 --- a/services/appflowy-history_deprecated/Dockerfile +++ /dev/null @@ -1,41 +0,0 @@ -FROM lukemathwalker/cargo-chef:latest-rust-1.81 as chef - -# Set the initial working directory -WORKDIR /app -RUN apt update && apt install lld clang -y - -FROM chef as planner -COPY . . - -# Compute a lock-like file for our project -RUN cargo chef prepare --recipe-path recipe.json - -FROM chef as builder - -# Update package lists and install protobuf-compiler along with other build dependencies -RUN apt update && apt install -y protobuf-compiler lld clang - -COPY --from=planner /app/recipe.json recipe.json -# Build our project dependencies -RUN cargo chef cook --release --recipe-path recipe.json - -COPY . . - -WORKDIR /app/services/appflowy-history -RUN cargo build --release --bin appflowy_history - -FROM debian:bookworm-slim AS runtime -WORKDIR /app/services/appflowy-history -RUN apt-get update -y \ - && apt-get install -y --no-install-recommends openssl \ - && apt-get install protobuf-compiler -y \ - # Clean up - && apt-get autoremove -y \ - && apt-get clean -y \ - && rm -rf /var/lib/apt/lists/* - -WORKDIR /app/ -COPY --from=builder /app/target/release/appflowy_history /usr/local/bin/appflowy_history -ENV APP_ENVIRONMENT production -ENV RUST_BACKTRACE 1 -CMD ["appflowy_history"] diff --git a/services/appflowy-history_deprecated/deploy.env b/services/appflowy-history_deprecated/deploy.env deleted file mode 100644 index 70583b27b..000000000 --- a/services/appflowy-history_deprecated/deploy.env +++ /dev/null @@ -1,3 +0,0 @@ -APPFLOWY_HISTORY_REDIS_URL= -APPFLOWY_HISTORY_DATABASE_URL= -APPFLOWY_HISTORY_DATABASE_NAME=postgres \ No newline at end of file diff --git a/services/appflowy-history_deprecated/src/api.rs b/services/appflowy-history_deprecated/src/api.rs deleted file mode 100644 index 933f5ed72..000000000 --- a/services/appflowy-history_deprecated/src/api.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::application::AppState; -use crate::biz::history::get_snapshots; -use collab_entity::CollabType; -use tonic::{Request, Response, Status}; -use tonic_proto::history::history_server::History; -use tonic_proto::history::{RepeatedSnapshotInfoPb, SingleSnapshotInfoPb, SnapshotRequestPb}; - -pub struct HistoryImpl { - pub state: AppState, -} - -/// The `History` trait is automatically generated by the Tonic framework based on the definitions -/// provided in the `history.proto` file. -/// -/// ## Modifying RPC Methods -/// - To add new RPC methods or modify existing ones, you should edit the `history.proto` file located at: -/// `libs/tonic-proto/proto/history.proto` -/// - After updating the protobuf file, you need to regenerate the Rust source code to reflect these changes. -/// -/// ## Regenerating Code -/// - Code regeneration is handled by the `build.rs` script in the Tonic framework, which processes `.proto` files. -/// - To trigger this script and regenerate the code, run `cargo build` in the `tonic-proto` project. -/// This action rebuilds all project dependencies and updates generated code accordingly. -/// -#[tonic::async_trait] -impl History for HistoryImpl { - async fn get_snapshots( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let collab_type = CollabType::from(request.collab_type); - let data = get_snapshots(&request.object_id, &collab_type, &self.state.pg_pool).await?; - Ok(Response::new(data)) - } - - async fn get_num_of_snapshot( - &self, - _request: Request, - ) -> Result, Status> { - todo!() - } - - async fn get_latest_snapshot( - &self, - request: Request, - ) -> Result, Status> { - let request = request.into_inner(); - let resp = self - .state - .open_collab_manager - .get_latest_snapshot(request, &self.state.pg_pool) - .await?; - - Ok(Response::new(resp)) - } -} diff --git a/services/appflowy-history_deprecated/src/application.rs b/services/appflowy-history_deprecated/src/application.rs deleted file mode 100644 index 3277947f9..000000000 --- a/services/appflowy-history_deprecated/src/application.rs +++ /dev/null @@ -1,135 +0,0 @@ -use crate::api::HistoryImpl; -use crate::config::{Config, DatabaseSetting, Environment}; -use crate::core::manager::OpenCollabManager; -use anyhow::Error; -use collab_stream::client::CollabRedisStream; -use redis::aio::ConnectionManager; -use sqlx::postgres::PgPoolOptions; -use sqlx::PgPool; - -use std::sync::{Arc, Once}; -use std::time::Duration; -use tokio::net::TcpListener; -use tokio_stream::wrappers::TcpListenerStream; -use tonic::transport::server::Router; -use tonic::transport::Server; -use tonic_proto::history::history_server::HistoryServer; -use tower::layer::util::Identity; -use tracing::info; -use tracing::subscriber::set_global_default; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::EnvFilter; - -pub async fn run_server( - listener: TcpListener, - config: Config, -) -> Result<(), Box> { - dotenvy::dotenv().ok(); - // Initialize logger - init_subscriber(&config.app_env); - info!("config loaded: {:?}", &config); - - // Start the server - info!("Starting server at: {:?}", listener.local_addr()); - let stream = TcpListenerStream::new(listener); - let server = create_app(config).await.unwrap(); - server.serve_with_incoming(stream).await?; - Ok(()) -} - -pub fn init_subscriber(app_env: &Environment) { - static START: Once = Once::new(); - START.call_once(|| { - let level = std::env::var("RUST_LOG").unwrap_or("info".to_string()); - let mut filters = vec![]; - filters.push(format!("appflowy_history={}", level)); - let env_filter = EnvFilter::new(filters.join(",")); - - let builder = tracing_subscriber::fmt() - .with_target(true) - .with_max_level(tracing::Level::TRACE) - .with_thread_ids(false) - .with_file(false); - - match app_env { - Environment::Local => { - let subscriber = builder - .with_ansi(true) - .with_target(false) - .with_file(false) - .pretty() - .finish() - .with(env_filter); - set_global_default(subscriber).unwrap(); - }, - Environment::Production => { - let subscriber = builder.json().finish().with(env_filter); - set_global_default(subscriber).unwrap(); - }, - } - }); -} - -pub async fn create_app(config: Config) -> Result, Error> { - // Postgres - info!("Preparing to run database migrations..."); - let pg_pool = get_connection_pool(&config.db_settings).await?; - migrate(&pg_pool).await?; - - // Redis - let redis_connection_manager = redis::Client::open(config.redis_url) - .expect("failed to create redis client") - .get_connection_manager() - .await - .expect("failed to get redis connection manager"); - - let collab_redis_stream = - CollabRedisStream::new_with_connection_manager(redis_connection_manager.clone()); - let open_collab_manager = Arc::new( - OpenCollabManager::new( - collab_redis_stream, - pg_pool.clone(), - &config.stream_settings, - ) - .await, - ); - - let state = AppState { - redis_client: redis_connection_manager, - open_collab_manager, - pg_pool, - }; - - let history_impl = HistoryImpl { state }; - Ok(Server::builder().add_service(HistoryServer::new(history_impl))) -} - -#[derive(Clone)] -pub struct AppState { - pub redis_client: ConnectionManager, - pub open_collab_manager: Arc, - pub pg_pool: PgPool, -} - -async fn migrate(pool: &PgPool) -> Result<(), Error> { - sqlx::migrate!("../../migrations") - .set_ignore_missing(true) - .run(pool) - .await - .map_err(|e| anyhow::anyhow!("Failed to run migrations: {}", e)) -} - -async fn get_connection_pool(setting: &DatabaseSetting) -> Result { - info!( - "Connecting to postgres database with setting: {:?}", - setting - ); - PgPoolOptions::new() - .max_connections(setting.max_connections) - .acquire_timeout(Duration::from_secs(10)) - .max_lifetime(Duration::from_secs(30 * 60)) - .idle_timeout(Duration::from_secs(30)) - .connect_with(setting.with_db()) - .await - .map_err(|e| anyhow::anyhow!("Failed to connect to postgres database: {}", e)) -} diff --git a/services/appflowy-history_deprecated/src/biz/history.rs b/services/appflowy-history_deprecated/src/biz/history.rs deleted file mode 100644 index 47b0104b1..000000000 --- a/services/appflowy-history_deprecated/src/biz/history.rs +++ /dev/null @@ -1,196 +0,0 @@ -use anyhow::anyhow; -use collab::core::collab_plugin::CollabPluginType; -use collab::lock::RwLock; -use collab::preclude::updates::encoder::{Encoder, EncoderV2}; -use collab::preclude::{Collab, CollabPlugin, ReadTxn, Snapshot, StateVector, TransactionMut}; -use collab_entity::CollabType; -use database::history::ops::get_snapshot_meta_list; -use serde_json::Value; -use sqlx::PgPool; -use std::sync::Arc; -use tonic_proto::history::{RepeatedSnapshotMetaPb, SnapshotMetaPb}; - -use crate::biz::snapshot::{ - calculate_edit_count, CollabSnapshot, CollabSnapshotState, SnapshotGenerator, -}; -use crate::error::HistoryError; - -pub struct CollabHistory { - pub(crate) object_id: String, - collab: Arc>, - collab_type: CollabType, - snapshot_generator: SnapshotGenerator, -} - -impl CollabHistory { - pub async fn new(object_id: &str, collab: Arc>, collab_type: CollabType) -> Self { - let current_edit_count = { - let read_guard = collab.read().await; - let txn = read_guard.transact(); - calculate_edit_count(&txn) - }; - - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History] object:{} init edit count: {}", - object_id, - current_edit_count - ); - - let snapshot_generator = SnapshotGenerator::new( - object_id, - Arc::downgrade(&collab), - collab_type.clone(), - current_edit_count as u32, - ); - collab.read().await.add_plugin(Box::new(CountUpdatePlugin { - snapshot_generator: snapshot_generator.clone(), - })); - collab.write().await.initialize(); - - Self { - object_id: object_id.to_string(), - snapshot_generator, - collab, - collab_type, - } - } - - pub async fn generate_snapshot_if_empty(&self) { - if !self.snapshot_generator.has_snapshot().await { - self.snapshot_generator.generate().await; - } - } - - pub async fn gen_history( - &self, - min_snapshot_required: Option, - ) -> Result, HistoryError> { - if let Some(min_snapshot_required) = min_snapshot_required { - let num_snapshot = self.snapshot_generator.num_pending_snapshots().await; - if num_snapshot < min_snapshot_required { - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History]: {} current snapshot:{}, minimum required:{}", - self.object_id, - num_snapshot, - min_snapshot_required - ); - return Ok(None); - } - } - - let collab = self.collab.clone(); - let timestamp = chrono::Utc::now().timestamp(); - let snapshots: Vec = self.snapshot_generator.consume_pending_snapshots().await - .into_iter() - // Remove the snapshots which created_at is bigger than the current timestamp - .filter(|snapshot| snapshot.created_at <= timestamp) - .collect(); - - // If there are no snapshots, we don't need to generate a new snapshot - if snapshots.is_empty() { - #[cfg(feature = "verbose_log")] - tracing::trace!("[History]: {} has no snapshots", self.object_id,); - return Ok(None); - } - let collab_type = self.collab_type.clone(); - let object_id = self.object_id.clone(); - let (doc_state, state_vector) = tokio::task::spawn_blocking(move || { - let lock = collab.blocking_read(); - let result = collab_type.validate_require_data(&lock); - match result { - Ok(_) => { - let txn = lock.transact(); - let doc_state_v2 = txn.encode_state_as_update_v2(&StateVector::default()); - let state_vector = txn.state_vector(); - Ok::<_, HistoryError>((doc_state_v2, state_vector)) - }, - Err(err) => Err::<_, HistoryError>(HistoryError::Internal(anyhow!( - "Failed to validate {}:{} required data: {}", - object_id, - collab_type, - err - ))), - } - }) - .await - .map_err(|err| HistoryError::Internal(err.into()))??; - - let collab_type = self.collab_type.clone(); - let object_id = self.object_id.clone(); - let state = CollabSnapshotState::new( - object_id, - doc_state, - 2, - state_vector, - chrono::Utc::now().timestamp(), - ); - Ok(Some(HistoryContext { - collab_type, - state, - snapshots, - })) - } - - /// Encode the state of the collab as Update. - /// We encode the collaboration state as an update using the v2 format, chosen over the v1 format - /// due to its reduced data size. This optimization helps in minimizing the storage and - /// transmission overhead, making the process more efficient. - pub async fn encode_update_v2(&self, snapshot: &Snapshot) -> Result, HistoryError> { - let lock = self.collab.read().await; - let txn = lock.transact(); - let mut encoder = EncoderV2::new(); - txn - .encode_state_from_snapshot(snapshot, &mut encoder) - .map_err(|err| HistoryError::Internal(err.into()))?; - Ok(encoder.to_vec()) - } - - #[cfg(debug_assertions)] - pub async fn json(&self) -> Value { - let lock_guard = self.collab.read().await; - lock_guard.to_json_value() - } -} - -pub struct HistoryContext { - pub collab_type: CollabType, - pub state: CollabSnapshotState, - pub snapshots: Vec, -} - -struct CountUpdatePlugin { - snapshot_generator: SnapshotGenerator, -} -impl CollabPlugin for CountUpdatePlugin { - fn receive_update(&self, _object_id: &str, txn: &TransactionMut, _update: &[u8]) { - self.snapshot_generator.did_apply_update(txn); - } - - fn plugin_type(&self) -> CollabPluginType { - CollabPluginType::Other("CountUpdatePlugin".to_string()) - } -} - -pub async fn get_snapshots( - object_id: &str, - collab_type: &CollabType, - pg_pool: &PgPool, -) -> Result { - let metas = get_snapshot_meta_list(object_id, collab_type, pg_pool) - .await - .unwrap(); - - let metas = metas - .into_iter() - .map(|meta| SnapshotMetaPb { - oid: meta.oid, - snapshot: meta.snapshot, - snapshot_version: meta.snapshot_version, - created_at: meta.created_at, - }) - .collect::>(); - - Ok(RepeatedSnapshotMetaPb { items: metas }) -} diff --git a/services/appflowy-history_deprecated/src/biz/mod.rs b/services/appflowy-history_deprecated/src/biz/mod.rs deleted file mode 100644 index 09d8b8688..000000000 --- a/services/appflowy-history_deprecated/src/biz/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod history; -pub(crate) mod persistence; -pub mod snapshot; diff --git a/services/appflowy-history_deprecated/src/biz/persistence.rs b/services/appflowy-history_deprecated/src/biz/persistence.rs deleted file mode 100644 index f3ca0fecd..000000000 --- a/services/appflowy-history_deprecated/src/biz/persistence.rs +++ /dev/null @@ -1,55 +0,0 @@ -use crate::biz::snapshot::{CollabSnapshot, CollabSnapshotState}; -use crate::error::HistoryError; -use collab_entity::CollabType; -use database::history::ops::insert_history; -use sqlx::PgPool; -use tonic_proto::history::SnapshotMetaPb; -use tracing::info; -use uuid::Uuid; - -pub struct HistoryPersistence { - workspace_id: Uuid, - pg_pool: PgPool, -} - -impl HistoryPersistence { - pub fn new(workspace_id: Uuid, pg_pool: PgPool) -> Self { - Self { - workspace_id, - pg_pool, - } - } - pub async fn insert_history( - &self, - state: CollabSnapshotState, - snapshots: Vec, - collab_type: CollabType, - ) -> Result<(), HistoryError> { - info!( - "[History]: write {}:{}: {} snapshots, doc state len:{}", - state.object_id, - collab_type, - snapshots.len(), - state.doc_state.len(), - ); - - let snapshots = snapshots - .into_iter() - .map(SnapshotMetaPb::from) - .collect::>(); - - insert_history( - &self.workspace_id, - &state.object_id, - state.doc_state, - state.doc_state_version, - None, - collab_type, - state.created_at, - snapshots, - self.pg_pool.clone(), - ) - .await?; - Ok(()) - } -} diff --git a/services/appflowy-history_deprecated/src/biz/snapshot.rs b/services/appflowy-history_deprecated/src/biz/snapshot.rs deleted file mode 100644 index 4b4a9c436..000000000 --- a/services/appflowy-history_deprecated/src/biz/snapshot.rs +++ /dev/null @@ -1,274 +0,0 @@ -use arc_swap::ArcSwap; -use collab::lock::{Mutex, RwLock}; -use collab::preclude::updates::encoder::Encode; -use collab::preclude::{Collab, ReadTxn, Snapshot, StateVector}; -use collab_entity::CollabType; - -use std::ops::Deref; - -use std::sync::{Arc, Weak}; -use tracing::warn; - -use tonic_proto::history::SnapshotMetaPb; - -#[derive(Clone)] -pub struct SnapshotGenerator { - object_id: String, - mutex_collab: Weak>, - collab_type: CollabType, - current_update_count: Arc>, - prev_edit_count: Arc>, - pending_snapshots: Arc>>, -} - -impl SnapshotGenerator { - pub fn new( - object_id: &str, - mutex_collab: Weak>, - collab_type: CollabType, - edit_count: u32, - ) -> Self { - Self { - object_id: object_id.to_string(), - mutex_collab, - collab_type, - current_update_count: Arc::new(ArcSwap::new(Arc::new(edit_count))), - prev_edit_count: Arc::new(ArcSwap::new(Arc::new(edit_count))), - pending_snapshots: Default::default(), - } - } - - pub async fn consume_pending_snapshots(&self) -> Vec { - //FIXME: this should be either a channel or lockless immutable queue - let mut lock = self.pending_snapshots.lock().await; - std::mem::take(&mut *lock) - } - - pub async fn has_snapshot(&self) -> bool { - !self.pending_snapshots.lock().await.is_empty() - } - - pub async fn num_pending_snapshots(&self) -> usize { - self.pending_snapshots.lock().await.len() - } - - /// Generate a snapshot if the current edit count is not zero. - pub async fn generate(&self) { - if let Some(collab) = self.mutex_collab.upgrade() { - let current = self.current_update_count.load_full(); - let prev = self.prev_edit_count.load_full(); - if current < prev { - return; - } - let threshold_count = *current - *prev; - if threshold_count > snapshot_min_edit_threshold(&self.collab_type) { - self - .prev_edit_count - .store(self.current_update_count.load_full()); - - #[cfg(feature = "verbose_log")] - tracing::trace!("[History]: object:{} generating snapshot", self.object_id); - let snapshot = gen_snapshot( - &*collab.read().await, - &self.object_id, - "generate snapshot by periodic tick", - ); - self.pending_snapshots.lock().await.push(snapshot); - } - } else { - warn!("collab is dropped. cannot generate snapshot") - } - } - - pub fn did_apply_update(&self, txn: &T) { - let txn_edit_count = calculate_edit_count(txn); - self - .current_update_count - .store(Arc::new(txn_edit_count as u32)); - - let current = self.current_update_count.load_full(); - let prev = self.prev_edit_count.load_full(); - if current < prev { - warn!( - "object:{} current edit count:{} is less than prev edit count:{}", - self.object_id, current, prev - ); - return; - } - let threshold_count = *current - *prev; - let threshold = snapshot_max_edit_threshold(&self.collab_type); - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History] object_id:{}, update count:{}, threshold={}", - self.object_id, - threshold_count, - threshold, - ); - - if threshold_count + 1 >= threshold { - self.prev_edit_count.store(Arc::new(txn_edit_count as u32)); - let pending_snapshots = self.pending_snapshots.clone(); - let mutex_collab = self.mutex_collab.clone(); - let object_id = self.object_id.clone(); - tokio::spawn(async move { - if let Some(collab) = mutex_collab.upgrade() { - let snapshot = gen_snapshot( - &*collab.read().await, - &object_id, - &format!("Current edit:{}, threshold:{}", threshold_count, threshold), - ); - pending_snapshots.lock().await.push(snapshot); - } else { - warn!("collab is dropped. cannot generate snapshot") - } - }); - } - } -} - -#[inline] -fn snapshot_max_edit_threshold(collab_type: &CollabType) -> u32 { - match collab_type { - CollabType::Document => 500, - CollabType::Database => 30, - CollabType::WorkspaceDatabase => 10, - CollabType::Folder => 10, - CollabType::DatabaseRow => 10, - CollabType::UserAwareness => 20, - CollabType::Unknown => { - if cfg!(debug_assertions) { - 5 - } else { - 50 - } - }, - } -} - -#[inline] -fn snapshot_min_edit_threshold(collab_type: &CollabType) -> u32 { - match collab_type { - CollabType::Document => 50, - CollabType::Database => 10, - CollabType::WorkspaceDatabase => 10, - CollabType::Folder => 10, - CollabType::DatabaseRow => 10, - CollabType::UserAwareness => 10, - CollabType::Unknown => 5, - } -} -#[inline] -pub fn gen_snapshot(collab: &Collab, object_id: &str, reason: &str) -> CollabSnapshot { - tracing::trace!( - "[History]: generate {} snapshot, reason: {}", - object_id, - reason - ); - - let snapshot = collab.transact().snapshot(); - let timestamp = chrono::Utc::now().timestamp(); - CollabSnapshot::new(object_id, snapshot, timestamp) -} - -/// Represents the state of a collaborative object (Collab) at a specific timestamp. -/// This is used to revert a Collab to a past state using the closest preceding -/// `CollabStateSnapshot`. When reverting to a specific `CollabSnapshot`, -/// -/// locating the nearest `CollabStateSnapshot` whose `created_at` timestamp -/// is less than or equal to the `CollabSnapshot`'s `created_at`. -/// This `CollabStateSnapshot` is then used to restore the Collab's state to the snapshot's timestamp. -pub struct CollabSnapshotState { - pub snapshot_id: String, - /// Unique identifier of the collaborative document. - pub object_id: String, - /// Binary representation of the Collab's state. - pub doc_state: Vec, - pub doc_state_version: i32, - pub state_vector: StateVector, - /// Timestamp indicating when this snapshot was created, measured in milliseconds since the Unix epoch. - pub created_at: i64, - /// This field specifies the ID of another snapshot that the current snapshot depends on. If present, - /// it indicates that the current document's state is built upon or derived from the state of the - /// specified dependency snapshot. - pub dependency_snapshot_id: Option, -} - -impl CollabSnapshotState { - pub fn new( - object_id: String, - doc_state: Vec, - doc_state_version: i32, - state_vector: StateVector, - created_at: i64, - ) -> Self { - let snapshot_id = uuid::Uuid::new_v4().to_string(); - Self { - snapshot_id, - object_id, - doc_state, - doc_state_version, - state_vector, - created_at, - dependency_snapshot_id: None, - } - } -} - -/// Captures a significant version of a collaborative object (Collab), marking a specific point in time. -/// This snapshot is identified by a unique ID and linked to a specific `CollabStateSnapshot`. -/// It represents a milestone or version of the Collab that can be referenced or reverted to. -pub struct CollabSnapshot { - pub object_id: String, - /// Snapshot data capturing the Collab's state at the time of the snapshot. - pub snapshot: Snapshot, - /// Timestamp indicating when this snapshot was created, measured in milliseconds since the Unix epoch. - pub created_at: i64, -} -impl Deref for CollabSnapshot { - type Target = Snapshot; - - fn deref(&self) -> &Self::Target { - &self.snapshot - } -} - -impl CollabSnapshot { - pub fn new(object_id: &str, snapshot: Snapshot, created_at: i64) -> Self { - Self { - snapshot, - object_id: object_id.to_string(), - created_at, - } - } -} - -impl From for SnapshotMetaPb { - fn from(snapshot: CollabSnapshot) -> Self { - let snapshot_data = snapshot.encode_v1(); - Self { - oid: snapshot.object_id, - snapshot: snapshot_data, - snapshot_version: 1, - created_at: snapshot.created_at, - } - } -} - -#[inline] -pub(crate) fn calculate_edit_count(txn: &T) -> u64 { - let snapshot = txn.snapshot(); - let mut insert_count = 0; - for (_, &clock) in snapshot.state_map.iter() { - insert_count += clock as u64; - } - - let mut delete_count = 0; - for (_, range) in snapshot.delete_set.iter() { - for f in range.iter() { - let deleted_segments = f.len() as u64; - delete_count += deleted_segments; - } - } - - insert_count + delete_count -} diff --git a/services/appflowy-history_deprecated/src/config.rs b/services/appflowy-history_deprecated/src/config.rs deleted file mode 100644 index 3c8fea9ec..000000000 --- a/services/appflowy-history_deprecated/src/config.rs +++ /dev/null @@ -1,112 +0,0 @@ -use anyhow::{Context, Error}; -use collab_stream::client::CONTROL_STREAM_KEY; -use infra::env_util::get_env_var; -use serde::Deserialize; -use sqlx::postgres::{PgConnectOptions, PgSslMode}; -use std::{fmt::Display, str::FromStr}; - -#[derive(Debug, Clone)] -pub struct Config { - pub app_env: Environment, - pub redis_url: String, - pub db_settings: DatabaseSetting, - pub stream_settings: StreamSetting, -} - -impl Config { - pub fn from_env() -> Result { - Ok(Config { - app_env: get_env_var("APPFLOWY_HISTORY_ENVIRONMENT", "local") - .parse() - .context("fail to get APPFLOWY_HISTORY_ENVIRONMENT")?, - redis_url: get_env_var("APPFLOWY_HISTORY_REDIS_URL", "redis://localhost:6379"), - db_settings: DatabaseSetting { - pg_conn_opts: PgConnectOptions::from_str(&get_env_var( - "APPFLOWY_HISTORY_DATABASE_URL", - "postgres://postgres:password@localhost:5432/postgres", - ))?, - require_ssl: get_env_var("APPFLOWY_HISTORY_DATABASE_REQUIRE_SSL", "false") - .parse() - .context("fail to get APPFLOWY_HISTORY_DATABASE_REQUIRE_SSL")?, - max_connections: get_env_var("APPFLOWY_HISTORY_DATABASE_MAX_CONNECTIONS", "20") - .parse() - .context("fail to get APPFLOWY_HISTORY_DATABASE_MAX_CONNECTIONS")?, - database_name: get_env_var("APPFLOWY_HISTORY_DATABASE_NAME", "postgres"), - }, - stream_settings: StreamSetting { - control_key: CONTROL_STREAM_KEY.to_string(), - }, - }) - } -} - -#[derive(Clone, Debug)] -pub struct DatabaseSetting { - pub pg_conn_opts: PgConnectOptions, - pub require_ssl: bool, - pub max_connections: u32, - pub database_name: String, -} - -impl DatabaseSetting { - pub fn without_db(&self) -> PgConnectOptions { - let ssl_mode = if self.require_ssl { - PgSslMode::Require - } else { - PgSslMode::Prefer - }; - let options = self.pg_conn_opts.clone(); - options.ssl_mode(ssl_mode) - } - - pub fn with_db(&self) -> PgConnectOptions { - self.without_db().database(&self.database_name) - } -} - -impl Display for DatabaseSetting { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let masked_pg_conn_opts = self.pg_conn_opts.clone().password("********"); - write!( - f, - "DatabaseSetting {{ pg_conn_opts: {:?}, require_ssl: {}, max_connections: {} }}", - masked_pg_conn_opts, self.require_ssl, self.max_connections - ) - } -} - -#[derive(Debug, Clone)] -pub struct StreamSetting { - /// The key of the stream that contains control event, [CollabControlEvent]. - pub control_key: String, -} - -#[derive(Clone, Debug, Deserialize)] -pub enum Environment { - Local, - Production, -} - -impl Environment { - pub fn as_str(&self) -> &'static str { - match self { - Environment::Local => "local", - Environment::Production => "production", - } - } -} - -impl FromStr for Environment { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - match s.to_lowercase().as_str() { - "local" => Ok(Self::Local), - "production" => Ok(Self::Production), - other => anyhow::bail!( - "{} is not a supported environment. Use either `local` or `production`.", - other - ), - } - } -} diff --git a/services/appflowy-history_deprecated/src/core/manager.rs b/services/appflowy-history_deprecated/src/core/manager.rs deleted file mode 100644 index 4ea7b31d6..000000000 --- a/services/appflowy-history_deprecated/src/core/manager.rs +++ /dev/null @@ -1,257 +0,0 @@ -use crate::biz::persistence::HistoryPersistence; -use crate::core::open_handle::OpenCollabHandle; -use crate::error::HistoryError; -use collab_entity::CollabType; -use collab_stream::client::CollabRedisStream; -use collab_stream::model::CollabControlEvent; -use collab_stream::stream_group::ReadOption; -use dashmap::mapref::entry::Entry; - -use crate::config::StreamSetting; -use collab::core::collab::DataSource; -use collab::core::origin::CollabOrigin; -use collab::preclude::Collab; -use dashmap::DashMap; -use database::history::ops::get_latest_snapshot; -use sqlx::PgPool; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::interval; -use tonic_proto::history::{HistoryStatePb, SingleSnapshotInfoPb, SnapshotRequestPb}; -use tracing::{error, trace}; -use uuid::Uuid; - -const CONSUMER_NAME: &str = "open_collab"; -pub struct OpenCollabManager { - #[allow(dead_code)] - handles: Arc>>, - #[allow(dead_code)] - redis_stream: CollabRedisStream, -} - -impl OpenCollabManager { - pub async fn new( - redis_stream: CollabRedisStream, - pg_pool: PgPool, - setting: &StreamSetting, - ) -> Self { - let handles = Arc::new(DashMap::new()); - spawn_control_group(redis_stream.clone(), &handles, pg_pool, setting).await; - Self { - handles, - redis_stream, - } - } - - pub async fn get_in_memory_history( - &self, - req: SnapshotRequestPb, - ) -> Result { - match self.handles.get(&req.object_id) { - None => Err(HistoryError::RecordNotFound(req.object_id)), - Some(handle) => handle.history_state().await, - } - } - - pub async fn get_latest_snapshot( - &self, - req: SnapshotRequestPb, - pg_pool: &PgPool, - ) -> Result { - match self.find_available_latest_snapshot(&req, pg_pool).await { - Ok(pb) => Ok(pb), - Err(err) => { - // if matches!(err, HistoryError::InvalidCollab(_)) { - // // 1. delete snapshot state and related snapshot meta - // // 2. try to find next available snapshot - // return self.get_latest_snapshot(req, pg_pool).await; - // } - Err(err) - }, - } - } - - async fn find_available_latest_snapshot( - &self, - req: &SnapshotRequestPb, - pg_pool: &PgPool, - ) -> Result { - let collab_type = CollabType::from(req.collab_type); - match get_latest_snapshot(&req.object_id, &collab_type, pg_pool).await { - Ok(Some(pb)) => { - if let Some(history) = pb.history_state.clone() { - trace!("[History] validate history state: {}", req.object_id); - self - .validate_collab( - req.object_id.clone(), - collab_type, - history.doc_state.clone(), - history.doc_state_version, - ) - .await?; - } - Ok(pb) - }, - _ => Err(HistoryError::RecordNotFound(req.object_id.clone())), - } - } - - async fn validate_collab( - &self, - object_id: String, - collab_type: CollabType, - doc_state: Vec, - doc_state_version: i32, - ) -> Result<(), HistoryError> { - tokio::task::spawn_blocking(move || { - let database_source = match doc_state_version { - 1 => DataSource::DocStateV1(doc_state), - 2 => DataSource::DocStateV2(doc_state), - _ => DataSource::DocStateV1(doc_state), - }; - let collab = Collab::new_with_source( - CollabOrigin::Empty, - &object_id, - database_source, - vec![], - false, - )?; - - collab_type - .validate_require_data(&collab) - .map_err(|err| HistoryError::InvalidCollab(format!("{}", err)))?; - Ok::<_, HistoryError>(()) - }) - .await - .map_err(|err| HistoryError::Internal(err.into()))? - } -} - -async fn spawn_control_group( - redis_stream: CollabRedisStream, - handles: &Arc>>, - pg_pool: PgPool, - setting: &StreamSetting, -) { - let mut control_group = redis_stream - .collab_control_stream(&setting.control_key, "history") - .await - .unwrap(); - - // Handle stale messages - if let Ok(stale_messages) = control_group.get_unacked_messages(CONSUMER_NAME).await { - for message in &stale_messages { - if let Ok(event) = CollabControlEvent::decode(&message.data) { - handle_control_event(&redis_stream, event, handles, &pg_pool).await; - } - } - - if let Err(err) = control_group.ack_messages(&stale_messages).await { - error!("Failed to ack stale messages: {:?}", err); - } - } - - let weak_handles = Arc::downgrade(handles); - let mut interval = interval(Duration::from_secs(1)); - tokio::spawn(async move { - loop { - interval.tick().await; - if let Ok(messages) = control_group - .consumer_messages(CONSUMER_NAME, ReadOption::Count(10)) - .await - { - if let Some(handles) = weak_handles.upgrade() { - if messages.is_empty() { - continue; - } - - trace!("[History] received {} control messages", messages.len()); - for message in &messages { - if let Ok(event) = CollabControlEvent::decode(&message.data) { - handle_control_event(&redis_stream, event, &handles, &pg_pool).await; - } - } - if let Err(err) = control_group.ack_messages(&messages).await { - error!("Failed to ack messages: {:?}", err); - } - } - } - } - }); -} - -async fn handle_control_event( - redis_stream: &CollabRedisStream, - event: CollabControlEvent, - handles: &Arc>>, - pg_pool: &PgPool, -) { - trace!("[History] received control event: {}", event); - match event { - CollabControlEvent::Open { - workspace_id, - object_id, - collab_type, - doc_state, - } => match handles.entry(object_id.clone()) { - Entry::Occupied(_) => {}, - Entry::Vacant(entry) => { - trace!( - "[History]: open collab:{}, collab_type:{}", - object_id, - collab_type - ); - match init_collab_handle( - redis_stream, - pg_pool, - &workspace_id, - &object_id, - collab_type, - doc_state, - ) - .await - { - Ok(handle) => { - let arc_handle = Arc::new(handle); - entry.insert(arc_handle); - }, - Err(err) => { - error!("Failed to open collab: {:?}", err); - }, - } - }, - }, - CollabControlEvent::Close { object_id } => { - trace!("[History]: close collab:{}", object_id); - handles.remove(&object_id); - }, - } -} - -#[inline] -async fn init_collab_handle( - redis_stream: &CollabRedisStream, - pg_pool: &PgPool, - workspace_id: &String, - object_id: &String, - collab_type: CollabType, - doc_state: Vec, -) -> Result { - let group_name = format!("history_{}:{}", workspace_id, object_id); - let update_stream = redis_stream - .collab_update_stream_group(workspace_id, object_id, &group_name) - .await - .unwrap(); - - let workspace_id = - Uuid::parse_str(workspace_id).map_err(|err| HistoryError::Internal(err.into()))?; - let persistence = Arc::new(HistoryPersistence::new(workspace_id, pg_pool.clone())); - OpenCollabHandle::new( - object_id, - doc_state, - collab_type, - Some(update_stream), - Some(persistence), - ) - .await -} diff --git a/services/appflowy-history_deprecated/src/core/mod.rs b/services/appflowy-history_deprecated/src/core/mod.rs deleted file mode 100644 index 7b95d63ac..000000000 --- a/services/appflowy-history_deprecated/src/core/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod manager; -pub mod open_handle; diff --git a/services/appflowy-history_deprecated/src/core/open_handle.rs b/services/appflowy-history_deprecated/src/core/open_handle.rs deleted file mode 100644 index 168e864c6..000000000 --- a/services/appflowy-history_deprecated/src/core/open_handle.rs +++ /dev/null @@ -1,268 +0,0 @@ -use std::sync::{Arc, Weak}; -use std::time::Duration; - -use collab::core::collab::DataSource; -use collab::core::origin::CollabOrigin; -use collab::error::CollabError; -use collab::lock::RwLock; -use collab::preclude::updates::decoder::Decode; -use collab::preclude::{Collab, Update}; -use collab_entity::CollabType; -use tokio::time::interval; -use tracing::error; - -use collab_stream::model::{CollabUpdateEvent, StreamMessage}; -use collab_stream::stream_group::{ReadOption, StreamGroup}; -use tonic_proto::history::HistoryStatePb; - -use crate::biz::history::CollabHistory; -use crate::biz::persistence::HistoryPersistence; -use crate::error::HistoryError; - -const CONSUMER_NAME: &str = "open_collab_handle"; -pub struct OpenCollabHandle { - pub object_id: String, - pub collab: Arc>, - pub collab_type: CollabType, - pub history: Arc, - - #[allow(dead_code)] - /// The history persistence to save the history periodically. - /// bind the lifetime to the handle. - history_persistence: Option>, -} - -impl OpenCollabHandle { - pub async fn new( - object_id: &str, - doc_state: Vec, - collab_type: CollabType, - update_stream: Option, - history_persistence: Option>, - ) -> Result { - // Must set skip_gc = true to avoid the garbage collection of the collab. - let mut collab = Collab::new_with_source( - CollabOrigin::Empty, - object_id, - DataSource::DocStateV1(doc_state), - vec![], - true, - )?; - collab.initialize(); - let collab = Arc::new(RwLock::new(collab)); - - let object_id = object_id.to_string(); - let history = - Arc::new(CollabHistory::new(&object_id, collab.clone(), collab_type.clone()).await); - - // Spawn a task to receive updates from the update stream. - spawn_recv_update(&object_id, &collab_type, collab.clone(), update_stream).await?; - - // spawn a task periodically to save the history to the persistence. - if let Some(persistence) = &history_persistence { - spawn_save_history(Arc::downgrade(&history), Arc::downgrade(persistence)); - } - - Ok(Self { - object_id, - collab, - collab_type, - history, - history_persistence, - }) - } - - pub async fn history_state(&self) -> Result { - let lock_guard = self.collab.read().await; - let encode_collab = lock_guard.encode_collab_v1(|collab| { - self - .collab_type - .validate_require_data(collab) - .map_err(|err| HistoryError::Internal(err.into())) - })?; - Ok(HistoryStatePb { - object_id: self.object_id.clone(), - doc_state: encode_collab.doc_state.to_vec(), - doc_state_version: 1, - }) - } -} - -/// Spawns an asynchronous task to continuously receive and process updates from a given update stream. -async fn spawn_recv_update( - object_id: &str, - collab_type: &CollabType, - collab: Arc>, - update_stream: Option, -) -> Result<(), HistoryError> { - let mut update_stream = match update_stream { - Some(stream) => stream, - None => return Ok(()), - }; - - let interval_duration = Duration::from_secs(5); - let object_id = object_id.to_string(); - let collab_type = collab_type.clone(); - - if let Ok(stale_messages) = update_stream.get_unacked_messages(CONSUMER_NAME).await { - let message_ids = stale_messages - .iter() - .map(|m| m.id.to_string()) - .collect::>(); - - // 1.Process the stale messages. - if let Err(err) = process_messages( - &mut update_stream, - stale_messages, - collab.clone(), - &object_id, - &collab_type, - ) - .await - { - // 2.Clear the stale messages if failed to process them. - if let Err(err) = update_stream.clear().await { - error!("[History]: fail to clear stale update messages: {:?}", err); - } - return Err(HistoryError::ApplyStaleMessage(err.to_string())); - } - - // 3.Acknowledge the stale messages. - if let Err(err) = update_stream.ack_message_ids(message_ids).await { - error!("[History ] fail to ack stale messages: {:?}", err); - } - } - - // spawn a task to receive updates from the update stream. - let weak_collab = Arc::downgrade(&collab); - tokio::spawn(async move { - let mut interval = interval(interval_duration); - loop { - interval.tick().await; - - // Check if the mutex_collab is still alive. If not, break the loop. - if let Some(collab) = weak_collab.upgrade() { - if let Ok(messages) = update_stream - .consumer_messages(CONSUMER_NAME, ReadOption::Undelivered) - .await - { - if messages.is_empty() { - continue; - } - - #[cfg(feature = "verbose_log")] - tracing::trace!("[History] received {} update messages", messages.len()); - if let Err(e) = process_messages( - &mut update_stream, - messages, - collab, - &object_id, - &collab_type, - ) - .await - { - error!("Error processing update: {:?}", e); - } - } - } else { - // break the loop if the mutex_collab is dropped. - break; - } - } - }); - Ok(()) -} - -/// Processes messages from the update stream and applies them. -async fn process_messages( - update_stream: &mut StreamGroup, - messages: Vec, - collab: Arc>, - object_id: &str, - _collab_type: &CollabType, -) -> Result<(), HistoryError> { - let mut write_guard = collab.write().await; - apply_updates(object_id, &messages, &mut write_guard)?; - drop(write_guard); - update_stream.ack_messages(&messages).await?; - Ok(()) -} - -/// Applies decoded updates from messages to the given locked collaboration object. -#[inline] -fn apply_updates( - _object_id: &str, - messages: &[StreamMessage], - collab: &mut Collab, -) -> Result<(), HistoryError> { - let mut txn = collab.transact_mut(); - for message in messages { - let CollabUpdateEvent::UpdateV1 { encode_update } = CollabUpdateEvent::decode(&message.data)?; - let update = Update::decode_v1(&encode_update) - .map_err(|e| CollabError::YrsEncodeStateError(e.to_string()))?; - - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History]: object_id:{} apply update: {:#?}", - _object_id, - update - ); - txn - .apply_update(update) - .map_err(|err| HistoryError::Internal(err.into()))?; - } - Ok(()) -} -fn spawn_save_history(history: Weak, history_persistence: Weak) { - tokio::spawn(async move { - let mut interval = if cfg!(debug_assertions) { - interval(Duration::from_secs(10)) - } else { - interval(Duration::from_secs(5 * 60)) - }; - interval.tick().await; // Initial delay - - let mut tick_count = 1; - loop { - interval.tick().await; // Wait for the next interval tick - if let (Some(history), Some(persistence)) = (history.upgrade(), history_persistence.upgrade()) - { - let min_snapshot_required = if tick_count % 10 == 0 { - history.generate_snapshot_if_empty().await; - None // No limit on snapshots every 3 ticks - } else { - Some(3) - }; - - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History]: {} periodic save history task. tick count: {}, min_snapshot_required:{:?}", - &history.object_id, - tick_count, - min_snapshot_required - ); - - // Generate history and attempt to insert it into persistence - match history.gen_history(min_snapshot_required).await { - Ok(Some(ctx)) => { - if let Err(err) = persistence - .insert_history(ctx.state, ctx.snapshots, ctx.collab_type) - .await - { - error!("Failed to save snapshot: {:?}", err); - } - }, - Ok(None) => {}, // No history to save - Err(err) => error!("Error generating history: {:?}", err), - } - - tick_count += 1; - } else { - // Exit loop if history or persistence has been dropped - #[cfg(feature = "verbose_log")] - tracing::trace!("[History]: exiting periodic save history task"); - break; - } - } - }); -} diff --git a/services/appflowy-history_deprecated/src/error.rs b/services/appflowy-history_deprecated/src/error.rs deleted file mode 100644 index ed7a051d8..000000000 --- a/services/appflowy-history_deprecated/src/error.rs +++ /dev/null @@ -1,55 +0,0 @@ -use crate::response::{APIResponse, Code}; -use axum::response::IntoResponse; -use tonic::Status; - -#[derive(thiserror::Error, Debug)] -pub enum HistoryError { - #[error(transparent)] - CollabError(#[from] collab::error::CollabError), - - #[error(transparent)] - PersistenceError(#[from] sqlx::Error), - - #[error("Try lock fail")] - TryLockFail, - - #[error("Record not found:{0}")] - RecordNotFound(String), - - #[error("Apply stale message:{0}")] - ApplyStaleMessage(String), - - #[error(transparent)] - RedisStreamError(#[from] collab_stream::error::StreamError), - - #[error("Invalid collab:{0}")] - InvalidCollab(String), - - #[error(transparent)] - Internal(#[from] anyhow::Error), -} - -impl HistoryError { - pub fn code(&self) -> Code { - Code::Unhandled - } -} - -impl IntoResponse for HistoryError { - fn into_response(self) -> axum::response::Response { - let code = self.code(); - let message = self.to_string(); - APIResponse::new(()) - .with_message(message) - .with_code(code) - .into_response() - } -} - -impl From for Status { - fn from(value: HistoryError) -> Self { - let code = value.code(); - let message = value.to_string(); - Status::new(code.into(), message) - } -} diff --git a/services/appflowy-history_deprecated/src/lib.rs b/services/appflowy-history_deprecated/src/lib.rs deleted file mode 100644 index 15e92a065..000000000 --- a/services/appflowy-history_deprecated/src/lib.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub(crate) mod api; -pub mod application; -pub mod biz; -pub mod config; -pub mod core; -pub mod error; -pub mod models; -pub mod response; diff --git a/services/appflowy-history_deprecated/src/main.rs b/services/appflowy-history_deprecated/src/main.rs deleted file mode 100644 index 18dab559a..000000000 --- a/services/appflowy-history_deprecated/src/main.rs +++ /dev/null @@ -1,11 +0,0 @@ -use appflowy_history::application::run_server; -use appflowy_history::config::Config; - -use tokio::net::TcpListener; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let listener = TcpListener::bind("0.0.0.0:50051").await.unwrap(); - let config = Config::from_env().expect("failed to load config"); - run_server(listener, config).await -} diff --git a/services/appflowy-history_deprecated/src/models.rs b/services/appflowy-history_deprecated/src/models.rs deleted file mode 100644 index 8b1378917..000000000 --- a/services/appflowy-history_deprecated/src/models.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/services/appflowy-history_deprecated/src/response.rs b/services/appflowy-history_deprecated/src/response.rs deleted file mode 100644 index 7798c904f..000000000 --- a/services/appflowy-history_deprecated/src/response.rs +++ /dev/null @@ -1,62 +0,0 @@ -use axum::response::IntoResponse; -use axum::Json; - -#[derive(serde::Serialize)] -pub struct APIResponse { - pub code: Code, - pub message: String, - pub data: T, -} - -impl APIResponse { - pub fn new(data: T) -> Self { - Self { - code: Code::Ok, - message: "success".to_string(), - data, - } - } - - pub fn with_message(self, message: String) -> Self { - Self { message, ..self } - } - - pub fn with_code(self, code: Code) -> Self { - Self { code, ..self } - } -} - -impl IntoResponse for APIResponse -where - T: serde::Serialize, -{ - fn into_response(self) -> axum::response::Response { - Json(self).into_response() - } -} - -impl From for APIResponse -where - T: serde::Serialize, -{ - fn from(data: T) -> Self { - Self::new(data) - } -} - -#[derive(Debug, Clone, serde_repr::Serialize_repr, serde_repr::Deserialize_repr, Default)] -#[repr(i32)] -pub enum Code { - #[default] - Ok = 0, - Unhandled = -1, -} - -impl From for tonic::Code { - fn from(code: Code) -> Self { - match code { - Code::Ok => tonic::Code::Ok, - Code::Unhandled => tonic::Code::Unknown, - } - } -} diff --git a/services/appflowy-history_deprecated/tests/edit_test/mock.rs b/services/appflowy-history_deprecated/tests/edit_test/mock.rs deleted file mode 100644 index 9d706e960..000000000 --- a/services/appflowy-history_deprecated/tests/edit_test/mock.rs +++ /dev/null @@ -1,77 +0,0 @@ -use collab::core::origin::CollabOrigin; -use collab::preclude::{Collab, CollabPlugin, TransactionMut}; -use collab_entity::CollabType; -use collab_stream::model::{CollabControlEvent, CollabUpdateEvent}; -use parking_lot::RwLock; -use std::sync::Arc; - -#[allow(dead_code)] -pub struct StreamEventMock { - pub open_event: CollabControlEvent, - pub close_event: CollabControlEvent, - // each element can be decoded to a Update - pub update_events: Vec, - pub expected_json: serde_json::Value, -} - -#[allow(dead_code)] -pub async fn mock_test_data( - workspace_id: &str, - object_id: &str, - edit_count: usize, -) -> StreamEventMock { - let workspace_id = workspace_id.to_string(); - let object_id = object_id.to_string(); - let mut collab = Collab::new_with_origin(CollabOrigin::Empty, &object_id, vec![], true); - let plugin = ReceiveUpdatesPlugin::default(); - collab.add_plugin(Box::new(plugin.clone())); - collab.initialize(); - - let doc_state = collab - .encode_collab_v1(|_| Ok::<(), anyhow::Error>(())) - .unwrap() - .doc_state - .to_vec(); - - let open_event = CollabControlEvent::Open { - workspace_id: workspace_id.clone(), - object_id: object_id.clone(), - collab_type: CollabType::Unknown, - doc_state, - }; - - let close_event = CollabControlEvent::Close { - object_id: object_id.clone(), - }; - - for i in 0..edit_count { - collab.insert(&format!("key{}", i), vec![i as u8]); - } - - let updates = std::mem::take(&mut *plugin.updates.write()); - let update_events = updates - .into_iter() - .map(|update| CollabUpdateEvent::UpdateV1 { - encode_update: update, - }) - .collect::>(); - let expected_json = collab.to_json_value(); - - StreamEventMock { - open_event, - close_event, - update_events, - expected_json, - } -} - -#[derive(Clone, Default)] -struct ReceiveUpdatesPlugin { - updates: Arc>>>, -} - -impl CollabPlugin for ReceiveUpdatesPlugin { - fn receive_update(&self, _object_id: &str, _txn: &TransactionMut, update: &[u8]) { - self.updates.write().push(update.to_vec()); - } -} diff --git a/services/appflowy-history_deprecated/tests/edit_test/mod.rs b/services/appflowy-history_deprecated/tests/edit_test/mod.rs deleted file mode 100644 index 66d738494..000000000 --- a/services/appflowy-history_deprecated/tests/edit_test/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod mock; -mod recv_update_test; diff --git a/services/appflowy-history_deprecated/tests/edit_test/recv_update_test.rs b/services/appflowy-history_deprecated/tests/edit_test/recv_update_test.rs deleted file mode 100644 index cda1396bf..000000000 --- a/services/appflowy-history_deprecated/tests/edit_test/recv_update_test.rs +++ /dev/null @@ -1,123 +0,0 @@ -use crate::edit_test::mock::mock_test_data; -use crate::util::{check_doc_state_json, redis_stream, run_test_server}; -use collab_entity::CollabType; -use tonic_proto::history::SnapshotRequestPb; - -#[tokio::test] -async fn apply_update_stream_updates_test() { - let redis_stream = redis_stream().await; - let workspace_id = uuid::Uuid::new_v4().to_string(); - let object_id = uuid::Uuid::new_v4().to_string(); - let mock = mock_test_data(&workspace_id, &object_id, 30).await; - - // use a random control stream key - let control_stream_key = uuid::Uuid::new_v4().to_string(); - let client = run_test_server(control_stream_key).await; - - let control_stream_key = client.config.stream_settings.control_key.clone(); - let mut control_group = redis_stream - .collab_control_stream(&control_stream_key, "appflowy_cloud") - .await - .unwrap(); - - // apply open event - control_group - .insert_message(mock.open_event.clone()) - .await - .unwrap(); - - let mut update_group = redis_stream - .collab_update_stream_group(&workspace_id, &object_id, "appflowy_cloud") - .await - .unwrap(); - - // apply updates - for update_event in &mock.update_events { - update_group - .insert_message(update_event.clone()) - .await - .unwrap(); - } - - let request = SnapshotRequestPb { - workspace_id: workspace_id.to_string(), - object_id: object_id.to_string(), - collab_type: CollabType::Unknown.value(), - num_snapshot: 1, - }; - - check_doc_state_json(&object_id, 60, mock.expected_json.clone(), move || { - let mut cloned_client = client.clone(); - let cloned_request = request.clone(); - Box::pin(async move { - cloned_client - .get_latest_snapshot(cloned_request) - .await - .map(|r| r.into_inner().history_state.unwrap()) - }) - }) - .await - .unwrap(); -} - -// #[tokio::test] -// async fn apply_missing_updates_test() { -// let redis_stream = redis_stream().await; -// let workspace_id = uuid::Uuid::new_v4().to_string(); -// let object_id = uuid::Uuid::new_v4().to_string(); -// let mock = mock_test_data(&workspace_id, &object_id, 30).await; -// let client = run_test_server(uuid::Uuid::new_v4().to_string()).await; -// -// let control_stream_key = client.config.stream_settings.control_key.clone(); -// let mut control_group = redis_stream -// .collab_control_stream(&control_stream_key, "appflowy_cloud") -// .await -// .unwrap(); -// -// // apply open event -// control_group -// .insert_message(mock.open_event.clone()) -// .await -// .unwrap(); -// -// let mut update_group = redis_stream -// .collab_update_stream_group(&workspace_id, &object_id, "appflowy_cloud") -// .await -// .unwrap(); -// -// let mut missing_updates = vec![]; -// // apply updates -// for (index, update_event) in mock.update_events.iter().enumerate() { -// if index % 2 == 0 { -// missing_updates.push(update_event.clone()); -// } else { -// update_group -// .insert_message(update_event.clone()) -// .await -// .unwrap(); -// } -// } -// -// for update_event in missing_updates { -// update_group.insert_message(update_event).await.unwrap(); -// } -// -// let request = SnapshotRequestPb { -// workspace_id: workspace_id.to_string(), -// object_id: object_id.to_string(), -// collab_type: CollabType::Unknown.value(), -// }; -// -// check_doc_state_json(&object_id, 120, mock.expected_json.clone(), move || { -// let mut cloned_client = client.clone(); -// let cloned_request = request.clone(); -// Box::pin(async move { -// cloned_client -// .get_in_memory_history(cloned_request) -// .await -// .map(|r| r.into_inner()) -// }) -// }) -// .await -// .unwrap(); -// } diff --git a/services/appflowy-history_deprecated/tests/main.rs b/services/appflowy-history_deprecated/tests/main.rs deleted file mode 100644 index fec95dcdf..000000000 --- a/services/appflowy-history_deprecated/tests/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -// mod edit_test; -mod stream_test; -mod util; diff --git a/services/appflowy-history_deprecated/tests/stream_test/control_stream_test.rs b/services/appflowy-history_deprecated/tests/stream_test/control_stream_test.rs deleted file mode 100644 index 33070572f..000000000 --- a/services/appflowy-history_deprecated/tests/stream_test/control_stream_test.rs +++ /dev/null @@ -1,204 +0,0 @@ -use crate::util::redis_stream; -use collab_entity::CollabType; -use collab_stream::model::{CollabControlEvent, StreamBinary}; -use collab_stream::stream_group::ReadOption; -use serial_test::serial; -use std::time::Duration; -use tokio::time::sleep; - -#[tokio::test] -#[serial] -async fn single_reader_single_sender_test() { - let control_stream_key = uuid::Uuid::new_v4().to_string(); - let redis_stream = redis_stream().await; - let recv_group_name = format!("recv-{}", uuid::Uuid::new_v4()); - let send_group_name = format!("send-{}", uuid::Uuid::new_v4()); - let mut recv_group = redis_stream - .collab_control_stream(&control_stream_key, &recv_group_name) - .await - .unwrap(); - // clear before starting the test. Otherwise, the receive group may have messages from previous tests - recv_group.clear().await.unwrap(); - - let mut send_group = redis_stream - .collab_control_stream(&control_stream_key, &send_group_name) - .await - .unwrap(); - - let send_event = mock_event("object_id".to_string()); - let message: StreamBinary = send_event.clone().try_into().unwrap(); - send_group.insert_messages(vec![message]).await.unwrap(); - - let messages = recv_group - .consumer_messages("consumer1", ReadOption::Count(10)) - .await - .unwrap(); - assert_eq!(messages.len(), 1); - recv_group.ack_messages(&messages).await.unwrap(); - - let recv_event = CollabControlEvent::decode(&messages[0].data).unwrap(); - assert_eq!(send_event, recv_event); - - let messages = recv_group - .consumer_messages("consumer1", ReadOption::Count(10)) - .await - .unwrap(); - assert!( - messages.is_empty(), - "No more messages should be available, but got {}", - messages.len() - ); -} - -#[tokio::test] -#[serial] -async fn multiple_readers_single_sender_test() { - let control_stream_key = uuid::Uuid::new_v4().to_string(); - let redis_stream = redis_stream().await; - let recv_group_1 = format!("recv-{}", uuid::Uuid::new_v4()); - let recv_group_2 = format!("recv-{}", uuid::Uuid::new_v4()); - let send_group = format!("send-{}", uuid::Uuid::new_v4()); - let mut recv_group_1 = redis_stream - .collab_control_stream(&control_stream_key, &recv_group_1) - .await - .unwrap(); - recv_group_1.clear().await.unwrap(); - - let mut recv_group_2 = redis_stream - .collab_control_stream(&control_stream_key, &recv_group_2) - .await - .unwrap(); - recv_group_2.clear().await.unwrap(); - - let mut send_group = redis_stream - .collab_control_stream(&control_stream_key, &send_group) - .await - .unwrap(); - - let send_event = mock_event("object_id".to_string()); - let message: StreamBinary = send_event.clone().try_into().unwrap(); - send_group.insert_messages(vec![message]).await.unwrap(); - - // assert the message was received by recv_group_1 and recv_group_2 - let message = recv_group_1 - .consumer_messages("consumer1", ReadOption::Count(1)) - .await - .unwrap(); - assert_eq!(message.len(), 1); - let recv_event = CollabControlEvent::decode(&message[0].data).unwrap(); - assert_eq!(send_event, recv_event); - - let message = recv_group_2 - .consumer_messages("consumer1", ReadOption::Count(1)) - .await - .unwrap(); - assert_eq!(message.len(), 1); - let recv_event = CollabControlEvent::decode(&message[0].data).unwrap(); - assert_eq!(send_event, recv_event); -} - -#[tokio::test] -#[serial] -async fn reading_pending_event_test() { - let control_stream_key = uuid::Uuid::new_v4().to_string(); - let redis_stream = redis_stream().await; - let send_group_name = format!("send-{}", uuid::Uuid::new_v4()); - let mut send_group = redis_stream - .collab_control_stream(&control_stream_key, &send_group_name) - .await - .unwrap(); - let send_event = mock_event("object_id".to_string()); - let message: StreamBinary = send_event.clone().try_into().unwrap(); - send_group.insert_messages(vec![message]).await.unwrap(); - - let recv_group_name = format!("recv-{}", uuid::Uuid::new_v4()); - let mut recv_group = redis_stream - .collab_control_stream(&control_stream_key, &recv_group_name) - .await - .unwrap(); - - // recv_group will read all non-acknowledged messages. At least one message that was inserted by send_group should be there - let messages = recv_group - .consumer_messages("consumer1", ReadOption::Undelivered) - .await - .unwrap(); - recv_group.ack_messages(&messages).await.unwrap(); - assert!(!messages.is_empty()); - - let messages = recv_group - .consumer_messages("consumer1", ReadOption::Count(1)) - .await - .unwrap(); - assert!(messages.is_empty()); -} - -#[tokio::test] -#[serial] -async fn ack_partial_message_test() { - let control_stream_key = uuid::Uuid::new_v4().to_string(); - let redis_stream = redis_stream().await; - let send_group_name = format!("send-{}", uuid::Uuid::new_v4()); - let mut send_group = redis_stream - .collab_control_stream(&control_stream_key, &send_group_name) - .await - .unwrap(); - let recv_group_name = format!("recv-{}", uuid::Uuid::new_v4()); - let mut recv_group = redis_stream - .collab_control_stream(&control_stream_key, &recv_group_name) - .await - .unwrap(); - recv_group.clear().await.unwrap(); - - for i in 0..3 { - let send_event = mock_event(i.to_string()); - let message: StreamBinary = send_event.clone().try_into().unwrap(); - send_group.insert_messages(vec![message]).await.unwrap(); - } - - // recv_group will read all non-acknowledged messages. At least one message that was inserted by send_group should be there - let mut messages = recv_group - .consumer_messages("consumer1", ReadOption::Undelivered) - .await - .unwrap(); - assert_eq!(messages.len(), 3); - - // simulate the last message is not acked - messages.pop(); - recv_group.ack_messages(&messages).await.unwrap(); - - // sleep for a while to make sure the message is considered as pending - sleep(Duration::from_secs(2)).await; - - let pending = recv_group.get_pending().await.unwrap().unwrap(); - assert_eq!(pending.consumers.len(), 1); - assert_eq!(pending.consumers[0].pending, 1); - let messages = recv_group - .get_unacked_messages_with_range( - &pending.consumers[0].name, - &pending.start_id, - &pending.end_id, - ) - .await - .unwrap(); - assert_eq!(messages.len(), 1); - - let recv_event = CollabControlEvent::decode(&messages[0].data).unwrap(); - assert_eq!( - CollabControlEvent::Open { - workspace_id: "w1".to_string(), - object_id: "2".to_string(), - collab_type: CollabType::Unknown, - doc_state: vec![], - }, - recv_event - ); -} - -fn mock_event(object_id: String) -> CollabControlEvent { - CollabControlEvent::Open { - workspace_id: "w1".to_string(), - object_id, - collab_type: CollabType::Unknown, - doc_state: vec![], - } -} diff --git a/services/appflowy-history_deprecated/tests/stream_test/encode_test.rs b/services/appflowy-history_deprecated/tests/stream_test/encode_test.rs deleted file mode 100644 index d03b15f07..000000000 --- a/services/appflowy-history_deprecated/tests/stream_test/encode_test.rs +++ /dev/null @@ -1,139 +0,0 @@ -use appflowy_history::biz::history::CollabHistory; -use appflowy_history::biz::snapshot::CollabSnapshot; -use appflowy_history::core::open_handle::OpenCollabHandle; -use assert_json_diff::assert_json_eq; -use collab::core::collab::DataSource; -use collab::core::origin::CollabOrigin; -use collab::preclude::updates::decoder::Decode; -use collab::preclude::{Collab, ReadTxn, StateVector, Update}; -use collab_entity::CollabType; -use serde_json::json; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::sleep; - -#[tokio::test] -async fn generate_snapshot_test() { - let object_id = uuid::Uuid::new_v4().to_string(); - let open_collab = - Arc::new(OpenCollabHandle::new(&object_id, vec![], CollabType::Unknown, None, None).unwrap()); - let history = &open_collab.history; - - let updates = update_sequence_for_values( - &object_id, - vec!["a".to_string(), "b".to_string(), "c".to_string()], - ); - for update in updates { - open_collab.apply_update_v1(&update).unwrap(); - sleep(Duration::from_millis(400)).await; - } - - let ctx = history.gen_snapshot_context().await.unwrap().unwrap(); - assert_eq!(ctx.snapshots.len(), 3); - - // decode the doc_state_v2 and check if the state is correct - let collab = Collab::new_with_source( - CollabOrigin::Empty, - &object_id, - DataSource::DocStateV2(ctx.state.doc_state.clone()), - vec![], - true, - ) - .unwrap(); - assert_json_eq!( - collab.to_json_value(), - json!({"map":{"0":"a","1":"b","2":"c"}}) - ); - - for (i, snapshot) in ctx.snapshots.into_iter().enumerate() { - let json = json_from_snapshot(history, &snapshot, &object_id); - match i { - 0 => { - assert_json_eq!(json, json!({"map":{"0":"a"}})); - }, - 1 => { - assert_json_eq!(json, json!({"map":{"0":"a","1":"b"}})); - }, - 2 => { - assert_json_eq!(json, json!({"map":{"0":"a","1":"b","2":"c"}})); - }, - _ => unreachable!(), - } - } -} - -#[tokio::test] -async fn snapshot_before_apply_update_test() { - let object_id = uuid::Uuid::new_v4().to_string(); - let open_collab = - Arc::new(OpenCollabHandle::new(&object_id, vec![], CollabType::Unknown, None, None).unwrap()); - let history = &open_collab.history; - let updates = update_sequence_for_values( - &object_id, - vec!["a".to_string(), "b".to_string(), "c".to_string()], - ); - - let mut snapshots = vec![]; - for update in updates { - // before applying the update, generate a snapshot which will be used to encode the update. - // the snapshot update can be used to restore the state of the collab. - let snapshot = history.gen_snapshot(1).unwrap(); - open_collab.apply_update_v1(&update).unwrap(); - snapshots.push(snapshot); - } - - snapshots.push(history.gen_snapshot(1).unwrap()); - for (i, snapshot) in snapshots.into_iter().enumerate() { - let json = json_from_snapshot(history, &snapshot, &object_id); - match i { - 0 => { - assert_json_eq!(json, json!({})); - }, - 1 => { - assert_json_eq!(json, json!({"map":{"0":"a"}})); - }, - 2 => { - assert_json_eq!(json, json!({"map":{"0":"a","1":"b"}})); - }, - 3 => { - assert_json_eq!(json, json!({"map":{"0":"a","1":"b","2":"c"}})); - }, - _ => unreachable!(), - } - } -} - -pub fn json_from_snapshot( - history: &CollabHistory, - snapshot: &CollabSnapshot, - object_id: &str, -) -> serde_json::Value { - let update = Update::decode_v2(&history.encode_update_v2(snapshot).unwrap()).unwrap(); - let collab = Collab::new_with_origin(CollabOrigin::Empty, object_id, vec![], false); - collab.with_origin_transact_mut(|txn| { - txn.apply_update(update); - }); - - collab.to_json_value() -} - -/// The update sequence is a series of updates that insert a value into a map. -fn update_sequence_for_values(object_id: &str, values: Vec) -> Vec> { - let mut updates = vec![]; - let collab = Collab::new_with_origin(CollabOrigin::Empty, object_id, vec![], false); - let mut sv = None; - for (index, value) in values.iter().enumerate() { - collab.with_origin_transact_mut(|txn| { - let map = collab.insert_map_with_txn_if_not_exist(txn, "map"); - map.insert_with_txn(txn, &index.to_string(), value.to_string()); - }); - - { - let txn = collab.transact(); - let update = txn.encode_state_as_update_v1(&sv.unwrap_or(StateVector::default())); - updates.push(update); - } - sv = Some(collab.transact().state_vector()); - } - updates -} diff --git a/services/appflowy-history_deprecated/tests/stream_test/mod.rs b/services/appflowy-history_deprecated/tests/stream_test/mod.rs deleted file mode 100644 index 53c510353..000000000 --- a/services/appflowy-history_deprecated/tests/stream_test/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod control_stream_test; -// mod encode_test; -mod update_stream_test; diff --git a/services/appflowy-history_deprecated/tests/stream_test/update_stream_test.rs b/services/appflowy-history_deprecated/tests/stream_test/update_stream_test.rs deleted file mode 100644 index ab1ac86a4..000000000 --- a/services/appflowy-history_deprecated/tests/stream_test/update_stream_test.rs +++ /dev/null @@ -1,90 +0,0 @@ -use crate::util::redis_stream; -use collab_stream::stream_group::ReadOption; -use std::time::Duration; -use tokio::time::sleep; - -#[tokio::test] -async fn single_reader_single_sender_update_stream_test() { - let redis_stream = redis_stream().await; - let workspace = uuid::Uuid::new_v4().to_string(); - let object_id = uuid::Uuid::new_v4().to_string(); - - let mut send_group = redis_stream - .collab_update_stream_group(&workspace, &object_id, "write") - .await - .unwrap(); - for i in 0..5 { - send_group.insert_message(vec![i]).await.unwrap(); - } - - let mut recv_group = redis_stream - .collab_update_stream_group(&workspace, &object_id, "read1") - .await - .unwrap(); - - // the following messages are not acked so they should be pending - // and should be returned by the next get_unacked_messages call - let first_consume_messages = recv_group - .consumer_messages("consumer1", ReadOption::Count(2)) - .await - .unwrap(); - assert_eq!(first_consume_messages.len(), 2); - assert_eq!(first_consume_messages[0].data, vec![0]); - assert_eq!(first_consume_messages[1].data, vec![1]); - sleep(Duration::from_secs(4)).await; - - let unacked_messages = recv_group.get_unacked_messages("consumer1").await.unwrap(); - assert_eq!(unacked_messages.len(), first_consume_messages.len()); - assert_eq!(unacked_messages[0].data, first_consume_messages[0].data); - assert_eq!(unacked_messages[1].data, first_consume_messages[1].data); - - let messages = recv_group - .consumer_messages("consumer1", ReadOption::Count(5)) - .await - .unwrap(); - assert_eq!(messages.len(), 3); - assert_eq!(messages[0].data, vec![2]); - assert_eq!(messages[1].data, vec![3]); - assert_eq!(messages[2].data, vec![4]); -} - -#[tokio::test] -async fn multiple_reader_single_sender_update_stream_test() { - let redis_stream = redis_stream().await; - let workspace = uuid::Uuid::new_v4().to_string(); - let object_id = uuid::Uuid::new_v4().to_string(); - - let mut send_group = redis_stream - .collab_update_stream_group(&workspace, &object_id, "write") - .await - .unwrap(); - send_group.insert_message(vec![1, 2, 3]).await.unwrap(); - send_group.insert_message(vec![4, 5, 6]).await.unwrap(); - - let recv_group_1 = redis_stream - .collab_update_stream_group(&workspace, &object_id, "read1") - .await - .unwrap(); - - let recv_group_2 = redis_stream - .collab_update_stream_group(&workspace, &object_id, "read2") - .await - .unwrap(); - // Both groups should have the same messages - for mut group in vec![recv_group_1, recv_group_2] { - let messages = group - .consumer_messages("consumer1", ReadOption::Count(10)) - .await - .unwrap(); - assert_eq!(messages.len(), 2); - assert_eq!(messages[0].data, vec![1, 2, 3]); - assert_eq!(messages[1].data, vec![4, 5, 6]); - group.ack_messages(&messages).await.unwrap(); - - let messages = group - .consumer_messages("consumer1", ReadOption::Count(10)) - .await - .unwrap(); - assert!(messages.is_empty()); - } -} diff --git a/services/appflowy-history_deprecated/tests/util.rs b/services/appflowy-history_deprecated/tests/util.rs deleted file mode 100644 index 45d6c42eb..000000000 --- a/services/appflowy-history_deprecated/tests/util.rs +++ /dev/null @@ -1,176 +0,0 @@ -use anyhow::Result; -use anyhow::{anyhow, Context}; -use appflowy_history::application::run_server; -use appflowy_history::config::Config; -use assert_json_diff::{assert_json_matches_no_panic, CompareMode}; -use collab::core::collab::DataSource; -use collab::core::origin::CollabOrigin; -use collab::preclude::Collab; -use collab_stream::client::CollabRedisStream; -use futures::future::BoxFuture; -use rand::{thread_rng, Rng}; -use serde_json::{json, Value}; -use sqlx::PgPool; -use std::net::SocketAddr; -use std::ops::{Deref, DerefMut}; -use std::sync::Arc; -use std::time::Duration; -use tokio::net::TcpListener; -use tokio::sync::Mutex; -use tokio::time::timeout; - -use tonic::Status; -use tonic_proto::history::history_client::HistoryClient; -use tonic_proto::history::HistoryStatePb; - -pub async fn redis_client() -> redis::Client { - let redis_uri = "redis://localhost:6379"; - redis::Client::open(redis_uri) - .context("failed to connect to redis") - .unwrap() -} - -pub async fn redis_stream() -> CollabRedisStream { - let redis_client = redis_client().await; - CollabRedisStream::new(redis_client) - .await - .context("failed to create stream client") - .unwrap() -} - -#[allow(dead_code)] -pub fn random_i64() -> i64 { - let mut rng = thread_rng(); - let num: i64 = rng.gen(); - num -} - -#[allow(dead_code)] -pub async fn setup_db(pool: &PgPool) -> anyhow::Result<()> { - // Have to manually create schema and tables managed by gotrue but referenced by our - // migration scripts. - sqlx::query(r#"create schema auth"#).execute(pool).await?; - sqlx::query( - r#" - CREATE TABLE auth.users( - id uuid NOT NULL UNIQUE, - deleted_at timestamptz null, - CONSTRAINT users_pkey PRIMARY KEY (id) - ) - "#, - ) - .execute(pool) - .await?; - - sqlx::migrate!("../../migrations") - .set_ignore_missing(true) - .run(pool) - .await - .unwrap(); - Ok(()) -} - -#[derive(Clone)] -#[allow(dead_code)] -pub struct TestRpcClient { - pub config: Config, - history_rpc: HistoryClient, -} - -impl Deref for TestRpcClient { - type Target = HistoryClient; - - fn deref(&self) -> &Self::Target { - &self.history_rpc - } -} - -impl DerefMut for TestRpcClient { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.history_rpc - } -} - -impl TestRpcClient { - pub async fn new(addr: SocketAddr, config: Config) -> Self { - let url = format!("http://{}", addr); - let history_rpc = HistoryClient::connect(url) - .await - .context("failed to connect to history server") - .unwrap(); - Self { - history_rpc, - config, - } - } -} - -#[allow(dead_code)] -pub async fn run_test_server(control_stream_key: String) -> TestRpcClient { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let mut config = Config::from_env().expect("failed to load config"); - config.stream_settings.control_key = control_stream_key; - - let cloned_config = config.clone(); - tokio::spawn(async move { - run_server(listener, cloned_config).await.unwrap(); - }); - - TestRpcClient::new(addr, config).await -} - -#[allow(dead_code)] -pub async fn check_doc_state_json<'a, F>( - object_id: &str, - timeout_secs: u64, - expected_json: Value, - client_action: F, -) -> Result<()> -where - F: Fn() -> BoxFuture<'a, Result> + Send + Sync + 'static, -{ - let duration = Duration::from_secs(timeout_secs); - let check_interval = Duration::from_secs(2); - - let final_json = Arc::new(Mutex::new(json!({}))); - let operation = async { - loop { - if let Ok(data) = client_action().await { - let collab = Collab::new_with_source( - CollabOrigin::Server, - object_id, - DataSource::DocStateV1(data.doc_state.clone()), - vec![], - true, - ) - .unwrap(); - - let json = collab.to_json_value(); - *final_json.lock().await = json.clone(); - - if assert_json_matches_no_panic( - &json, - &expected_json, - assert_json_diff::Config::new(CompareMode::Inclusive), - ) - .is_ok() - { - return Ok::<(), Status>(()); - } - } - tokio::time::sleep(check_interval).await; - } - }; - - if timeout(duration, operation).await.is_err() { - eprintln!( - "Final JSON: {}, Expected: {}", - *final_json.lock().await, - expected_json - ); - Err(anyhow!("Timeout reached without matching expected JSON")) - } else { - Ok(()) - } -} diff --git a/src/api/history.rs b/src/api/history.rs deleted file mode 100644 index 0b369aced..000000000 --- a/src/api/history.rs +++ /dev/null @@ -1,110 +0,0 @@ -use crate::state::AppState; -use actix_web::web::Data; -use actix_web::{web, Scope}; - -use anyhow::anyhow; -use app_error::AppError; -use shared_entity::dto::history_dto::{ - HistoryState, RepeatedSnapshotMeta, SnapshotInfo, SnapshotMeta, -}; -use shared_entity::response::{AppResponse, JsonAppResponse}; - -use tonic_proto::history::SnapshotRequestPb; - -pub fn history_scope() -> Scope { - web::scope("/api/history/{workspace_id}") - .service(web::resource("/{object_id}/{collab_type}").route(web::get().to(get_snapshot_handler))) - .service( - web::resource("/{object_id}/{collab_type}/latest") - .route(web::get().to(get_latest_history_handler)), - ) -} - -async fn get_snapshot_handler( - path: web::Path<(String, String, i32)>, - state: Data, -) -> actix_web::Result> { - let (workspace_id, object_id, collab_type) = path.into_inner(); - let request = SnapshotRequestPb { - workspace_id, - object_id, - collab_type, - num_snapshot: 1, - }; - - let items = state - .grpc_history_client - .lock() - .await - .get_snapshots(request) - .await - .map_err(|err| AppError::Internal(anyhow!(err.to_string())))? - .into_inner() - .items - .into_iter() - .map(|item| SnapshotMeta { - oid: item.oid, - snapshot: item.snapshot, - snapshot_version: item.snapshot_version, - created_at: item.created_at, - }) - .collect::>(); - - Ok( - AppResponse::Ok() - .with_data(RepeatedSnapshotMeta { items }) - .into(), - ) -} - -async fn get_latest_history_handler( - path: web::Path<(String, String, i32)>, - state: Data, -) -> actix_web::Result> { - let (workspace_id, object_id, collab_type) = path.into_inner(); - let request = SnapshotRequestPb { - workspace_id, - object_id, - collab_type, - num_snapshot: 1, - }; - - let pb = state - .grpc_history_client - .lock() - .await - .get_latest_snapshot(request) - .await - .map_err(|err| AppError::Internal(anyhow!(err.to_string())))? - .into_inner(); - - let pb_history_state = pb - .history_state - .ok_or_else(|| AppError::Internal(anyhow!("No history state found")))?; - - let pb_snapshot_meta = pb - .snapshot_meta - .ok_or_else(|| AppError::Internal(anyhow!("No snapshot meta found")))?; - - let history_state = HistoryState { - object_id: pb_history_state.object_id, - doc_state: pb_history_state.doc_state, - doc_state_version: pb_history_state.doc_state_version, - }; - - let snapshot_meta = SnapshotMeta { - oid: pb_snapshot_meta.oid, - snapshot: pb_snapshot_meta.snapshot, - snapshot_version: pb_snapshot_meta.snapshot_version, - created_at: pb_snapshot_meta.created_at, - }; - - Ok( - AppResponse::Ok() - .with_data(SnapshotInfo { - history: history_state, - snapshot_meta, - }) - .into(), - ) -} diff --git a/src/api/mod.rs b/src/api/mod.rs index 58b8920a8..7b2360834 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -3,7 +3,6 @@ pub mod ai; pub mod chat; pub mod data_import; pub mod file_storage; -pub mod history; pub mod metrics; pub mod search; pub mod server_info; diff --git a/src/application.rs b/src/application.rs index d27874a6b..d7ad47686 100644 --- a/src/application.rs +++ b/src/application.rs @@ -26,7 +26,6 @@ use aws_sdk_s3::operation::create_bucket::CreateBucketError; use aws_sdk_s3::types::{ BucketInfo, BucketLocationConstraint, BucketType, CreateBucketConfiguration, }; -use collab::lock::Mutex; use mailer::config::MailerSetting; use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; use openssl::x509::X509; @@ -50,14 +49,12 @@ use indexer::scheduler::{IndexerConfiguration, IndexerScheduler}; use infra::env_util::get_env_var; use mailer::sender::Mailer; use snowflake::Snowflake; -use tonic_proto::history::history_client::HistoryClient; use crate::api::access_request::access_request_scope; use crate::api::ai::ai_completion_scope; use crate::api::chat::chat_scope; use crate::api::data_import::data_import_scope; use crate::api::file_storage::file_storage_scope; -use crate::api::history::history_scope; use crate::api::metrics::metrics_scope; use crate::api::search::search_scope; use crate::api::server_info::server_info_scope; @@ -165,7 +162,6 @@ pub async fn run_actix_server( .service(file_storage_scope()) .service(chat_scope()) .service(ai_completion_scope()) - .service(history_scope()) .service(metrics_scope()) .service(search_scope()) .service(template_scope()) @@ -315,16 +311,6 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result Result { port: get_env_var("AI_SERVER_PORT", "5001").into(), host: get_env_var("AI_SERVER_HOST", "localhost").into(), }, - grpc_history: GrpcHistorySetting { - addrs: get_env_var("APPFLOWY_GRPC_HISTORY_ADDRS", "http://localhost:50051"), - }, collab: CollabSetting { group_persistence_interval_secs: get_env_var( "APPFLOWY_COLLAB_GROUP_PERSISTENCE_INTERVAL", diff --git a/src/state.rs b/src/state.rs index 2c7f0cf27..c22e79ad2 100644 --- a/src/state.rs +++ b/src/state.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use access_control::collab::{CollabAccessControl, RealtimeAccessControl}; use access_control::workspace::WorkspaceAccessControl; -use collab::lock::Mutex; use dashmap::DashMap; use secrecy::{ExposeSecret, Secret}; use sqlx::PgPool; @@ -25,7 +24,6 @@ use gotrue::grant::{Grant, PasswordGrant}; use indexer::metrics::EmbeddingMetrics; use indexer::scheduler::IndexerScheduler; use snowflake::Snowflake; -use tonic_proto::history::history_client::HistoryClient; use crate::api::metrics::{AppFlowyWebMetrics, PublishedCollabMetrics, RequestMetrics}; use crate::biz::chat::metrics::AIMetrics; @@ -57,7 +55,6 @@ pub struct AppState { pub gotrue_admin: GoTrueAdmin, pub mailer: AFCloudMailer, pub ai_client: AppFlowyAIClient, - pub grpc_history_client: Arc>>, pub indexer_scheduler: Arc, } diff --git a/tests/sql_test/history_test.rs b/tests/sql_test/history_test.rs index 91380e036..ca154479c 100644 --- a/tests/sql_test/history_test.rs +++ b/tests/sql_test/history_test.rs @@ -4,7 +4,7 @@ use database::history::ops::{ get_latest_snapshot, get_latest_snapshot_state, get_snapshot_meta_list, insert_history, }; use sqlx::PgPool; -use tonic_proto::history::{SnapshotMetaPb, SnapshotStatePb}; +use tonic_proto::history::SnapshotMetaPb; use uuid::Uuid; #[sqlx::test(migrations = false)] @@ -38,19 +38,16 @@ async fn insert_snapshot_test(pool: PgPool) { }, ]; - let snapshot_state = SnapshotStatePb { - oid: object_id.clone(), - doc_state: vec![10, 11, 12], - doc_state_version: 1, - deps_snapshot_id: None, - }; + let doc_state = vec![10, 11, 12]; + let doc_state_version = 1; + let deps_snapshot_id = None; insert_history( &workspace_id, - &snapshot_state.oid, - snapshot_state.doc_state, - snapshot_state.doc_state_version, - snapshot_state.deps_snapshot_id, + &object_id, + doc_state, + doc_state_version, + deps_snapshot_id, collab_type.clone(), timestamp + 200, snapshots,