Skip to content

Commit

Permalink
feat: add randomx support (#32)
Browse files Browse the repository at this point in the history
Add randomx support

---------

Co-authored-by: ksrichard <[email protected]>
  • Loading branch information
stringhandler and ksrichard authored Sep 4, 2024
1 parent 0075b4f commit edebb3f
Show file tree
Hide file tree
Showing 13 changed files with 885 additions and 495 deletions.
656 changes: 341 additions & 315 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions src/cli/commands/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use std::sync::Arc;
use libp2p::identity::Keypair;
use tari_common::configuration::Network;
use tari_common::initialize_logging;
use tari_core::consensus::ConsensusManager;
use tari_core::proof_of_work::randomx_factory::RandomXFactory;
use tari_core::proof_of_work::PowAlgorithm;
use tari_shutdown::ShutdownSignal;

use crate::cli::args::{Cli, StartArgs};
use crate::server as main_server;
use crate::server::p2p::Tribe;
use crate::server::Server;
use crate::sharechain::in_memory::InMemoryShareChain;
use crate::sharechain::{BlockValidationParams, MAX_BLOCKS_COUNT};

pub async fn server(
cli: Arc<Cli>,
Expand Down Expand Up @@ -74,7 +78,20 @@ pub async fn server(
config_builder.with_base_node_address(args.base_node_address.clone());

let config = config_builder.build();
let share_chain = InMemoryShareChain::default();
let randomx_factory = RandomXFactory::new(1);
let consensus_manager = ConsensusManager::builder(Network::get_current_or_user_setting_or_default()).build()?;
let genesis_block_hash = *consensus_manager.get_genesis_block().hash();
let block_validation_params = Arc::new(BlockValidationParams::new(
randomx_factory,
consensus_manager,
genesis_block_hash,
));
let share_chain_sha3x = InMemoryShareChain::new(MAX_BLOCKS_COUNT, PowAlgorithm::Sha3x, None)?;
let share_chain_random_x = InMemoryShareChain::new(
MAX_BLOCKS_COUNT,
PowAlgorithm::RandomX,
Some(block_validation_params.clone()),
)?;

Ok(Server::new(config, share_chain, shutdown_signal).await?)
Ok(Server::new(config, share_chain_sha3x, share_chain_random_x, shutdown_signal).await?)
}
191 changes: 148 additions & 43 deletions src/server/grpc/p2pool.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::collections::HashMap;
use std::sync::Arc;

use log::{info, warn};
use log::{error, info, warn};
use minotari_app_grpc::tari_rpc::pow_algo::PowAlgos;
use minotari_app_grpc::tari_rpc::{
base_node_client::BaseNodeClient, pow_algo::PowAlgos, sha_p2_pool_server::ShaP2Pool, GetNewBlockRequest,
GetNewBlockResponse, GetNewBlockTemplateWithCoinbasesRequest, HeightRequest, NewBlockTemplateRequest, PowAlgo,
SubmitBlockRequest, SubmitBlockResponse,
base_node_client::BaseNodeClient, sha_p2_pool_server::ShaP2Pool, GetNewBlockRequest, GetNewBlockResponse,
GetNewBlockTemplateWithCoinbasesRequest, NewBlockTemplateRequest, SubmitBlockRequest, SubmitBlockResponse,
};
use tari_core::proof_of_work::sha3x_difficulty;
use tari_common_types::types::FixedHash;
use tari_core::consensus::ConsensusManager;
use tari_core::proof_of_work::randomx_factory::RandomXFactory;
use tari_core::proof_of_work::{randomx_difficulty, sha3x_difficulty, PowAlgorithm};
use tari_shutdown::ShutdownSignal;
use tari_utilities::hex::Hex;
use tokio::sync::Mutex;
use tonic::{Request, Response, Status};

use crate::server::http::stats::{
MINER_STAT_ACCEPTED_BLOCKS_COUNT, MINER_STAT_REJECTED_BLOCKS_COUNT, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT,
P2POOL_STAT_REJECTED_BLOCKS_COUNT,
algo_stat_key, MINER_STAT_ACCEPTED_BLOCKS_COUNT, MINER_STAT_REJECTED_BLOCKS_COUNT,
P2POOL_STAT_ACCEPTED_BLOCKS_COUNT, P2POOL_STAT_REJECTED_BLOCKS_COUNT,
};
use crate::server::stats_store::StatsStore;
use crate::sharechain::BlockValidationParams;
use crate::{
server::{
grpc::{error::Error, util},
Expand All @@ -39,10 +44,16 @@ where
client: Arc<Mutex<BaseNodeClient<tonic::transport::Channel>>>,
/// P2P service client
p2p_client: p2p::ServiceClient,
/// Current share chain
share_chain: Arc<S>,
/// SHA-3 share chain
share_chain_sha3x: Arc<S>,
/// RandomX share chain
share_chain_random_x: Arc<S>,
/// Stats store
stats_store: Arc<StatsStore>,
/// Block validation params to be used when checking block difficulty.
block_validation_params: BlockValidationParams,
block_height_difficulty_cache: Arc<Mutex<HashMap<u64, u64>>>,
stats_max_difficulty_since_last_success: Arc<Mutex<u64>>,
}

impl<S> ShaP2PoolGrpc<S>
Expand All @@ -52,26 +63,43 @@ where
pub async fn new(
base_node_address: String,
p2p_client: p2p::ServiceClient,
share_chain: Arc<S>,
share_chain_sha3x: Arc<S>,
share_chain_random_x: Arc<S>,
stats_store: Arc<StatsStore>,
shutdown_signal: ShutdownSignal,
random_x_factory: RandomXFactory,
consensus_manager: ConsensusManager,
genesis_block_hash: FixedHash,
) -> Result<Self, Error> {
Ok(Self {
client: Arc::new(Mutex::new(
util::connect_base_node(base_node_address, shutdown_signal).await?,
)),
p2p_client,
share_chain,
share_chain_sha3x,
share_chain_random_x,
stats_store,
block_validation_params: BlockValidationParams::new(
random_x_factory,
consensus_manager,
genesis_block_hash,
),
block_height_difficulty_cache: Arc::new(Mutex::new(HashMap::new())),
stats_max_difficulty_since_last_success: Arc::new(Mutex::new(0)),
})
}

/// Submits a new block to share chain and broadcasts to the p2p network.
pub async fn submit_share_chain_block(&self, block: &Block) -> Result<(), Status> {
match self.share_chain.submit_block(block).await {
let pow_algo = block.original_block_header().pow.pow_algo;
let share_chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
match share_chain.submit_block(block).await {
Ok(_) => {
self.stats_store
.inc(&MINER_STAT_ACCEPTED_BLOCKS_COUNT.to_string(), 1)
.inc(&algo_stat_key(pow_algo, MINER_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
info!(target: LOG_TARGET, "Broadcast new block: {:?}", block.hash().to_hex());
self.p2p_client
Expand All @@ -82,7 +110,7 @@ where
Err(error) => {
warn!(target: LOG_TARGET, "Failed to add new block: {error:?}");
self.stats_store
.inc(&MINER_STAT_REJECTED_BLOCKS_COUNT.to_string(), 1)
.inc(&algo_stat_key(pow_algo, MINER_STAT_REJECTED_BLOCKS_COUNT), 1)
.await;
Ok(())
},
Expand All @@ -99,14 +127,23 @@ where
/// from the current share chain as coinbase transactions.
async fn get_new_block(
&self,
_request: Request<GetNewBlockRequest>,
request: Request<GetNewBlockRequest>,
) -> Result<Response<GetNewBlockResponse>, Status> {
let mut pow_algo = PowAlgo::default();
pow_algo.set_pow_algo(PowAlgos::Sha3x);
// extract pow algo
let grpc_block_header_pow = request
.into_inner()
.pow
.ok_or(Status::invalid_argument("missing pow in request"))?;
let grpc_pow_algo = PowAlgos::from_i32(grpc_block_header_pow.pow_algo)
.ok_or_else(|| Status::internal("invalid block header pow algo in request"))?;
let pow_algo = match grpc_pow_algo {
PowAlgos::Randomx => PowAlgorithm::RandomX,
PowAlgos::Sha3x => PowAlgorithm::Sha3x,
};

// request original block template to get reward
let req = NewBlockTemplateRequest {
algo: Some(pow_algo.clone()),
algo: Some(grpc_block_header_pow.clone()),
max_weight: 0,
};
let template_response = self.client.lock().await.get_new_block_template(req).await?.into_inner();
Expand All @@ -116,14 +153,18 @@ where
let reward = miner_data.reward;

// request new block template with shares as coinbases
let shares = self.share_chain.generate_shares(reward).await;
let share_chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let shares = share_chain.generate_shares(reward).await;

let mut response = self
.client
.lock()
.await
.get_new_block_template_with_coinbases(GetNewBlockTemplateWithCoinbasesRequest {
algo: Some(pow_algo),
algo: Some(grpc_block_header_pow),
max_weight: 0,
coinbases: shares,
})
Expand All @@ -135,6 +176,13 @@ where
.clone()
.miner_data
.ok_or_else(|| Status::internal("missing miner data"))?;
if let Some(header) = &response.block {
let height = header.header.as_ref().map(|h| h.height).unwrap_or(0);
self.block_height_difficulty_cache
.lock()
.await
.insert(height, miner_data.target_difficulty);
}
let target_difficulty = miner_data.target_difficulty / SHARE_COUNT;
if let Some(mut miner_data) = response.miner_data {
miner_data.target_difficulty = target_difficulty;
Expand All @@ -154,13 +202,35 @@ where
&self,
request: Request<SubmitBlockRequest>,
) -> Result<Response<SubmitBlockResponse>, Status> {
// get all grpc request related data
let grpc_block = request.get_ref();
let grpc_request_payload = grpc_block
.block
.clone()
.ok_or_else(|| Status::internal("missing block in request"))?;
let mut block = self
.share_chain
let grpc_block_header = grpc_request_payload
.header
.clone()
.ok_or_else(|| Status::internal("missing block header in request"))?;
let grpc_block_header_pow = grpc_block_header
.pow
.ok_or_else(|| Status::internal("missing block header pow in request"))?;
let grpc_pow_algo = PowAlgos::from_i32(i32::try_from(grpc_block_header_pow.pow_algo).map_err(|error| {
error!("Failed to get pow algo: {error:?}");
Status::internal("general error")
})?)
.ok_or_else(|| Status::internal("invalid block header pow algo in request"))?;

// get new share chain block
let pow_algo = match grpc_pow_algo {
PowAlgos::Randomx => PowAlgorithm::RandomX,
PowAlgos::Sha3x => PowAlgorithm::Sha3x,
};
let share_chain = match pow_algo {
PowAlgorithm::RandomX => self.share_chain_random_x.clone(),
PowAlgorithm::Sha3x => self.share_chain_sha3x.clone(),
};
let mut block = share_chain
.new_block(grpc_block)
.await
.map_err(|error| Status::internal(error.to_string()))?;
Expand All @@ -169,43 +239,78 @@ where

// Check block's difficulty compared to the latest network one to increase the probability
// to get the block accepted (and also a block with lower difficulty than latest one is invalid anyway).
let request_block_difficulty =
sha3x_difficulty(origin_block_header).map_err(|error| Status::internal(error.to_string()))?;
let mut network_difficulty_stream = self
.client
let request_block_difficulty = match origin_block_header.pow.pow_algo {
PowAlgorithm::Sha3x => {
sha3x_difficulty(origin_block_header).map_err(|error| Status::internal(error.to_string()))?
},
PowAlgorithm::RandomX => randomx_difficulty(
origin_block_header,
self.block_validation_params.random_x_factory(),
self.block_validation_params.genesis_block_hash(),
self.block_validation_params.consensus_manager(),
)
.map_err(|error| Status::internal(error.to_string()))?,
};
// TODO: Cache this so that we don't ask each time. If we have a block we should not
// waste time before submitting it, or we might lose a share
// let mut network_difficulty_stream = self
// .client
// .lock()
// .await
// .get_network_difficulty(HeightRequest {
// from_tip: 0,
// start_height: origin_block_header.height - 1,
// end_height: origin_block_header.height,
// })
// .await?
// .into_inner();
// let mut network_difficulty_matches = false;
// while let Ok(Some(diff_resp)) = network_difficulty_stream.message().await {
// dbg!("Diff resp: {:?}", &diff_resp);
// if origin_block_header.height == diff_resp.height + 1
// && request_block_difficulty.as_u64() >= diff_resp.difficulty
// {
// network_difficulty_matches = true;
// }
// }
let network_difficulty_matches = match self
.block_height_difficulty_cache
.lock()
.await
.get_network_difficulty(HeightRequest {
from_tip: 0,
start_height: origin_block_header.height - 1,
end_height: origin_block_header.height,
})
.await?
.into_inner();
let mut network_difficulty_matches = false;
while let Ok(Some(diff_resp)) = network_difficulty_stream.message().await {
if origin_block_header.height == diff_resp.height + 1
&& request_block_difficulty.as_u64() >= diff_resp.difficulty
{
network_difficulty_matches = true;
}
.get(&(origin_block_header.height))
{
Some(difficulty) => request_block_difficulty.as_u64() >= *difficulty,
None => false,
};
let mut max_difficulty = self.stats_max_difficulty_since_last_success.lock().await;
if *max_difficulty < request_block_difficulty.as_u64() {
*max_difficulty = request_block_difficulty.as_u64();
}

if !network_difficulty_matches {
block.set_sent_to_main_chain(false);
self.submit_share_chain_block(&block).await?;
// Don't error if we can't submit it.
match self.submit_share_chain_block(&block).await {
Ok(_) => {
info!("🔗 Block submitted to share chain!");
},
Err(error) => {
warn!("Failed to submit block to share chain: {error:?}");
},
};
return Ok(Response::new(SubmitBlockResponse {
block_hash: block.hash().to_vec(),
}));
}

// submit block to base node
let (metadata, extensions, _inner) = request.into_parts();
info!("🔗 Submitting block to base node...");
let grpc_request = Request::from_parts(metadata, extensions, grpc_request_payload);
match self.client.lock().await.submit_block(grpc_request).await {
Ok(resp) => {
self.stats_store
.inc(&P2POOL_STAT_ACCEPTED_BLOCKS_COUNT.to_string(), 1)
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_ACCEPTED_BLOCKS_COUNT), 1)
.await;
info!("💰 New matching block found and sent to network!");
block.set_sent_to_main_chain(true);
Expand All @@ -215,7 +320,7 @@ where
Err(error) => {
warn!("Failed to submit block to Tari network: {error:?}");
self.stats_store
.inc(&P2POOL_STAT_REJECTED_BLOCKS_COUNT.to_string(), 1)
.inc(&algo_stat_key(pow_algo, P2POOL_STAT_REJECTED_BLOCKS_COUNT), 1)
.await;
block.set_sent_to_main_chain(false);
self.submit_share_chain_block(&block).await?;
Expand Down
Loading

0 comments on commit edebb3f

Please sign in to comment.