diff --git a/applications/tari_validator_node/src/consensus/mod.rs b/applications/tari_validator_node/src/consensus/mod.rs index 83d446b42..b6175a896 100644 --- a/applications/tari_validator_node/src/consensus/mod.rs +++ b/applications/tari_validator_node/src/consensus/mod.rs @@ -41,7 +41,7 @@ pub async fn spawn( store: SqliteStateStore, keypair: RistrettoKeypair, epoch_manager: EpochManagerHandle, - rx_new_transactions: mpsc::Receiver, + rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>, inbound_messaging: ConsensusInboundMessaging, outbound_messaging: ConsensusOutboundMessaging, client_factory: TariValidatorNodeRpcClientFactory, diff --git a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs index 2e2048ea4..c3ecdc912 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs @@ -39,7 +39,7 @@ use crate::{ pub fn spawn( gossip: Gossip, - tx_executed_transactions: mpsc::Sender, + tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>, epoch_manager: EpochManagerHandle, transaction_executor: TExecutor, substate_resolver: TSubstateResolver, diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index d73b06623..38c8dd327 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -66,7 +66,7 @@ pub struct MempoolService, pending_executions: FuturesUnordered>, mempool_requests: mpsc::Receiver, - tx_executed_transactions: mpsc::Sender, + tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>, epoch_manager: EpochManagerHandle, before_execute_validator: TValidator, after_execute_validator: TExecutedValidator, @@ -90,7 +90,7 @@ where pub(super) fn new( mempool_requests: mpsc::Receiver, gossip: Gossip, - tx_executed_transactions: mpsc::Sender, + tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>, epoch_manager: EpochManagerHandle, transaction_executor: TExecutor, substate_resolver: TSubstateResolver, @@ -624,7 +624,13 @@ where } // Notify consensus that a transaction is ready to go! - if is_consensus_running && self.tx_executed_transactions.send(*executed.id()).await.is_err() { + let pending_exec_size = self.pending_executions.len(); + if is_consensus_running && + self.tx_executed_transactions + .send((*executed.id(), pending_exec_size)) + .await + .is_err() + { debug!( target: LOG_TARGET, "Executed transaction channel closed before executed transaction could be sent" diff --git a/applications/tari_validator_node/src/p2p/services/mempool/validators/after/outputs_dont_exist_locally.rs b/applications/tari_validator_node/src/p2p/services/mempool/validators/after/outputs_dont_exist_locally.rs index caa536e6a..60b1f5407 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/validators/after/outputs_dont_exist_locally.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/validators/after/outputs_dont_exist_locally.rs @@ -31,7 +31,7 @@ where TStateStore: StateStore + Send + Sync async fn validate(&self, executed: &ExecutedTransaction) -> Result<(), Self::Error> { if executed.resulting_outputs().is_empty() { - info!(target: LOG_TARGET, "OutputsDontExistLocally - OK"); + debug!(target: LOG_TARGET, "OutputsDontExistLocally - OK"); return Ok(()); } @@ -39,13 +39,13 @@ where TStateStore: StateStore + Send + Sync .store .with_read_tx(|tx| SubstateRecord::any_exist(tx, executed.resulting_outputs()))? { - info!(target: LOG_TARGET, "OutputsDontExistLocally - FAIL"); + warn!(target: LOG_TARGET, "OutputsDontExistLocally - FAIL"); return Err(MempoolError::OutputSubstateExists { transaction_id: *executed.id(), }); } - info!(target: LOG_TARGET, "OutputsDontExistLocally - OK"); + debug!(target: LOG_TARGET, "OutputsDontExistLocally - OK"); Ok(()) } } diff --git a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs index 1a8e11245..b40d58549 100644 --- a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs +++ b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs @@ -19,7 +19,7 @@ use tokio::{sync::mpsc, time}; use crate::{ block_validations::{check_hash_and_height, check_proposed_by_leader, check_quorum_certificate, check_signature}, - hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle}, + hotstuff::error::HotStuffError, messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage}, traits::{ConsensusSpec, OutboundMessaging}, }; @@ -32,7 +32,6 @@ pub struct OnInboundMessage { store: TConsensusSpec::StateStore, epoch_manager: TConsensusSpec::EpochManager, leader_strategy: TConsensusSpec::LeaderStrategy, - pacemaker: PaceMakerHandle, vote_signing_service: TConsensusSpec::SignatureService, outbound_messaging: TConsensusSpec::OutboundMessaging, tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>, @@ -46,7 +45,6 @@ where TConsensusSpec: ConsensusSpec store: TConsensusSpec::StateStore, epoch_manager: TConsensusSpec::EpochManager, leader_strategy: TConsensusSpec::LeaderStrategy, - pacemaker: PaceMakerHandle, vote_signing_service: TConsensusSpec::SignatureService, outbound_messaging: TConsensusSpec::OutboundMessaging, ) -> Self { @@ -55,7 +53,6 @@ where TConsensusSpec: ConsensusSpec store, epoch_manager, leader_strategy, - pacemaker, vote_signing_service, outbound_messaging, tx_msg_ready, @@ -176,7 +173,6 @@ where TConsensusSpec: ConsensusSpec HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }), )?; } - self.pacemaker.beat(); Ok(()) } diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index c9ff78384..897c0c1dd 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -126,8 +126,6 @@ impl OnReceiveLocalProposalHandler, - block_time: Duration, current_height: CurrentHeight, current_high_qc_height: NodeHeight, } @@ -47,8 +47,6 @@ impl PaceMaker { on_leader_timeout, current_height.clone(), ), - // TODO: make network constant. We're starting slow with 10s but should be 1s in the future - block_time: Duration::from_secs(10), current_height, current_high_qc_height: NodeHeight(0), } @@ -101,7 +99,7 @@ impl PaceMaker { info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_height, delta); leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta); // set a timer for when we must send a block... - block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time); + block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME); }, PacemakerRequest::Start { high_qc_height } => { info!(target: LOG_TARGET, "🚀 Starting pacemaker at leaf height {} and high QC: {}", self.current_height, high_qc_height); @@ -112,7 +110,7 @@ impl PaceMaker { let delta = self.delta_time(); info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_height, delta); leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta); - block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time); + block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME); on_beat.beat(); started = true; } @@ -130,11 +128,11 @@ impl PaceMaker { } }, () = &mut block_timer => { - block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time); + block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME); on_force_beat.beat(None); } () = &mut leader_timeout => { - block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time); + block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME); let delta = self.delta_time(); leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta); @@ -156,7 +154,7 @@ impl PaceMaker { let current_height = self.current_height.get(); if current_height.is_zero() { // Allow extra time for the first block - return self.block_time * 2; + return BLOCK_TIME * 2; } let exp = u32::try_from(cmp::min( u64::from(u32::MAX), @@ -169,7 +167,7 @@ impl PaceMaker { ); // TODO: get real avg latency let avg_latency = Duration::from_secs(2); - self.block_time + delta + avg_latency + BLOCK_TIME + delta + avg_latency } } diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 756fde82d..29329a956 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -49,7 +49,7 @@ pub struct HotstuffWorker { tx_events: broadcast::Sender, outbound_messaging: TConsensusSpec::OutboundMessaging, inbound_messaging: TConsensusSpec::InboundMessaging, - rx_new_transactions: mpsc::Receiver, + rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>, on_inbound_message: OnInboundMessage, on_next_sync_view: OnNextSyncViewHandler, @@ -77,7 +77,7 @@ impl HotstuffWorker { validator_addr: TConsensusSpec::Addr, inbound_messaging: TConsensusSpec::InboundMessaging, outbound_messaging: TConsensusSpec::OutboundMessaging, - rx_new_transactions: mpsc::Receiver, + rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>, state_store: TConsensusSpec::StateStore, epoch_manager: TConsensusSpec::EpochManager, leader_strategy: TConsensusSpec::LeaderStrategy, @@ -109,7 +109,6 @@ impl HotstuffWorker { state_store.clone(), epoch_manager.clone(), leader_strategy.clone(), - pacemaker.clone_handle(), signing_service.clone(), outbound_messaging.clone(), ), @@ -234,10 +233,14 @@ where TConsensusSpec: ConsensusSpec } }, - Some(tx_id) = self.rx_new_transactions.recv() => { + Some((tx_id, pending)) = self.rx_new_transactions.recv() => { if let Err(err) = self.on_inbound_message.update_parked_blocks(current_height, &tx_id).await { error!(target: LOG_TARGET, "Error checking parked blocks: {}", err); } + // Only propose now if there are no pending transactions + if pending == 0 { + self.pacemaker.beat(); + } }, Ok(event) = epoch_manager_events.recv() => { diff --git a/dan_layer/consensus_tests/src/support/network.rs b/dan_layer/consensus_tests/src/support/network.rs index 206146543..00a0dc6e9 100644 --- a/dan_layer/consensus_tests/src/support/network.rs +++ b/dan_layer/consensus_tests/src/support/network.rs @@ -172,7 +172,15 @@ impl TestNetworkDestination { pub struct TestNetworkWorker { rx_new_transaction: Option>, - tx_new_transactions: HashMap, SqliteStateStore)>, + #[allow(clippy::type_complexity)] + tx_new_transactions: HashMap< + TestAddress, + ( + Shard, + mpsc::Sender<(TransactionId, usize)>, + SqliteStateStore, + ), + >, tx_hs_message: HashMap>, #[allow(clippy::type_complexity)] rx_broadcast: Option, HotstuffMessage)>>>, @@ -225,7 +233,7 @@ impl TestNetworkWorker { }) .unwrap(); log::info!("🐞 New transaction {}", executed.id()); - tx_new_transaction_to_consensus.send(*executed.id()).await.unwrap(); + tx_new_transaction_to_consensus.send((*executed.id(), 0)).await.unwrap(); } } } @@ -352,6 +360,6 @@ impl TestNetworkWorker { }) .unwrap(); - sender.send(*existing_executed_tx.id()).await.unwrap(); + sender.send((*existing_executed_tx.id(), 0)).await.unwrap(); } } diff --git a/dan_layer/consensus_tests/src/support/validator/instance.rs b/dan_layer/consensus_tests/src/support/validator/instance.rs index 443b5ca39..4040c73fc 100644 --- a/dan_layer/consensus_tests/src/support/validator/instance.rs +++ b/dan_layer/consensus_tests/src/support/validator/instance.rs @@ -24,7 +24,7 @@ pub struct ValidatorChannels { pub bucket: Shard, pub state_store: SqliteStateStore, - pub tx_new_transactions: mpsc::Sender, + pub tx_new_transactions: mpsc::Sender<(TransactionId, usize)>, pub tx_hs_message: mpsc::Sender<(TestAddress, HotstuffMessage)>, pub rx_broadcast: mpsc::Receiver<(Vec, HotstuffMessage)>, pub rx_leader: mpsc::Receiver<(TestAddress, HotstuffMessage)>, diff --git a/networking/core/src/worker.rs b/networking/core/src/worker.rs index 8981ae1c0..762edf09c 100644 --- a/networking/core/src/worker.rs +++ b/networking/core/src/worker.rs @@ -652,7 +652,7 @@ where match event { mdns::Event::Discovered(peers_and_addrs) => { for (peer, addr) in peers_and_addrs { - info!(target: LOG_TARGET, "📡 mDNS discovered peer {} at {}", peer, addr); + debug!(target: LOG_TARGET, "📡 mDNS discovered peer {} at {}", peer, addr); self.swarm .dial(DialOpts::peer_id(peer).addresses(vec![addr]).build()) .or_else(|err| {