diff --git a/mutiny-core/src/federation.rs b/mutiny-core/src/federation.rs index 85b1ef7d8..9d992a003 100644 --- a/mutiny-core/src/federation.rs +++ b/mutiny-core/src/federation.rs @@ -931,9 +931,9 @@ impl<'a, S: MutinyStorage> IRawDatabaseTransaction for IndexedDBPseudoTransactio version, value: serde_json::to_value(hex_serialized_data).unwrap(), }; - // TODO await on persisting remotely self.storage - .set_data(key_id(&self.federation_id), value, Some(version))?; + .set_data_async_queue_remote(key_id(&self.federation_id), value, version) + .await?; Ok(()) } diff --git a/mutiny-core/src/storage.rs b/mutiny-core/src/storage.rs index 93335c201..0574aeab6 100644 --- a/mutiny-core/src/storage.rs +++ b/mutiny-core/src/storage.rs @@ -1,4 +1,3 @@ -use crate::ldkstorage::CHANNEL_MANAGER_KEY; use crate::logging::MutinyLogger; use crate::nodemanager::{NodeStorage, DEVICE_LOCK_INTERVAL_SECS}; use crate::utils::{now, spawn}; @@ -11,11 +10,13 @@ use crate::{ error::{MutinyError, MutinyStorageError}, event::PaymentInfo, }; +use crate::{ldkstorage::CHANNEL_MANAGER_KEY, utils::sleep}; use async_trait::async_trait; use bdk::chain::{Append, PersistBackend}; use bip39::Mnemonic; use bitcoin::hashes::hex::ToHex; use bitcoin::hashes::Hash; +use futures_util::lock::Mutex; use hex::FromHex; use lightning::{ln::PaymentHash, util::logger::Logger}; use lightning::{log_error, log_trace}; @@ -40,6 +41,25 @@ pub(crate) const EXPECTED_NETWORK_KEY: &str = "network"; const PAYMENT_INBOUND_PREFIX_KEY: &str = "payment_inbound/"; const PAYMENT_OUTBOUND_PREFIX_KEY: &str = "payment_outbound/"; pub const LAST_DM_SYNC_TIME_KEY: &str = "last_dm_sync_time"; +const DELAYED_WRITE_MS: i32 = 50; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DelayedKeyValueItem { + pub key: String, + pub value: Value, + pub version: u32, + pub write_time: u128, +} + +impl From for VssKeyValueItem { + fn from(item: DelayedKeyValueItem) -> Self { + VssKeyValueItem { + key: item.key, + value: item.value, + version: item.version, + } + } +} fn needs_encryption(key: &str) -> bool { match key { @@ -213,6 +233,65 @@ pub trait MutinyStorage: Clone + Sized + Send + Sync + 'static { Ok(()) } + fn get_delayed_objects(&self) -> Arc>>; + + /// Set a value to persist in local storage, queues remote save + /// The function will encrypt the value if needed + async fn set_data_async_queue_remote( + &self, + key: String, + value: T, + version: u32, + ) -> Result<(), MutinyError> + where + T: Serialize + Send, + { + let data = serde_json::to_value(value).map_err(|e| MutinyError::PersistenceFailed { + source: MutinyStorageError::SerdeError { source: e }, + })?; + + // save locally first + let local_data = data.clone(); + let key_clone = key.clone(); + let json: Value = encrypt_value(key_clone.clone(), local_data, self.cipher())?; + self.set_async(key_clone, json).await?; + + // save to VSS if it is enabled + // queue up keys to persist later + if let Some(vss) = self.vss_client() { + let initial_write_time = now().as_millis(); + let item = DelayedKeyValueItem { + key: key.clone(), + value: data, + version, + write_time: initial_write_time, + }; + + let delayed_lock = self.get_delayed_objects(); + let mut delayed_keys = delayed_lock.lock().await; + delayed_keys.insert(key.clone(), item.clone()); + drop(delayed_keys); + + let delayed_keys_ref = self.get_delayed_objects(); + let original_item = item.clone(); + spawn(async move { + sleep(DELAYED_WRITE_MS).await; + + let threaded_keys = delayed_keys_ref.lock().await; + + if let Some(key_to_check) = threaded_keys.get(&key) { + if key_to_check.write_time == initial_write_time { + drop(threaded_keys); + + let _ = vss.put_objects(vec![original_item.into()]).await; + } + } + }); + } + + Ok(()) + } + /// Get a value from the storage, use get_data if you want the value to be decrypted fn get(&self, key: impl AsRef) -> Result, MutinyError> where @@ -481,6 +560,7 @@ pub struct MemoryStorage { pub cipher: Option, pub memory: Arc>>, pub vss_client: Option>, + delayed_keys: Arc>>, } impl MemoryStorage { @@ -494,6 +574,7 @@ impl MemoryStorage { password, memory: Arc::new(RwLock::new(HashMap::new())), vss_client, + delayed_keys: Arc::new(Mutex::new(HashMap::new())), } } @@ -629,6 +710,10 @@ impl MutinyStorage for MemoryStorage { async fn fetch_device_lock(&self) -> Result, MutinyError> { self.get_device_lock() } + + fn get_delayed_objects(&self) -> Arc>> { + self.delayed_keys.clone() + } } // Dummy implementation for testing or if people want to ignore persistence @@ -695,6 +780,10 @@ impl MutinyStorage for () { async fn fetch_device_lock(&self) -> Result, MutinyError> { self.get_device_lock() } + + fn get_delayed_objects(&self) -> Arc>> { + Arc::new(Mutex::new(HashMap::new())) + } } fn payment_key(inbound: bool, payment_hash: &[u8; 32]) -> String { diff --git a/mutiny-wasm/src/indexed_db.rs b/mutiny-wasm/src/indexed_db.rs index b9f1208e4..092af3721 100644 --- a/mutiny-wasm/src/indexed_db.rs +++ b/mutiny-wasm/src/indexed_db.rs @@ -1,6 +1,7 @@ use anyhow::anyhow; use async_trait::async_trait; use bip39::Mnemonic; +use futures::lock::Mutex; use gloo_utils::format::JsValueSerdeExt; use lightning::util::logger::Logger; use lightning::{log_debug, log_error}; @@ -43,6 +44,7 @@ pub struct IndexedDbStorage { pub(crate) indexed_db: Arc>, vss: Option>, logger: Arc, + delayed_keys: Arc>>, } impl IndexedDbStorage { @@ -73,6 +75,7 @@ impl IndexedDbStorage { indexed_db, vss, logger, + delayed_keys: Arc::new(Mutex::new(HashMap::new())), }) } @@ -786,6 +789,10 @@ impl MutinyStorage for IndexedDbStorage { } } } + + fn get_delayed_objects(&self) -> Arc>> { + self.delayed_keys.clone() + } } #[cfg(test)]