Skip to content

Commit

Permalink
wip: add ShareChain::load() and defer loading of persisted share chains to a background task at server startup
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Jan 23, 2025
1 parent d6a7114 commit 2042636
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 86 deletions.
18 changes: 18 additions & 0 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use minotari_app_grpc::tari_rpc::{base_node_server::BaseNodeServer, sha_p2_pool_
use tari_common::configuration::Network;
use tari_core::{consensus::ConsensusManager, proof_of_work::randomx_factory::RandomXFactory};
use tari_shutdown::ShutdownSignal;
use tokio::task;

use super::http::stats_collector::{StatsBroadcastClient, StatsCollector};
use crate::{
Expand Down Expand Up @@ -55,6 +56,23 @@ where S: ShareChain
) -> Result<Self, Error> {
let share_chain_sha3x = Arc::new(share_chain_sha3x);
let share_chain_random_x = Arc::new(share_chain_random_x);
let sha3x_chain = share_chain_sha3x.clone();
let rx_chain = share_chain_random_x.clone();
task::spawn(async move {
let _res = sha3x_chain
.load()
.await
.inspect_err(|e| error!(target: LOG_TARGET, "Error loading sha3x chain: {:?}", e));
let _res = rx_chain
.load()
.await
.inspect_err(|e| error!(target: LOG_TARGET, "Error loading randomx chain: {:?}", e));
});
// task::spawn(async move {
// rx_chain.load().await;
// });
share_chain_sha3x.load().await?;
share_chain_random_x.load().await?;
let are_we_synced_with_randomx_p2pool = Arc::new(AtomicBool::new(false));
let are_we_synced_with_sha3x_p2pool = Arc::new(AtomicBool::new(false));
let stats_client = stats_collector.create_client();
Expand Down
135 changes: 78 additions & 57 deletions src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use tari_utilities::{epoch_time::EpochTime, hex::Hex};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use super::{
lmdb_block_storage::LmdbBlockStorage,
lmdb_block_storage::{BlockCache, InMemoryBlockCache, LmdbBlockStorage},
p2chain,
MAIN_REWARD_SHARE,
MIN_RANDOMX_DIFFICULTY,
MIN_SHA3X_DIFFICULTY,
Expand Down Expand Up @@ -69,62 +70,15 @@ impl InMemoryShareChain {
if pow_algo == PowAlgorithm::RandomX && block_validation_params.is_none() {
return Err(ShareChainError::MissingBlockValidationParams);
}

let data_path = config.block_cache_file.join(pow_algo.to_string());

let mut p2chain = None;
if fs::exists(&data_path).map_err(|e| anyhow!("block cache file errored when checking exists: {}", e))? {
let bkp_file = config
.block_cache_file
.as_path()
.parent()
.ok_or_else(|| anyhow!("Block cache file has no parent"))?
.join("block_cache_backup")
.join(pow_algo.to_string());
info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file);

// First remove the old backup file
let _unused = fs::remove_dir_all(bkp_file.as_path())
.inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e));
fs::create_dir_all(bkp_file.parent().unwrap())
.map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?;
fs::rename(data_path.as_path(), bkp_file.as_path())
.map_err(|e| anyhow::anyhow!("Could not rename file to old file:{:?}", e))?;
let old = LmdbBlockStorage::new_from_path(bkp_file.as_path());
let new = LmdbBlockStorage::new_from_path(&data_path);
match P2Chain::try_load(
pow_algo,
config.share_window * 2,
config.share_window,
config.block_time,
old,
new,
) {
Ok(p) => {
let _unused =
stat_client.send_chain_changed(pow_algo, p.get_height(), p.get_max_chain_length() as u64);

p2chain = Some(p);
},
Err(e) => error!(target: LOG_TARGET, "Could not load chain from file: {}", e),
};

// fs::remove_dir_all(bkp_file.as_path())
// .map_err(|e| anyhow::anyhow!("Could not remove old block cache file:{:?}", e))?;
}

if p2chain.is_none() {
let block_cache = LmdbBlockStorage::new_from_path(&data_path);
p2chain = Some(P2Chain::new_empty(
pow_algo,
config.share_window * 2,
config.share_window,
config.block_time,
block_cache,
));
}

let p2chain = p2chain.unwrap();
// Create a temporary chain so that we can start all services and then
// load the actual one
let p2chain = P2Chain::new_empty(
pow_algo,
config.share_window * 2,
config.share_window,
config.block_time,
LmdbBlockStorage::new_from_temp_dir(),
);

Ok(Self {
p2_chain: Arc::new(RwLock::new(p2chain)),
Expand Down Expand Up @@ -425,6 +379,73 @@ impl InMemoryShareChain {

#[async_trait]
impl ShareChain for InMemoryShareChain {
/// Loads (or migrates) the persisted share chain from disk and swaps it in
/// for the temporary placeholder chain created at construction time.
///
/// If an on-disk block cache exists it is moved aside to a backup directory
/// and its blocks are migrated into a fresh store via [`P2Chain::try_load`];
/// on any migration failure we fall back to a brand-new empty chain so a
/// corrupt cache can never wedge startup.
///
/// # Errors
/// Returns a [`ShareChainError`] if the cache path cannot be probed, the
/// backup directory cannot be created, or the rename of the old cache fails.
async fn load(&self) -> Result<(), ShareChainError> {
    // Take the write lock up front so no other service observes a
    // half-loaded chain while we swap the placeholder out.
    let mut p2_chain_w_lock = self.p2_chain.write().await;
    let data_path = self.config.block_cache_file.join(self.pow_algo.to_string());

    // NOTE(review): the std::fs calls below block the async executor while
    // the write lock is held; consider task::spawn_blocking if startup
    // latency becomes a concern.
    let mut loaded_chain = None;
    if fs::exists(&data_path).map_err(|e| anyhow!("block cache file errored when checking exists: {}", e))? {
        let bkp_file = self
            .config
            .block_cache_file
            .as_path()
            .parent()
            .ok_or_else(|| anyhow!("Block cache file has no parent"))?
            .join("block_cache_backup")
            .join(self.pow_algo.to_string());
        info!(target: LOG_TARGET, "Found old block cache file, renaming from {:?} to {:?}", data_path.as_path(), &bkp_file);

        // Remove any stale backup from a previous run; failure here is
        // non-fatal (the rename below will surface a real problem).
        let _unused = fs::remove_dir_all(bkp_file.as_path())
            .inspect_err(|e| error!(target: LOG_TARGET, "Could not remove old block cache file:{:?}", e));
        let bkp_parent = bkp_file
            .parent()
            .ok_or_else(|| anyhow!("Block cache backup file has no parent"))?;
        fs::create_dir_all(bkp_parent)
            .map_err(|e| anyhow::anyhow!("Could not create block cache backup directory:{:?}", e))?;
        fs::rename(data_path.as_path(), bkp_file.as_path())
            .map_err(|e| anyhow::anyhow!("Could not rename file to old file:{:?}", e))?;

        // Migrate blocks from the backed-up store into a fresh one at the
        // original path.
        let old = LmdbBlockStorage::new_from_path(bkp_file.as_path());
        let new = LmdbBlockStorage::new_from_path(&data_path);
        match P2Chain::try_load(
            self.pow_algo,
            self.config.share_window * 2,
            self.config.share_window,
            self.config.block_time,
            old,
            new,
        ) {
            Ok(p) => {
                // Report the freshly loaded height to the stats collector;
                // a failed send is deliberately ignored.
                let _unused = self.stat_client.send_chain_changed(
                    self.pow_algo,
                    p.get_height(),
                    p.get_max_chain_length() as u64,
                );
                loaded_chain = Some(p);
            },
            Err(e) => error!(target: LOG_TARGET, "Could not load chain from file: {}", e),
        };
    }

    // No cache on disk, or migration failed: start from an empty chain
    // backed by a fresh store at the cache path.
    let p2chain = match loaded_chain {
        Some(p) => p,
        None => P2Chain::new_empty(
            self.pow_algo,
            self.config.share_window * 2,
            self.config.share_window,
            self.config.block_time,
            LmdbBlockStorage::new_from_path(&data_path),
        ),
    };

    *p2_chain_w_lock = p2chain;
    Ok(())
}

async fn submit_block(&self, block: Arc<P2Block>) -> Result<ChainAddResult, ShareChainError> {
if block.version != PROTOCOL_VERSION {
return Err(ShareChainError::BlockValidation("Block version is too low".to_string()));
Expand Down
65 changes: 36 additions & 29 deletions src/sharechain/lmdb_block_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
// DAMAGE.

use std::{
collections::HashMap,
fs,
path::Path,
sync::{Arc, RwLock},
Expand All @@ -38,17 +39,18 @@ use rkv::{
};
use tari_common_types::types::BlockHash;
use tari_utilities::ByteArray;
use tempfile::TempDir;

use super::P2Block;
use crate::server::p2p::messages::{deserialize_message, serialize_message};

const LOG_TARGET: &str = "tari::p2pool::sharechain::lmdb_block_storage";
pub(crate) struct LmdbBlockStorage {
file_handle: Arc<RwLock<Rkv<LmdbEnvironment>>>,
temp_dir: Option<TempDir>,
}

impl LmdbBlockStorage {
#[cfg(test)]
pub fn new_from_temp_dir() -> Self {
use rand::{distributions::Alphanumeric, Rng};
use tempfile::Builder;
Expand All @@ -63,7 +65,10 @@ impl LmdbBlockStorage {
let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
let file_handle = manager.get_or_create(path, Rkv::new::<Lmdb>).unwrap();

Self { file_handle }
Self {
file_handle,
temp_dir: Some(root),
}
}

pub fn new_from_path(path: &Path) -> Self {
Expand All @@ -75,7 +80,10 @@ impl LmdbBlockStorage {
let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
let file_handle = manager.get_or_create(path, Rkv::new::<Lmdb>).unwrap();

Self { file_handle }
Self {
file_handle,
temp_dir: None,
}
}
}

Expand Down Expand Up @@ -185,41 +193,40 @@ pub trait BlockCache {
fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error>;
}

#[cfg(test)]
pub mod test {
use std::collections::HashMap;
/// An in-process, heap-backed implementation of the `BlockCache` trait
/// (no on-disk persistence, unlike `LmdbBlockStorage`).
pub(crate) struct InMemoryBlockCache {
// Thread-safe shared map from block hash to the cached block.
blocks: Arc<RwLock<HashMap<BlockHash, Arc<P2Block>>>>,
}

use super::*;
impl InMemoryBlockCache {
pub fn new() -> Self {
Self {
blocks: Arc::new(RwLock::new(HashMap::new())),
}
}
}

pub(crate) struct InMemoryBlockCache {
blocks: Arc<RwLock<HashMap<BlockHash, Arc<P2Block>>>>,
impl BlockCache for InMemoryBlockCache {
fn get(&self, hash: &BlockHash) -> Option<Arc<P2Block>> {
self.blocks.read().unwrap().get(hash).cloned()
}

impl InMemoryBlockCache {
pub fn new() -> Self {
Self {
blocks: Arc::new(RwLock::new(HashMap::new())),
}
}
fn delete(&self, hash: &BlockHash) {
self.blocks.write().unwrap().remove(hash);
}

impl BlockCache for InMemoryBlockCache {
fn get(&self, hash: &BlockHash) -> Option<Arc<P2Block>> {
self.blocks.read().unwrap().get(hash).cloned()
}
fn insert(&self, hash: BlockHash, block: Arc<P2Block>) {
self.blocks.write().unwrap().insert(hash, block);
}

fn delete(&self, hash: &BlockHash) {
self.blocks.write().unwrap().remove(hash);
}
fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error> {
Ok(self.blocks.read().unwrap().values().cloned().collect())
}
}

fn insert(&self, hash: BlockHash, block: Arc<P2Block>) {
self.blocks.write().unwrap().insert(hash, block);
}
#[cfg(test)]
pub mod test {

fn all_blocks(&self) -> Result<Vec<Arc<P2Block>>, Error> {
Ok(self.blocks.read().unwrap().values().cloned().collect())
}
}
use super::*;

#[test]
fn test_saving_and_retrieving_blocks() {
Expand Down
1 change: 1 addition & 0 deletions src/sharechain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ impl BlockValidationParams {

#[async_trait]
pub(crate) trait ShareChain: Send + Sync + 'static {
async fn load(&self) -> Result<(), ShareChainError>;
async fn get_total_chain_pow(&self) -> AccumulatedDifficulty;
/// Adds a new block if valid to chain.
async fn submit_block(&self, block: Arc<P2Block>) -> Result<ChainAddResult, ShareChainError>;
Expand Down

0 comments on commit 2042636

Please sign in to comment.