From b4b87c5c01a748796b9d4ca4584a753cbde35cda Mon Sep 17 00:00:00 2001 From: muji Date: Mon, 3 Feb 2025 13:17:24 +0800 Subject: [PATCH] Implement StorageEventLogs for fs client storage. --- Cargo.lock | 1 + crates/account/Cargo.toml | 1 + crates/account/src/account.rs | 51 ++---------------- crates/account/src/sync.rs | 56 +++++++++++++++++--- crates/database/src/archive/import.rs | 1 + crates/storage/client/src/filesystem/mod.rs | 2 + crates/storage/client/src/filesystem/sync.rs | 53 ++++++++++++++++++ 7 files changed, 111 insertions(+), 54 deletions(-) create mode 100644 crates/storage/client/src/filesystem/sync.rs diff --git a/Cargo.lock b/Cargo.lock index fb3446b011..d4689d3a50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4839,6 +4839,7 @@ dependencies = [ "sos-filesystem", "sos-login", "sos-migrate", + "sos-reducers", "sos-sdk", "sos-search", "sos-signer", diff --git a/crates/account/Cargo.toml b/crates/account/Cargo.toml index de84fef16d..b74a5c9766 100644 --- a/crates/account/Cargo.toml +++ b/crates/account/Cargo.toml @@ -48,6 +48,7 @@ sos-filesystem.workspace = true sos-login.workspace = true sos-sync.workspace = true sos-client-storage.workspace = true +sos-reducers.workspace = true sos-signer.workspace = true sos-vault.workspace = true sos-vfs.workspace = true diff --git a/crates/account/src/account.rs b/crates/account/src/account.rs index d67007da13..e9db0c3cc3 100644 --- a/crates/account/src/account.rs +++ b/crates/account/src/account.rs @@ -3,7 +3,7 @@ use crate::{convert::CipherComparison, AccountBuilder, Error, Result}; use sos_backend::compact::compact_folder; use sos_backend::{ reducers::FolderReducer, write_exclusive, AccessPoint, AccountEventLog, - FolderEventLog, StorageError, + StorageError, }; use sos_client_storage::{ AccessOptions, AccountPack, ClientAccountStorage, ClientDeviceStorage, @@ -48,14 +48,14 @@ use { #[cfg(feature = "archive")] use sos_filesystem::archive::{Inventory, RestoreOptions}; -use sos_backend::{BackendTarget, DeviceEventLog}; +use sos_backend::BackendTarget; use sos_core::device::{DevicePublicKey, TrustedDevice}; use sos_login::device::{DeviceManager, DeviceSigner}; use indexmap::IndexSet; #[cfg(feature = "files")] -use {sos_backend::FileEventLog, sos_external_files::FileMutationEvent}; +use sos_external_files::FileMutationEvent; #[cfg(feature = "search")] use sos_search::*; @@ -3326,48 +3326,3 @@ impl Account for LocalAccount { Ok(false) } } - -#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] -#[cfg_attr(not(target_arch = "wasm32"), async_trait)] -impl StorageEventLogs for LocalAccount { - type Error = Error; - - async fn identity_log(&self) -> Result>> { - let storage = self.storage.read().await; - Ok(Arc::clone(&storage.identity_log)) - } - - async fn account_log(&self) -> Result>> { - let storage = self.storage.read().await; - Ok(Arc::clone(&storage.account_log)) - } - - async fn device_log(&self) -> Result>> { - let storage = self.storage.read().await; - Ok(Arc::clone(&storage.device_log)) - } - - #[cfg(feature = "files")] - async fn file_log(&self) -> Result>> { - let storage = self.storage.read().await; - Ok(Arc::clone(&storage.file_log)) - } - - async fn folder_details(&self) -> Result> { - let storage = self.storage.read().await; - let folders = storage.list_folders(); - Ok(folders.into_iter().cloned().collect()) - } - - async fn folder_log( - &self, - id: &VaultId, - ) -> Result>> { - let storage = self.storage.read().await; - let folder = storage - .folders() - .get(id) - .ok_or(StorageError::CacheNotAvailable(*id))?; - Ok(folder.event_log()) - } -} diff --git a/crates/account/src/sync.rs b/crates/account/src/sync.rs index 33be9f6a6d..7260f7907b 100644 --- a/crates/account/src/sync.rs +++ b/crates/account/src/sync.rs @@ -2,10 +2,12 @@ use super::folder_sync::{ FolderMerge, FolderMergeOptions, IdentityFolderMerge, }; -use crate::{Account, LocalAccount, Result}; +use crate::{Account, Error, LocalAccount, Result}; use async_trait::async_trait; -use sos_backend::reducers::DeviceReducer; -use sos_backend::StorageError; +use indexmap::IndexSet; +use sos_backend::{ + AccountEventLog, DeviceEventLog, FolderEventLog, StorageError, +}; use sos_client_storage::{ClientAccountStorage, ClientFolderStorage}; use sos_core::{decode, events::EventLog}; use sos_core::{ @@ -15,15 +17,57 @@ use sos_core::{ }, VaultId, }; +use sos_reducers::DeviceReducer; use sos_sync::{ ForceMerge, Merge, MergeOutcome, StorageEventLogs, SyncStorage, TrackedChanges, }; -use sos_vault::Vault; -use std::collections::HashSet; +use sos_vault::{Summary, Vault}; +use std::{collections::HashSet, sync::Arc}; +use tokio::sync::RwLock; #[cfg(feature = "files")] -use sos_core::events::patch::FileDiff; +use {sos_backend::FileEventLog, sos_core::events::patch::FileDiff}; + +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl StorageEventLogs for LocalAccount { + type Error = Error; + + async fn identity_log(&self) -> Result>> { + let storage = self.storage.read().await; + Ok(storage.identity_log().await?) + } + + async fn account_log(&self) -> Result>> { + let storage = self.storage.read().await; + Ok(storage.account_log().await?) + } + + async fn device_log(&self) -> Result>> { + let storage = self.storage.read().await; + Ok(storage.device_log().await?) + } + + #[cfg(feature = "files")] + async fn file_log(&self) -> Result>> { + let storage = self.storage.read().await; + Ok(storage.file_log().await?) + } + + async fn folder_details(&self) -> Result> { + let storage = self.storage.read().await; + Ok(storage.folder_details().await?) + } + + async fn folder_log( + &self, + id: &VaultId, + ) -> Result>> { + let storage = self.storage.read().await; + Ok(storage.folder_log(id).await?) + } +} #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] diff --git a/crates/database/src/archive/import.rs b/crates/database/src/archive/import.rs index 07dee70dc6..c2445569c7 100644 --- a/crates/database/src/archive/import.rs +++ b/crates/database/src/archive/import.rs @@ -349,6 +349,7 @@ pub(crate) async fn start<'conn>( })?; let mut db_temp = NamedTempFile::new()?; db_temp.as_file_mut().write_all(&db_buffer)?; + db_temp.as_file_mut().flush()?; let source_db = Connection::open(db_temp.path())?; let import = BackupImport { diff --git a/crates/storage/client/src/filesystem/mod.rs b/crates/storage/client/src/filesystem/mod.rs index 05eb3cadb7..7342b920dc 100644 --- a/crates/storage/client/src/filesystem/mod.rs +++ b/crates/storage/client/src/filesystem/mod.rs @@ -64,6 +64,8 @@ use { #[cfg(feature = "search")] use sos_search::{AccountSearch, DocumentCount}; +mod sync; + /// Storage change event with an optional /// collection of file mutation events. #[doc(hidden)] diff --git a/crates/storage/client/src/filesystem/sync.rs b/crates/storage/client/src/filesystem/sync.rs new file mode 100644 index 0000000000..a503e98105 --- /dev/null +++ b/crates/storage/client/src/filesystem/sync.rs @@ -0,0 +1,53 @@ +use crate::{ClientFolderStorage, Error, Result}; +use async_trait::async_trait; +use indexmap::IndexSet; +use sos_backend::{ + AccountEventLog, DeviceEventLog, FolderEventLog, StorageError, +}; +use sos_core::VaultId; +use sos_sync::StorageEventLogs; +use sos_vault::Summary; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[cfg(feature = "files")] +use sos_backend::FileEventLog; + +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl StorageEventLogs for super::ClientStorage { + type Error = Error; + + async fn identity_log(&self) -> Result>> { + Ok(self.identity_log.clone()) + } + + async fn account_log(&self) -> Result>> { + Ok(self.account_log.clone()) + } + + async fn device_log(&self) -> Result>> { + Ok(self.device_log.clone()) + } + + #[cfg(feature = "files")] + async fn file_log(&self) -> Result>> { + Ok(self.file_log.clone()) + } + + async fn folder_details(&self) -> Result> { + let folders = self.list_folders(); + Ok(folders.into_iter().cloned().collect()) + } + + async fn folder_log( + &self, + id: &VaultId, + ) -> Result>> { + let folder = self + .folders + .get(id) + .ok_or(StorageError::CacheNotAvailable(*id))?; + Ok(folder.event_log()) + } +}