Skip to content

Commit

Permalink
perf: add profiling to get_new_block (#242)
Browse files Browse the repository at this point in the history
Add profiling logs to debug GRPC get_new_block performance. It can be
commented out in log4rs config if needed.

Also removes a read/write lock on the base node client and removes an
extra read lock when getting the target difficulty
  • Loading branch information
stringhandler authored Jan 22, 2025
1 parent 33c4e46 commit 5ddc6d0
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 55 deletions.
7 changes: 7 additions & 0 deletions log4rs_sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,10 @@ loggers:
# appenders:
# - message_logging
# additive: false

# For Profiling
# tari::profiling:
# level: debug
# appenders:
# - message_logging
# additive: false
2 changes: 0 additions & 2 deletions src/cli/commands/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ genesis_block_hash.to_hex());
config.clone(),
PowAlgorithm::Sha3x,
None,
consensus_manager.clone(),
coinbase_extras_sha3x.clone(),
stats_broadcast_client.clone(),
)?;
Expand All @@ -150,7 +149,6 @@ genesis_block_hash.to_hex());
config.clone(),
PowAlgorithm::RandomX,
Some(block_validation_params.clone()),
consensus_manager,
coinbase_extras_random_x.clone(),
stats_broadcast_client.clone(),
)?;
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ mod cli;
mod server;
mod sharechain;

pub const PROFILING_LOG_TARGET: &str = "tari::profiling";

