fix(consensus): improve block fullness when tx rate is high
sdbondi committed Jan 25, 2024
1 parent 7448b4f commit 55a346e
Showing 11 changed files with 42 additions and 33 deletions.
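
For orientation, a condensed, hedged sketch of the mechanism this commit introduces (not the repository code itself): the mempool now sends each executed transaction id together with the number of transactions still awaiting execution, and the consensus worker only forces an immediate pacemaker beat (an immediate proposal) when that backlog is zero; otherwise it lets the block timer fire, so more transactions land in the same block when the transaction rate is high. The `TransactionId` alias and channel wiring below are illustrative placeholders.

```rust
// Illustrative sketch only: the (tx_id, pending_count) payload and the beat-gating
// rule mirror the diff below; TransactionId here is a placeholder, not the real type.
use tokio::sync::mpsc;

type TransactionId = u64; // placeholder for Tari's transaction id type

#[tokio::main]
async fn main() {
    // Mempool -> consensus channel now carries (tx_id, pending_execution_count).
    let (tx_executed_transactions, mut rx_new_transactions) =
        mpsc::channel::<(TransactionId, usize)>(16);

    // Mempool side: report how many executions are still in flight when a
    // transaction finishes executing.
    tokio::spawn(async move {
        let _ = tx_executed_transactions.send((1, 2)).await; // 2 still executing
        let _ = tx_executed_transactions.send((2, 1)).await; // 1 still executing
        let _ = tx_executed_transactions.send((3, 0)).await; // backlog drained
    });

    // Consensus worker side: only beat the pacemaker (propose immediately) once
    // the backlog is empty; otherwise rely on the block timer, so the next block
    // can also include the transactions still being executed.
    while let Some((tx_id, pending)) = rx_new_transactions.recv().await {
        println!("transaction {tx_id} ready; {pending} still pending execution");
        if pending == 0 {
            println!("pacemaker.beat() -> propose now");
        }
    }
}
```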
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/consensus/mod.rs
@@ -41,7 +41,7 @@ pub async fn spawn(
store: SqliteStateStore<PeerAddress>,
keypair: RistrettoKeypair,
epoch_manager: EpochManagerHandle<PeerAddress>,
rx_new_transactions: mpsc::Receiver<TransactionId>,
rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>,
inbound_messaging: ConsensusInboundMessaging<SqliteMessageLogger>,
outbound_messaging: ConsensusOutboundMessaging<SqliteMessageLogger>,
client_factory: TariValidatorNodeRpcClientFactory,
@@ -39,7 +39,7 @@ use crate::{

pub fn spawn<TExecutor, TValidator, TExecutedValidator, TSubstateResolver>(
gossip: Gossip,
tx_executed_transactions: mpsc::Sender<TransactionId>,
tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
epoch_manager: EpochManagerHandle<PeerAddress>,
transaction_executor: TExecutor,
substate_resolver: TSubstateResolver,
@@ -66,7 +66,7 @@ pub struct MempoolService<TValidator, TExecutedValidator, TExecutor, TSubstateRe
transactions: HashSet<TransactionId>,
pending_executions: FuturesUnordered<BoxFuture<'static, MempoolTransactionExecution>>,
mempool_requests: mpsc::Receiver<MempoolRequest>,
tx_executed_transactions: mpsc::Sender<TransactionId>,
tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
epoch_manager: EpochManagerHandle<PeerAddress>,
before_execute_validator: TValidator,
after_execute_validator: TExecutedValidator,
@@ -90,7 +90,7 @@
pub(super) fn new(
mempool_requests: mpsc::Receiver<MempoolRequest>,
gossip: Gossip,
tx_executed_transactions: mpsc::Sender<TransactionId>,
tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
epoch_manager: EpochManagerHandle<PeerAddress>,
transaction_executor: TExecutor,
substate_resolver: TSubstateResolver,
@@ -624,7 +624,13 @@ where
}

// Notify consensus that a transaction is ready to go!
if is_consensus_running && self.tx_executed_transactions.send(*executed.id()).await.is_err() {
let pending_exec_size = self.pending_executions.len();
if is_consensus_running &&
self.tx_executed_transactions
.send((*executed.id(), pending_exec_size))
.await
.is_err()
{
debug!(
target: LOG_TARGET,
"Executed transaction channel closed before executed transaction could be sent"
@@ -31,21 +31,21 @@ where TStateStore: StateStore + Send + Sync

async fn validate(&self, executed: &ExecutedTransaction) -> Result<(), Self::Error> {
if executed.resulting_outputs().is_empty() {
info!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
debug!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
return Ok(());
}

if self
.store
.with_read_tx(|tx| SubstateRecord::any_exist(tx, executed.resulting_outputs()))?
{
info!(target: LOG_TARGET, "OutputsDontExistLocally - FAIL");
warn!(target: LOG_TARGET, "OutputsDontExistLocally - FAIL");
return Err(MempoolError::OutputSubstateExists {
transaction_id: *executed.id(),
});
}

info!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
debug!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
Ok(())
}
}
6 changes: 1 addition & 5 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
@@ -19,7 +19,7 @@ use tokio::{sync::mpsc, time};

use crate::{
block_validations::{check_hash_and_height, check_proposed_by_leader, check_quorum_certificate, check_signature},
hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle},
hotstuff::error::HotStuffError,
messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage},
traits::{ConsensusSpec, OutboundMessaging},
};
@@ -32,7 +32,6 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
leader_strategy: TConsensusSpec::LeaderStrategy,
pacemaker: PaceMakerHandle,
vote_signing_service: TConsensusSpec::SignatureService,
outbound_messaging: TConsensusSpec::OutboundMessaging,
tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
@@ -46,7 +45,6 @@
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
leader_strategy: TConsensusSpec::LeaderStrategy,
pacemaker: PaceMakerHandle,
vote_signing_service: TConsensusSpec::SignatureService,
outbound_messaging: TConsensusSpec::OutboundMessaging,
) -> Self {
@@ -55,7 +53,6 @@ where TConsensusSpec: ConsensusSpec
store,
epoch_manager,
leader_strategy,
pacemaker,
vote_signing_service,
outbound_messaging,
tx_msg_ready,
@@ -176,7 +173,6 @@ where TConsensusSpec: ConsensusSpec
HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }),
)?;
}
self.pacemaker.beat();
Ok(())
}

2 changes: 0 additions & 2 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
@@ -126,8 +126,6 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
.await?;

self.on_ready_to_vote_on_local_block.handle(valid_block).await?;

self.pacemaker.beat();
}

Ok(())
16 changes: 7 additions & 9 deletions dan_layer/consensus/src/hotstuff/pacemaker.rs
@@ -20,11 +20,11 @@ use crate::hotstuff::{

const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::pacemaker";
const MAX_DELTA: Duration = Duration::from_secs(300);
const BLOCK_TIME: Duration = Duration::from_secs(10);

pub struct PaceMaker {
pace_maker_handle: PaceMakerHandle,
handle_receiver: mpsc::Receiver<PacemakerRequest>,
block_time: Duration,
current_height: CurrentHeight,
current_high_qc_height: NodeHeight,
}
@@ -47,8 +47,6 @@ impl PaceMaker {
on_leader_timeout,
current_height.clone(),
),
// TODO: make network constant. We're starting slow with 10s but should be 1s in the future
block_time: Duration::from_secs(10),
current_height,
current_high_qc_height: NodeHeight(0),
}
@@ -101,7 +99,7 @@
info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_height, delta);
leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
// set a timer for when we must send a block...
block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);
},
PacemakerRequest::Start { high_qc_height } => {
info!(target: LOG_TARGET, "🚀 Starting pacemaker at leaf height {} and high QC: {}", self.current_height, high_qc_height);
@@ -112,7 +110,7 @@
let delta = self.delta_time();
info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_height, delta);
leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);
on_beat.beat();
started = true;
}
@@ -130,11 +128,11 @@
}
},
() = &mut block_timer => {
block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);
on_force_beat.beat(None);
}
() = &mut leader_timeout => {
block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);

let delta = self.delta_time();
leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
@@ -156,7 +154,7 @@
let current_height = self.current_height.get();
if current_height.is_zero() {
// Allow extra time for the first block
return self.block_time * 2;
return BLOCK_TIME * 2;
}
let exp = u32::try_from(cmp::min(
u64::from(u32::MAX),
@@ -169,7 +167,7 @@
);
// TODO: get real avg latency
let avg_latency = Duration::from_secs(2);
self.block_time + delta + avg_latency
BLOCK_TIME + delta + avg_latency
}
}
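
The `delta_time` calculation above is partially collapsed by the diff view. The following is a hedged reconstruction of its shape, only to show how the new module-level `BLOCK_TIME` constant feeds into the leader timeout: the first-block special case, the 2s placeholder latency, and the final sum come from the hunk above, while the height-gap exponent and the `MAX_DELTA` clamp are assumptions, not the repository code.

```rust
use std::{cmp, time::Duration};

const MAX_DELTA: Duration = Duration::from_secs(300);
const BLOCK_TIME: Duration = Duration::from_secs(10);

/// Hedged reconstruction of delta_time; the exponential-backoff term below is
/// an assumed shape, clamped to MAX_DELTA, since the hunk truncates it.
fn delta_time(current_height: u64, high_qc_height: u64) -> Duration {
    if current_height == 0 {
        // Allow extra time for the first block (as in the diff).
        return BLOCK_TIME * 2;
    }
    // Assumed: delta grows with the gap between the current height and the
    // height of the last high QC, and is capped at MAX_DELTA.
    let gap = cmp::min(u64::from(u32::MAX), current_height.saturating_sub(high_qc_height));
    let exp = u32::try_from(cmp::min(gap, 10)).unwrap_or(10);
    let delta = cmp::min(MAX_DELTA, Duration::from_secs(1) * 2u32.pow(exp));
    // TODO in the original: use the real average network latency.
    let avg_latency = Duration::from_secs(2);
    BLOCK_TIME + delta + avg_latency
}

fn main() {
    println!("{:?}", delta_time(12, 10)); // ~16s under these assumptions
}
```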

11 changes: 7 additions & 4 deletions dan_layer/consensus/src/hotstuff/worker.rs
@@ -49,7 +49,7 @@ pub struct HotstuffWorker<TConsensusSpec: ConsensusSpec> {
tx_events: broadcast::Sender<HotstuffEvent>,
outbound_messaging: TConsensusSpec::OutboundMessaging,
inbound_messaging: TConsensusSpec::InboundMessaging,
rx_new_transactions: mpsc::Receiver<TransactionId>,
rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>,

on_inbound_message: OnInboundMessage<TConsensusSpec>,
on_next_sync_view: OnNextSyncViewHandler<TConsensusSpec>,
@@ -77,7 +77,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
validator_addr: TConsensusSpec::Addr,
inbound_messaging: TConsensusSpec::InboundMessaging,
outbound_messaging: TConsensusSpec::OutboundMessaging,
rx_new_transactions: mpsc::Receiver<TransactionId>,
rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>,
state_store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
leader_strategy: TConsensusSpec::LeaderStrategy,
@@ -109,7 +109,6 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
state_store.clone(),
epoch_manager.clone(),
leader_strategy.clone(),
pacemaker.clone_handle(),
signing_service.clone(),
outbound_messaging.clone(),
),
@@ -234,10 +233,14 @@ where TConsensusSpec: ConsensusSpec
}
},

Some(tx_id) = self.rx_new_transactions.recv() => {
Some((tx_id, pending)) = self.rx_new_transactions.recv() => {
if let Err(err) = self.on_inbound_message.update_parked_blocks(current_height, &tx_id).await {
error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
}
// Only propose now if there are no pending transactions
if pending == 0 {
self.pacemaker.beat();
}
},

Ok(event) = epoch_manager_events.recv() => {
14 changes: 11 additions & 3 deletions dan_layer/consensus_tests/src/support/network.rs
@@ -172,7 +172,15 @@ impl TestNetworkDestination {

pub struct TestNetworkWorker {
rx_new_transaction: Option<mpsc::Receiver<(TestNetworkDestination, ExecutedTransaction)>>,
tx_new_transactions: HashMap<TestAddress, (Shard, mpsc::Sender<TransactionId>, SqliteStateStore<TestAddress>)>,
#[allow(clippy::type_complexity)]
tx_new_transactions: HashMap<
TestAddress,
(
Shard,
mpsc::Sender<(TransactionId, usize)>,
SqliteStateStore<TestAddress>,
),
>,
tx_hs_message: HashMap<TestAddress, mpsc::Sender<(TestAddress, HotstuffMessage)>>,
#[allow(clippy::type_complexity)]
rx_broadcast: Option<HashMap<TestAddress, mpsc::Receiver<(Vec<TestAddress>, HotstuffMessage)>>>,
@@ -225,7 +233,7 @@ impl TestNetworkWorker {
})
.unwrap();
log::info!("🐞 New transaction {}", executed.id());
tx_new_transaction_to_consensus.send(*executed.id()).await.unwrap();
tx_new_transaction_to_consensus.send((*executed.id(), 0)).await.unwrap();
}
}
}
@@ -352,6 +360,6 @@ impl TestNetworkWorker {
})
.unwrap();

sender.send(*existing_executed_tx.id()).await.unwrap();
sender.send((*existing_executed_tx.id(), 0)).await.unwrap();
}
}
@@ -24,7 +24,7 @@ pub struct ValidatorChannels {
pub bucket: Shard,
pub state_store: SqliteStateStore<TestAddress>,

pub tx_new_transactions: mpsc::Sender<TransactionId>,
pub tx_new_transactions: mpsc::Sender<(TransactionId, usize)>,
pub tx_hs_message: mpsc::Sender<(TestAddress, HotstuffMessage)>,
pub rx_broadcast: mpsc::Receiver<(Vec<TestAddress>, HotstuffMessage)>,
pub rx_leader: mpsc::Receiver<(TestAddress, HotstuffMessage)>,
2 changes: 1 addition & 1 deletion networking/core/src/worker.rs
@@ -652,7 +652,7 @@
match event {
mdns::Event::Discovered(peers_and_addrs) => {
for (peer, addr) in peers_and_addrs {
info!(target: LOG_TARGET, "📡 mDNS discovered peer {} at {}", peer, addr);
debug!(target: LOG_TARGET, "📡 mDNS discovered peer {} at {}", peer, addr);
self.swarm
.dial(DialOpts::peer_id(peer).addresses(vec![addr]).build())
.or_else(|err| {