diff --git a/src/server/server.rs b/src/server/server.rs index acefec92..90182ea4 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -13,6 +13,7 @@ use minotari_app_grpc::tari_rpc::{base_node_server::BaseNodeServer, sha_p2_pool_ use tari_common::configuration::Network; use tari_core::{consensus::ConsensusManager, proof_of_work::randomx_factory::RandomXFactory}; use tari_shutdown::ShutdownSignal; +use tokio::task; use super::http::stats_collector::{StatsBroadcastClient, StatsCollector}; use crate::{ @@ -55,6 +56,23 @@ where S: ShareChain ) -> Result { let share_chain_sha3x = Arc::new(share_chain_sha3x); let share_chain_random_x = Arc::new(share_chain_random_x); + let sha3x_chain = share_chain_sha3x.clone(); + let rx_chain = share_chain_random_x.clone(); + task::spawn(async move { + let _res = sha3x_chain + .load() + .await + .inspect_err(|e| error!(target: LOG_TARGET, "Error loading sha3x chain: {:?}", e)); + let _res = rx_chain + .load() + .await + .inspect_err(|e| error!(target: LOG_TARGET, "Error loading randomx chain: {:?}", e)); + }); + // task::spawn(async move { + // rx_chain.load().await; + // }); + share_chain_sha3x.load().await?; + share_chain_random_x.load().await?; let are_we_synced_with_randomx_p2pool = Arc::new(AtomicBool::new(false)); let are_we_synced_with_sha3x_p2pool = Arc::new(AtomicBool::new(false)); let stats_client = stats_collector.create_client(); diff --git a/src/sharechain/in_memory.rs b/src/sharechain/in_memory.rs index 4494f142..7ce6532d 100644 --- a/src/sharechain/in_memory.rs +++ b/src/sharechain/in_memory.rs @@ -20,7 +20,8 @@ use tari_utilities::{epoch_time::EpochTime, hex::Hex}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use super::{ - lmdb_block_storage::LmdbBlockStorage, + lmdb_block_storage::{BlockCache, InMemoryBlockCache, LmdbBlockStorage}, + p2chain, MAIN_REWARD_SHARE, MIN_RANDOMX_DIFFICULTY, MIN_SHA3X_DIFFICULTY, @@ -69,62 +70,15 @@ impl InMemoryShareChain { if pow_algo == PowAlgorithm::RandomX && block_validation_params.is_none() { return Err(ShareChainError::MissingBlockValidationParams); } - - let data_path = config.block_cache_file.join(pow_algo.to_string()); - - let mut p2chain = None; - if fs::exists(&data_path).map_err(|e| anyhow!("block cache file errored when checking exists: {}", e))? { - let bkp_file = config - .block_cache_file - .as_path() - .parent() - .ok_or_else(|| anyhow!("Block cache file has no parent"))? - .join("block_cache_backup") - .join(pow_algo.to_string()); - info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file); - - // First remove the old backup file - let _unused = fs::remove_dir_all(bkp_file.as_path()) - .inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e)); - fs::create_dir_all(bkp_file.parent().unwrap()) - .map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?; - fs::rename(data_path.as_path(), bkp_file.as_path()) - .map_err(|e| anyhow::anyhow!("Could not rename file to old file:{:?}", e))?; - let old = LmdbBlockStorage::new_from_path(bkp_file.as_path()); - let new = LmdbBlockStorage::new_from_path(&data_path); - match P2Chain::try_load( - pow_algo, - config.share_window * 2, - config.share_window, - config.block_time, - old, - new, - ) { - Ok(p) => { - let _unused = - stat_client.send_chain_changed(pow_algo, p.get_height(), p.get_max_chain_length() as u64); - - p2chain = Some(p); - }, - Err(e) => error!(target: LOG_TARGET, "Could not load chain from file: {}", e), - }; - - // fs::remove_dir_all(bkp_file.as_path()) - // .map_err(|e| anyhow::anyhow!("Could not remove old block cache file:{:?}", e))?; - } - - if p2chain.is_none() { - let block_cache = LmdbBlockStorage::new_from_path(&data_path); - p2chain = Some(P2Chain::new_empty( - pow_algo, - config.share_window * 2, - config.share_window, - config.block_time, - block_cache, - )); - } - - let p2chain = p2chain.unwrap(); + // Create a temporary chain so that we can start all services and then + // load the actual one + let p2chain = P2Chain::new_empty( + pow_algo, + config.share_window * 2, + config.share_window, + config.block_time, + LmdbBlockStorage::new_from_temp_dir(), + ); Ok(Self { p2_chain: Arc::new(RwLock::new(p2chain)), @@ -425,6 +379,73 @@ impl InMemoryShareChain { #[async_trait] impl ShareChain for InMemoryShareChain { + async fn load(&self) -> Result<(), ShareChainError> { + let mut p2_chain_w_lock = self.p2_chain.write().await; + let data_path = self.config.block_cache_file.join(self.pow_algo.to_string()); + + let mut p2chain = None; + if fs::exists(&data_path).map_err(|e| anyhow!("block cache file errored when checking exists: {}", e))? { + let bkp_file = self + .config + .block_cache_file + .as_path() + .parent() + .ok_or_else(|| anyhow!("Block cache file has no parent"))? + .join("block_cache_backup") + .join(self.pow_algo.to_string()); + info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file); + + // First remove the old backup file + let _unused = fs::remove_dir_all(bkp_file.as_path()) + .inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e)); + fs::create_dir_all(bkp_file.parent().unwrap()) + .map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?; + fs::rename(data_path.as_path(), bkp_file.as_path()) + .map_err(|e| anyhow::anyhow!("Could not rename file to old file:{:?}", e))?; + let old = LmdbBlockStorage::new_from_path(bkp_file.as_path()); + let new = LmdbBlockStorage::new_from_path(&data_path); + match P2Chain::try_load( + self.pow_algo, + self.config.share_window * 2, + self.config.share_window, + self.config.block_time, + old, + new, + ) { + Ok(p) => { + let _unused = self.stat_client.send_chain_changed( + self.pow_algo, + p.get_height(), + p.get_max_chain_length() as u64, + ); + + p2chain = Some(p); + }, + Err(e) => error!(target: LOG_TARGET, "Could not load chain from file: {}", e), + }; + + // fs::remove_dir_all(bkp_file.as_path()) + // .map_err(|e| anyhow::anyhow!("Could not remove old block cache file:{:?}", e))?; + } + + if p2chain.is_none() { + let block_cache = LmdbBlockStorage::new_from_path(&data_path); + p2chain = Some(P2Chain::new_empty( + self.pow_algo, + self.config.share_window * 2, + self.config.share_window, + self.config.block_time, + block_cache, + )); + } + + let p2chain = p2chain.unwrap(); + + // let mut p2_chain = self.p2_chain.write().await; + *p2_chain_w_lock = p2chain; + Ok(()) + } + async fn submit_block(&self, block: Arc) -> Result { if block.version != PROTOCOL_VERSION { return Err(ShareChainError::BlockValidation("Block version is too low".to_string())); diff --git a/src/sharechain/lmdb_block_storage.rs b/src/sharechain/lmdb_block_storage.rs index c12a45eb..65f0105d 100644 --- a/src/sharechain/lmdb_block_storage.rs +++ b/src/sharechain/lmdb_block_storage.rs @@ -22,6 +22,7 @@ // DAMAGE. use std::{ + collections::HashMap, fs, path::Path, sync::{Arc, RwLock}, @@ -38,6 +39,7 @@ use rkv::{ }; use tari_common_types::types::BlockHash; use tari_utilities::ByteArray; +use tempfile::TempDir; use super::P2Block; use crate::server::p2p::messages::{deserialize_message, serialize_message}; @@ -45,10 +47,10 @@ use crate::server::p2p::messages::{deserialize_message, serialize_message}; const LOG_TARGET: &str = "tari::p2pool::sharechain::lmdb_block_storage"; pub(crate) struct LmdbBlockStorage { file_handle: Arc>>, + temp_dir: Option, } impl LmdbBlockStorage { - #[cfg(test)] pub fn new_from_temp_dir() -> Self { use rand::{distributions::Alphanumeric, Rng}; use tempfile::Builder; @@ -63,7 +65,10 @@ impl LmdbBlockStorage { let mut manager = Manager::::singleton().write().unwrap(); let file_handle = manager.get_or_create(path, Rkv::new::).unwrap(); - Self { file_handle } + Self { + file_handle, + temp_dir: Some(root), + } } pub fn new_from_path(path: &Path) -> Self { @@ -75,7 +80,10 @@ impl LmdbBlockStorage { let mut manager = Manager::::singleton().write().unwrap(); let file_handle = manager.get_or_create(path, Rkv::new::).unwrap(); - Self { file_handle } + Self { + file_handle, + temp_dir: None, + } } } @@ -185,41 +193,40 @@ pub trait BlockCache { fn all_blocks(&self) -> Result>, Error>; } -#[cfg(test)] -pub mod test { - use std::collections::HashMap; +pub(crate) struct InMemoryBlockCache { + blocks: Arc>>>, +} - use super::*; +impl InMemoryBlockCache { + pub fn new() -> Self { + Self { + blocks: Arc::new(RwLock::new(HashMap::new())), + } + } +} - pub(crate) struct InMemoryBlockCache { - blocks: Arc>>>, +impl BlockCache for InMemoryBlockCache { + fn get(&self, hash: &BlockHash) -> Option> { + self.blocks.read().unwrap().get(hash).cloned() } - impl InMemoryBlockCache { - pub fn new() -> Self { - Self { - blocks: Arc::new(RwLock::new(HashMap::new())), - } - } + fn delete(&self, hash: &BlockHash) { + self.blocks.write().unwrap().remove(hash); } - impl BlockCache for InMemoryBlockCache { - fn get(&self, hash: &BlockHash) -> Option> { - self.blocks.read().unwrap().get(hash).cloned() - } + fn insert(&self, hash: BlockHash, block: Arc) { + self.blocks.write().unwrap().insert(hash, block); + } - fn delete(&self, hash: &BlockHash) { - self.blocks.write().unwrap().remove(hash); - } + fn all_blocks(&self) -> Result>, Error> { + Ok(self.blocks.read().unwrap().values().cloned().collect()) + } +} - fn insert(&self, hash: BlockHash, block: Arc) { - self.blocks.write().unwrap().insert(hash, block); - } +#[cfg(test)] +pub mod test { - fn all_blocks(&self) -> Result>, Error> { - Ok(self.blocks.read().unwrap().values().cloned().collect()) - } - } + use super::*; #[test] fn test_saving_and_retrieving_blocks() { diff --git a/src/sharechain/mod.rs b/src/sharechain/mod.rs index dae0822b..ddd301e3 100644 --- a/src/sharechain/mod.rs +++ b/src/sharechain/mod.rs @@ -86,6 +86,7 @@ impl BlockValidationParams { #[async_trait] pub(crate) trait ShareChain: Send + Sync + 'static { + async fn load(&self) -> Result<(), ShareChainError>; async fn get_total_chain_pow(&self) -> AccumulatedDifficulty; /// Adds a new block if valid to chain. async fn submit_block(&self, block: Arc) -> Result;