fn format_system_time(time: SystemTime) -> String {
let datetime = time.duration_since(UNIX_EPOCH).unwrap();
let seconds = datetime.as_secs();
Expand Down
2 changes: 1 addition & 1 deletion src/server/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub mod error;
pub mod p2pool;
pub mod util;

pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(500);
pub(crate) const MAX_ACCEPTABLE_GRPC_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1000);
47 changes: 32 additions & 15 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::{
use libp2p::PeerId;
use log::{debug, error, info, warn};
use minotari_app_grpc::tari_rpc::{
base_node_client::BaseNodeClient,
pow_algo::PowAlgos,
sha_p2_pool_server::ShaP2Pool,
GetNewBlockRequest,
Expand All @@ -23,6 +22,7 @@ use minotari_app_grpc::tari_rpc::{
SubmitBlockRequest,
SubmitBlockResponse,
};
use minotari_node_grpc_client::BaseNodeGrpcClient;
use tari_common_types::{tari_address::TariAddress, types::FixedHash};
use tari_core::{
blocks::Block,
Expand All @@ -36,18 +36,18 @@ use tari_core::{
PowData,
},
};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::{sync::RwLock, time::timeout};
use tonic::{Request, Response, Status};

use crate::{
server::{
grpc::{error::Error, util, util::convert_coinbase_extra, MAX_ACCEPTABLE_GRPC_TIMEOUT},
grpc::{error::Error, util::convert_coinbase_extra, MAX_ACCEPTABLE_GRPC_TIMEOUT},
http::stats_collector::StatsBroadcastClient,
p2p::{client::ServiceClient, messages::NotifyNewTipBlock},
},
sharechain::{p2block::P2Block, BlockValidationParams, ShareChain},
PROFILING_LOG_TARGET,
};

pub const MAX_STORED_TEMPLATES_RX: usize = 100;
Expand All @@ -60,7 +60,8 @@ where S: ShareChain
{
local_peer_id: PeerId,
/// Base node client
client: Arc<RwLock<BaseNodeClient<tonic::transport::Channel>>>,
// client: Arc<RwLock<BaseNodeClient<tonic::transport::Channel>>>,
client_address: String,
/// P2P service client
p2p_client: ServiceClient,
/// SHA-3 share chain
Expand Down Expand Up @@ -93,7 +94,6 @@ where S: ShareChain
p2p_client: ServiceClient,
share_chain_sha3x: Arc<S>,
share_chain_random_x: Arc<S>,
shutdown_signal: ShutdownSignal,
random_x_factory: RandomXFactory,
consensus_manager: ConsensusManager,
genesis_block_hash: FixedHash,
Expand All @@ -104,9 +104,10 @@ where S: ShareChain
) -> Result<Self, Error> {
Ok(Self {
local_peer_id,
client: Arc::new(RwLock::new(
util::connect_base_node(base_node_address, shutdown_signal).await?,
)),
// client: Arc::new(RwLock::new(
// util::connect_base_node(base_node_address, shutdown_signal).await?,
// )),
client_address: base_node_address,
p2p_client,
share_chain_sha3x,
share_chain_random_x,
Expand Down Expand Up @@ -211,11 +212,13 @@ where S: ShareChain
PowAlgos::Sha3x => PowAlgorithm::Sha3x,
};

info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
// update coinbase extras cache
// investigate lock usage
let wallet_payment_address = TariAddress::from_str(grpc_req.wallet_payment_address.as_str())
.map_err(|error| Status::failed_precondition(format!("Invalid wallet payment address: {}", error)))?;

info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
// request new block template with shares as coinbases
let (share_chain, synced_status) = match pow_algo {
PowAlgorithm::RandomX => (
Expand All @@ -229,20 +232,25 @@ where S: ShareChain
};
let squad = self.squad.clone();
let coinbase_extra = convert_coinbase_extra(squad, grpc_req.coinbase_extra).unwrap_or_default();
info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
let mut new_tip_block = (*share_chain
.generate_new_tip_block(&wallet_payment_address, coinbase_extra.clone())
.await
.map_err(|error| Status::internal(format!("failed to get new tip block {error:?}")))?)
.clone();
let shares = share_chain
.generate_shares(&new_tip_block, !synced_status)
info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
let (shares, target_difficulty) = share_chain
.generate_shares_and_get_target_difficulty(&new_tip_block, !synced_status)
.await
.map_err(|error| Status::internal(format!("failed to generate shares {error:?}")))?;

let mut response = self
.client
.write()
info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
let mut client = BaseNodeGrpcClient::connect(self.client_address.clone())
.await
.map_err(|e| Status::internal(format!("Could not connect to base node {e:?}")))?;

info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
let mut response = client
.get_new_block_template_with_coinbases(GetNewBlockTemplateWithCoinbasesRequest {
algo: Some(grpc_block_header_pow),
max_weight: 0,
Expand All @@ -251,6 +259,7 @@ where S: ShareChain
.await?
.into_inner();

info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
// set target difficulty
let miner_data = response
.miner_data
Expand Down Expand Up @@ -278,6 +287,7 @@ where S: ShareChain
.ok_or_else(|| Status::internal("missing missing header"))?;
let actual_diff = Difficulty::from_u64(miner_data.target_difficulty)
.map_err(|e| Status::internal(format!("Invalid target difficulty: {}", e)))?;
info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
match pow_algo {
PowAlgorithm::RandomX => self
.randomx_block_height_difficulty_cache
Expand All @@ -290,7 +300,7 @@ where S: ShareChain
.await
.insert(height, actual_diff),
};
let target_difficulty = share_chain.get_target_difficulty(height).await;
info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
new_tip_block
.change_target_difficulty(target_difficulty)
.map_err(|e| Status::internal(format!("Invalid target difficulty: {}", e)))?;
Expand All @@ -313,6 +323,7 @@ where S: ShareChain

let _unused = self.stats_broadcast.send_target_difficulty(pow_algo, target_difficulty);

info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
// save template
match pow_algo {
PowAlgorithm::Sha3x => {
Expand All @@ -331,11 +342,13 @@ where S: ShareChain
},
};

info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
match pow_algo {
PowAlgorithm::Sha3x => self.template_store_sha3x.write().await.insert(tari_hash, new_tip_block),
PowAlgorithm::RandomX => self.template_store_rx.write().await.insert(tari_hash, new_tip_block),
};

info!(target: PROFILING_LOG_TARGET, "get_new_block timer: {:?}", timer.elapsed());
if timer.elapsed() > MAX_ACCEPTABLE_GRPC_TIMEOUT {
warn!(target: LOG_TARGET, "get_new_block took {}ms", timer.elapsed().as_millis());
}
Expand Down Expand Up @@ -477,7 +490,11 @@ where S: ShareChain
info!(target: LOG_TARGET, "🔗 Submitting block {} to base node...", mined_tari_hash);

let grpc_request = Request::from_parts(metadata, extensions, grpc_request_payload);
match self.client.write().await.submit_block(grpc_request).await {
let mut client = BaseNodeGrpcClient::connect(self.client_address.clone())
.await
.map_err(|e| Status::internal(format!("Could not connect to base node {e:?}")))?;

match client.submit_block(grpc_request).await {
Ok(_resp) => {
*max_difficulty = Difficulty::min();
let _unused = self.stats_broadcast.send_pool_block_accepted(pow_algo);
Expand Down
1 change: 0 additions & 1 deletion src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ where S: ShareChain
p2p_service.client(),
share_chain_sha3x.clone(),
share_chain_random_x.clone(),
shutdown_signal.clone(),
randomx_factory,
consensus_manager,
genesis_block_hash,
Expand Down
64 changes: 32 additions & 32 deletions src/sharechain/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ use async_trait::async_trait;
use log::*;
use minotari_app_grpc::tari_rpc::NewBlockCoinbase;
use tari_common_types::{tari_address::TariAddress, types::FixedHash};
use tari_core::{
consensus::ConsensusManager,
proof_of_work::{
randomx_difficulty,
sha3x_difficulty,
AccumulatedDifficulty,
Difficulty,
DifficultyAdjustment,
PowAlgorithm,
},
use tari_core::proof_of_work::{
randomx_difficulty,
sha3x_difficulty,
AccumulatedDifficulty,
Difficulty,
DifficultyAdjustment,
PowAlgorithm,
};
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down Expand Up @@ -55,7 +52,6 @@ pub(crate) struct InMemoryShareChain {
p2_chain: Arc<RwLock<P2Chain<LmdbBlockStorage>>>,
pow_algo: PowAlgorithm,
block_validation_params: Option<Arc<BlockValidationParams>>,
consensus_manager: ConsensusManager,
coinbase_extras: Arc<RwLock<HashMap<String, Vec<u8>>>>,
stat_client: StatsBroadcastClient,
config: Config,
Expand All @@ -67,7 +63,6 @@ impl InMemoryShareChain {
config: Config,
pow_algo: PowAlgorithm,
block_validation_params: Option<Arc<BlockValidationParams>>,
consensus_manager: ConsensusManager,
coinbase_extras: Arc<RwLock<HashMap<String, Vec<u8>>>>,
stat_client: StatsBroadcastClient,
) -> Result<Self, ShareChainError> {
Expand Down Expand Up @@ -135,7 +130,6 @@ impl InMemoryShareChain {
p2_chain: Arc::new(RwLock::new(p2chain)),
pow_algo,
block_validation_params,
consensus_manager,
coinbase_extras,
stat_client,
config,
Expand Down Expand Up @@ -570,11 +564,11 @@ impl ShareChain for InMemoryShareChain {
// res
// }

async fn generate_shares(
async fn generate_shares_and_get_target_difficulty(
&self,
new_tip_block: &P2Block,
solo_mine: bool,
) -> Result<Vec<NewBlockCoinbase>, ShareChainError> {
) -> Result<(Vec<NewBlockCoinbase>, Difficulty), ShareChainError> {
let mut chain_read_lock = self.p2_chain.read().await;
// first check if there is a cached hashmap of shares
let mut miners_to_shares = if solo_mine {
Expand Down Expand Up @@ -635,7 +629,15 @@ impl ShareChain for InMemoryShareChain {
});
}

Ok(res)
let min = match self.pow_algo {
PowAlgorithm::RandomX => Difficulty::from_u64(MIN_RANDOMX_DIFFICULTY).unwrap(),
PowAlgorithm::Sha3x => Difficulty::from_u64(MIN_SHA3X_DIFFICULTY).unwrap(),
};

let difficulty = chain_read_lock.lwma.get_difficulty().unwrap_or(Difficulty::min());
let difficulty = cmp::max(min, difficulty);

Ok((res, difficulty))
}

async fn generate_new_tip_block(
Expand Down Expand Up @@ -786,20 +788,20 @@ impl ShareChain for InMemoryShareChain {
Ok((blocks, tip_level, chain_pow))
}

async fn get_target_difficulty(&self, height: u64) -> Difficulty {
let min = match self.pow_algo {
PowAlgorithm::RandomX => Difficulty::from_u64(MIN_RANDOMX_DIFFICULTY).unwrap(),
PowAlgorithm::Sha3x => Difficulty::from_u64(MIN_SHA3X_DIFFICULTY).unwrap(),
};
let max = self
.consensus_manager
.consensus_constants(height)
.max_pow_difficulty(self.pow_algo);
let chain_read_lock = self.p2_chain.read().await;

let difficulty = chain_read_lock.lwma.get_difficulty().unwrap_or(Difficulty::min());
cmp::max(min, cmp::min(max, difficulty))
}
// async fn get_target_difficulty(&self, height: u64) -> Difficulty {
// let min = match self.pow_algo {
// PowAlgorithm::RandomX => Difficulty::from_u64(MIN_RANDOMX_DIFFICULTY).unwrap(),
// PowAlgorithm::Sha3x => Difficulty::from_u64(MIN_SHA3X_DIFFICULTY).unwrap(),
// };
// let max = self
// .consensus_manager
// .consensus_constants(height)
// .max_pow_difficulty(self.pow_algo);
// let chain_read_lock = self.p2_chain.read().await;

// let difficulty = chain_read_lock.lwma.get_difficulty().unwrap_or(Difficulty::min());
// cmp::max(min, cmp::min(max, difficulty))
// }

async fn get_total_chain_pow(&self) -> AccumulatedDifficulty {
let chain_read_lock = self.p2_chain.read().await;
Expand Down Expand Up @@ -880,7 +882,6 @@ pub mod test {
use super::*;

pub fn new_chain() -> InMemoryShareChain {
let consensus_manager = ConsensusManager::builder(Network::LocalNet).build().unwrap();
let coinbase_extras = Arc::new(RwLock::new(HashMap::<String, Vec<u8>>::new()));
let (stats_tx, _) = tokio::sync::broadcast::channel(1000);
let stat_client = StatsBroadcastClient::new(stats_tx);
Expand All @@ -900,7 +901,6 @@ pub mod test {
p2_chain: Arc::new(RwLock::new(p2chain)),
pow_algo,
block_validation_params: None,
consensus_manager,
coinbase_extras,
stat_client,
config,
Expand Down
6 changes: 2 additions & 4 deletions src/sharechain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ pub(crate) trait ShareChain: Send + Sync + 'static {
async fn get_tip(&self) -> Result<Option<(u64, FixedHash)>, ShareChainError>;

/// Generate shares based on the previous blocks.
async fn generate_shares(
async fn generate_shares_and_get_target_difficulty(
&self,
new_tip_block: &P2Block,
solo_mine: bool,
) -> Result<Vec<NewBlockCoinbase>, ShareChainError>;
) -> Result<(Vec<NewBlockCoinbase>, Difficulty), ShareChainError>;

/// Generate a new block on tip of the chain.
async fn generate_new_tip_block(
Expand All @@ -126,8 +126,6 @@ pub(crate) trait ShareChain: Send + Sync + 'static {
last_block_received: Option<(u64, FixedHash)>,
) -> Result<(Vec<Arc<P2Block>>, Option<(u64, FixedHash)>, AccumulatedDifficulty), ShareChainError>;

async fn get_target_difficulty(&self, height: u64) -> Difficulty;

async fn all_blocks(
&self,
start_height: Option<u64>,
Expand Down

0 comments on commit 5ddc6d0

Please sign in to comment.