diff --git a/log4rs_sample.yml b/log4rs_sample.yml index d620d5a8..b8a99d08 100644 --- a/log4rs_sample.yml +++ b/log4rs_sample.yml @@ -199,3 +199,10 @@ loggers: # appenders: # - message_logging # additive: false + + # For Profiling + # tari::profiling: + # level: debug + # appenders: + # - message_logging + # additive: false diff --git a/src/cli/commands/util.rs b/src/cli/commands/util.rs index f2918e0c..011195b1 100644 --- a/src/cli/commands/util.rs +++ b/src/cli/commands/util.rs @@ -141,7 +141,6 @@ genesis_block_hash.to_hex()); config.clone(), PowAlgorithm::Sha3x, None, - consensus_manager.clone(), coinbase_extras_sha3x.clone(), stats_broadcast_client.clone(), )?; @@ -150,7 +149,6 @@ genesis_block_hash.to_hex()); config.clone(), PowAlgorithm::RandomX, Some(block_validation_params.clone()), - consensus_manager, coinbase_extras_random_x.clone(), stats_broadcast_client.clone(), )?; diff --git a/src/main.rs b/src/main.rs index 4f7a8999..07eabca1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,8 @@ mod cli; mod server; mod sharechain; +pub const PROFILING_LOG_TARGET: &str = "tari::profiling"; + fn format_system_time(time: SystemTime) -> String { let datetime = time.duration_since(UNIX_EPOCH).unwrap(); let seconds = datetime.as_secs(); diff --git a/src/server/grpc/mod.rs b/src/server/grpc/mod.rs index 88f20392..897a0bb8 100644 --- a/src/server/grpc/mod.rs +++ b/src/server/grpc/mod.rs @@ -8,4 +8,4 @@ pub mod error; pub mod p2pool; pub mod util; -pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500); +pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1000); diff --git a/src/server/grpc/p2pool.rs b/src/server/grpc/p2pool.rs index d0370a70..03ec0a96 100644 --- a/src/server/grpc/p2pool.rs +++ b/src/server/grpc/p2pool.rs @@ -14,7 +14,6 @@ use std::{ use libp2p::PeerId; use log::{debug, error, info, warn}; use minotari_app_grpc::tari_rpc::{ - base_node_client::BaseNodeClient, pow_algo::PowAlgos, sha_p2_pool_server::ShaP2Pool, GetNewBlockRequest, @@ -23,6 +22,7 @@ use minotari_app_grpc::tari_rpc::{ SubmitBlockRequest, SubmitBlockResponse, }; +use minotari_node_grpc_client::BaseNodeGrpcClient; use tari_common_types::{tari_address::TariAddress, types::FixedHash}; use tari_core::{ blocks::Block, @@ -36,18 +36,18 @@ use tari_core::{ PowData, }, }; -use tari_shutdown::ShutdownSignal; use tari_utilities::hex::Hex; use tokio::{sync::RwLock, time::timeout}; use tonic::{Request, Response, Status}; use crate::{ server::{ - grpc::{error::Error, util, util::convert_coinbase_extra, MAX_ACCEPTABLE_GRPC_TIMEOUT}, + grpc::{error::Error, util::convert_coinbase_extra, MAX_ACCEPTABLE_GRPC_TIMEOUT}, http::stats_collector::StatsBroadcastClient, p2p::{client::ServiceClient, messages::NotifyNewTipBlock}, }, sharechain::{p2block::P2Block, BlockValidationParams, ShareChain}, + PROFILING_LOG_TARGET, }; pub const MAX_STORED_TEMPLATES_RX: usize = 100; @@ -60,7 +60,8 @@ where S: ShareChain { local_peer_id: PeerId, /// Base node client - client: Arc>>, + // client: Arc>>, + client_address: String, /// P2P service client p2p_client: ServiceClient, /// SHA-3 share chain @@ -93,7 +94,6 @@ where S: ShareChain p2p_client: ServiceClient, share_chain_sha3x: Arc, share_chain_random_x: Arc, - shutdown_signal: ShutdownSignal, random_x_factory: RandomXFactory, consensus_manager: ConsensusManager, genesis_block_hash: FixedHash, @@ -104,9 +104,10 @@ where S: ShareChain ) -> Result { Ok(Self { local_peer_id, - client: Arc::new(RwLock::new( - util::connect_base_node(base_node_address, shutdown_signal).await?, - )), + // client: Arc::new(RwLock::new( + // util::connect_base_node(base_node_address, shutdown_signal).await?, + // )), + client_address: base_node_address, p2p_client, share_chain_sha3x, share_chain_random_x, @@ -211,11 +212,13 @@ where S: ShareChain PowAlgos::Sha3x => PowAlgorithm::Sha3x, }; + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); // update coinbase extras cache // investigate lock usage let wallet_payment_address = TariAddress::from_str(grpc_req.wallet_payment_address.as_str()) .map_err(|error| Status::failed_precondition(format!("Invalid wallet payment address: {}", error)))?; + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); // request new block template with shares as coinbases let (share_chain, synced_status) = match pow_algo { PowAlgorithm::RandomX => ( @@ -229,20 +232,25 @@ where S: ShareChain }; let squad = self.squad.clone(); let coinbase_extra = convert_coinbase_extra(squad, grpc_req.coinbase_extra).unwrap_or_default(); + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); let mut new_tip_block = (*share_chain .generate_new_tip_block(&wallet_payment_address, coinbase_extra.clone()) .await .map_err(|error| Status::internal(format!("failed to get new tip block {error:?}")))?) .clone(); - let shares = share_chain - .generate_shares(&new_tip_block, !synced_status) + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); + let (shares, target_difficulty) = share_chain + .generate_shares_and_get_target_difficulty(&new_tip_block, !synced_status) .await .map_err(|error| Status::internal(format!("failed to generate shares {error:?}")))?; - let mut response = self - .client - .write() + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); + let mut client = BaseNodeGrpcClient::connect(self.client_address.clone()) .await + .map_err(|e| Status::internal(format!("Could not connect to base node {e:?}")))?; + + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); + let mut response = client .get_new_block_template_with_coinbases(GetNewBlockTemplateWithCoinbasesRequest { algo: Some(grpc_block_header_pow), max_weight: 0, @@ -251,6 +259,7 @@ where S: ShareChain .await? .into_inner(); + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); // set target difficulty let miner_data = response .miner_data @@ -278,6 +287,7 @@ where S: ShareChain .ok_or_else(|| Status::internal("missing missing header"))?; let actual_diff = Difficulty::from_u64(miner_data.target_difficulty) .map_err(|e| Status::internal(format!("Invalid target difficulty: {}", e)))?; + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); match pow_algo { PowAlgorithm::RandomX => self .randomx_block_height_difficulty_cache @@ -290,7 +300,7 @@ where S: ShareChain .await .insert(height, actual_diff), }; - let target_difficulty = share_chain.get_target_difficulty(height).await; + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); new_tip_block .change_target_difficulty(target_difficulty) .map_err(|e| Status::internal(format!("Invalid target difficulty: {}", e)))?; @@ -313,6 +323,7 @@ where S: ShareChain let _unused = self.stats_broadcast.send_target_difficulty(pow_algo, target_difficulty); + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); // save template match pow_algo { PowAlgorithm::Sha3x => { @@ -331,11 +342,13 @@ where S: ShareChain }, }; + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); match pow_algo { PowAlgorithm::Sha3x => self.template_store_sha3x.write().await.insert(tari_hash, new_tip_block), PowAlgorithm::RandomX => self.template_store_rx.write().await.insert(tari_hash, new_tip_block), }; + info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed()); if timer.elapsed() > MAX_ACCEPTABLE_GRPC_TIMEOUT { warn!(target: LOG_TARGET, "get_new_block took {}ms", timer.elapsed().as_millis()); } @@ -477,7 +490,11 @@ where S: ShareChain info!(target: LOG_TARGET, "🔗 Submitting block {} to base node...", mined_tari_hash); let grpc_request = Request::from_parts(metadata, extensions, grpc_request_payload); - match self.client.write().await.submit_block(grpc_request).await { + let mut client = BaseNodeGrpcClient::connect(self.client_address.clone()) + .await + .map_err(|e| Status::internal(format!("Could not connect to base node {e:?}")))?; + + match client.submit_block(grpc_request).await { Ok(_resp) => { *max_difficulty = Difficulty::min(); let _unused = self.stats_broadcast.send_pool_block_accepted(pow_algo); diff --git a/src/server/server.rs b/src/server/server.rs index ae327b51..acefec92 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -87,7 +87,6 @@ where S: ShareChain p2p_service.client(), share_chain_sha3x.clone(), share_chain_random_x.clone(), - shutdown_signal.clone(), randomx_factory, consensus_manager, genesis_block_hash, diff --git a/src/sharechain/in_memory.rs b/src/sharechain/in_memory.rs index b1612c26..4494f142 100644 --- a/src/sharechain/in_memory.rs +++ b/src/sharechain/in_memory.rs @@ -8,16 +8,13 @@ use async_trait::async_trait; use log::*; use minotari_app_grpc::tari_rpc::NewBlockCoinbase; use tari_common_types::{tari_address::TariAddress, types::FixedHash}; -use tari_core::{ - consensus::ConsensusManager, - proof_of_work::{ - randomx_difficulty, - sha3x_difficulty, - AccumulatedDifficulty, - Difficulty, - DifficultyAdjustment, - PowAlgorithm, - }, +use tari_core::proof_of_work::{ + randomx_difficulty, + sha3x_difficulty, + AccumulatedDifficulty, + Difficulty, + DifficultyAdjustment, + PowAlgorithm, }; use tari_utilities::{epoch_time::EpochTime, hex::Hex}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -55,7 +52,6 @@ pub(crate) struct InMemoryShareChain { p2_chain: Arc>>, pow_algo: PowAlgorithm, block_validation_params: Option>, - consensus_manager: ConsensusManager, coinbase_extras: Arc>>>, stat_client: StatsBroadcastClient, config: Config, @@ -67,7 +63,6 @@ impl InMemoryShareChain { config: Config, pow_algo: PowAlgorithm, block_validation_params: Option>, - consensus_manager: ConsensusManager, coinbase_extras: Arc>>>, stat_client: StatsBroadcastClient, ) -> Result { @@ -135,7 +130,6 @@ impl InMemoryShareChain { p2_chain: Arc::new(RwLock::new(p2chain)), pow_algo, block_validation_params, - consensus_manager, coinbase_extras, stat_client, config, @@ -570,11 +564,11 @@ impl ShareChain for InMemoryShareChain { // res // } - async fn generate_shares( + async fn generate_shares_and_get_target_difficulty( &self, new_tip_block: &P2Block, solo_mine: bool, - ) -> Result, ShareChainError> { + ) -> Result<(Vec, Difficulty), ShareChainError> { let mut chain_read_lock = self.p2_chain.read().await; // first check if there is a cached hashmap of shares let mut miners_to_shares = if solo_mine { @@ -635,7 +629,15 @@ impl ShareChain for InMemoryShareChain { }); } - Ok(res) + let min = match self.pow_algo { + PowAlgorithm::RandomX => Difficulty::from_u64(MIN_RANDOMX_DIFFICULTY).unwrap(), + PowAlgorithm::Sha3x => Difficulty::from_u64(MIN_SHA3X_DIFFICULTY).unwrap(), + }; + + let difficulty = chain_read_lock.lwma.get_difficulty().unwrap_or(Difficulty::min()); + let difficulty = cmp::max(min, difficulty); + + Ok((res, difficulty)) } async fn generate_new_tip_block( @@ -786,20 +788,20 @@ impl ShareChain for InMemoryShareChain { Ok((blocks, tip_level, chain_pow)) } - async fn get_target_difficulty(&self, height: u64) -> Difficulty { - let min = match self.pow_algo { - PowAlgorithm::RandomX => Difficulty::from_u64(MIN_RANDOMX_DIFFICULTY).unwrap(), - PowAlgorithm::Sha3x => Difficulty::from_u64(MIN_SHA3X_DIFFICULTY).unwrap(), - }; - let max = self - .consensus_manager - .consensus_constants(height) - .max_pow_difficulty(self.pow_algo); - let chain_read_lock = self.p2_chain.read().await; - - let difficulty = chain_read_lock.lwma.get_difficulty().unwrap_or(Difficulty::min()); - cmp::max(min, cmp::min(max, difficulty)) - } + // async fn get_target_difficulty(&self, height: u64) -> Difficulty { + // let min = match self.pow_algo { + // PowAlgorithm::RandomX => Difficulty::from_u64(MIN_RANDOMX_DIFFICULTY).unwrap(), + // PowAlgorithm::Sha3x => Difficulty::from_u64(MIN_SHA3X_DIFFICULTY).unwrap(), + // }; + // let max = self + // .consensus_manager + // .consensus_constants(height) + // .max_pow_difficulty(self.pow_algo); + // let chain_read_lock = self.p2_chain.read().await; + + // let difficulty = chain_read_lock.lwma.get_difficulty().unwrap_or(Difficulty::min()); + // cmp::max(min, cmp::min(max, difficulty)) + // } async fn get_total_chain_pow(&self) -> AccumulatedDifficulty { let chain_read_lock = self.p2_chain.read().await; @@ -880,7 +882,6 @@ pub mod test { use super::*; pub fn new_chain() -> InMemoryShareChain { - let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap(); let coinbase_extras = Arc::new(RwLock::new(HashMap::>::new())); let (stats_tx, _) = tokio::sync::broadcast::channel(1000); let stat_client = StatsBroadcastClient::new(stats_tx); @@ -900,7 +901,6 @@ pub mod test { p2_chain: Arc::new(RwLock::new(p2chain)), pow_algo, block_validation_params: None, - consensus_manager, coinbase_extras, stat_client, config, diff --git a/src/sharechain/mod.rs b/src/sharechain/mod.rs index 19882ce6..dae0822b 100644 --- a/src/sharechain/mod.rs +++ b/src/sharechain/mod.rs @@ -100,11 +100,11 @@ pub(crate) trait ShareChain: Send + Sync + 'static { async fn get_tip(&self) -> Result, ShareChainError>; /// Generate shares based on the previous blocks. - async fn generate_shares( + async fn generate_shares_and_get_target_difficulty( &self, new_tip_block: &P2Block, solo_mine: bool, - ) -> Result, ShareChainError>; + ) -> Result<(Vec, Difficulty), ShareChainError>; /// Generate a new block on tip of the chain. async fn generate_new_tip_block( @@ -126,8 +126,6 @@ pub(crate) trait ShareChain: Send + Sync + 'static { last_block_received: Option<(u64, FixedHash)>, ) -> Result<(Vec>, Option<(u64, FixedHash)>, AccumulatedDifficulty), ShareChainError>; - async fn get_target_difficulty(&self, height: u64) -> Difficulty; - async fn all_blocks( &self, start_height: Option,