refactor(consensus): clean up inbound message processing
sdbondi committed Jan 12, 2024
1 parent 63a1563 · commit 62ab3b5
Showing 2 changed files with 74 additions and 69 deletions.
dan_layer/consensus/src/hotstuff/on_inbound_message.rs (94 changes: 43 additions & 51 deletions)
@@ -35,11 +35,9 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
     leader_strategy: TConsensusSpec::LeaderStrategy,
     pacemaker: PaceMakerHandle,
     vote_signing_service: TConsensusSpec::SignatureService,
-    pub rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
-    tx_outbound_message: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>,
     tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
-    pub rx_new_transactions: mpsc::Receiver<TransactionId>,
-    pub message_buffer: MessageBuffer<TConsensusSpec::Addr>,
+    message_buffer: MessageBuffer<TConsensusSpec::Addr>,
     shutdown: ShutdownSignal,
 }
@@ -52,9 +50,7 @@ where TConsensusSpec: ConsensusSpec
         leader_strategy: TConsensusSpec::LeaderStrategy,
         pacemaker: PaceMakerHandle,
         vote_signing_service: TConsensusSpec::SignatureService,
-        rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
         tx_outbound_message: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>,
-        rx_new_transactions: mpsc::Receiver<TransactionId>,
         shutdown: ShutdownSignal,
     ) -> Self {
         let (tx_msg_ready, rx_msg_ready) = mpsc::unbounded_channel();
@@ -64,32 +60,14 @@ where TConsensusSpec: ConsensusSpec
             leader_strategy,
             pacemaker,
             vote_signing_service,
-            rx_hotstuff_message,
             tx_outbound_message,
             tx_msg_ready,
-            rx_new_transactions,
             message_buffer: MessageBuffer::new(rx_msg_ready),
             shutdown,
         }
     }
 
-    pub async fn discard(&mut self) {
-        loop {
-            tokio::select! {
-                biased;
-                _ = self.shutdown.wait() => { break; }
-                _ = self.message_buffer.discard() => { }
-                _ = self.rx_hotstuff_message.recv() => { },
-                _ = self.rx_new_transactions.recv() => { },
-            }
-        }
-    }
-
-    pub fn clear_buffer(&mut self) {
-        self.message_buffer.clear_buffer();
-    }
-
-    pub async fn handle_hotstuff_message(
+    pub async fn handle(
         &self,
         current_height: NodeHeight,
         from: TConsensusSpec::Addr,
@@ -101,22 +79,35 @@ where TConsensusSpec: ConsensusSpec
             },
             HotstuffMessage::ForeignProposal(ref proposal) => {
                 self.check_proposal(proposal.block.clone()).await?;
-                self.tx_msg_ready
-                    .send((from, msg))
-                    .map_err(|_| HotStuffError::InternalChannelClosed {
-                        context: "tx_msg_ready in InboundMessageWorker::handle_hotstuff_message",
-                    })?;
+                self.report_message_ready(from, msg)?;
             },
+            msg => {
+                self.report_message_ready(from, msg)?;
+            },
-            msg => self
-                .tx_msg_ready
-                .send((from, msg))
-                .map_err(|_| HotStuffError::InternalChannelClosed {
-                    context: "tx_msg_ready in InboundMessageWorker::handle_hotstuff_message",
-                })?,
         }
         Ok(())
     }
 
+    /// Returns the next message that is ready for consensus. The future returned from this function is cancel safe, and
+    /// can be used with tokio::select! macro.
+    pub async fn next_message(&mut self, current_height: NodeHeight) -> IncomingMessageResult<TConsensusSpec::Addr> {
+        self.message_buffer.next(current_height).await
+    }
+
+    pub async fn discard(&mut self) {
+        loop {
+            tokio::select! {
+                biased;
+                _ = self.shutdown.wait() => { break; }
+                _ = self.message_buffer.discard() => { }
+            }
+        }
+    }
+
+    pub fn clear_buffer(&mut self) {
+        self.message_buffer.clear_buffer();
+    }
+
     async fn check_proposal(&self, block: Block) -> Result<Option<Block>, HotStuffError> {
         check_hash_and_height(&block)?;
         let committee_for_block = self
@@ -164,21 +155,19 @@ where TConsensusSpec: ConsensusSpec
             .get_validator_node_by_public_key(ready_block.epoch(), ready_block.proposed_by())
             .await?;
 
-        self.send_ready_block(vn.address, ready_block)?;
+        self.report_message_ready(
+            vn.address,
+            HotstuffMessage::Proposal(ProposalMessage { block: ready_block }),
+        )?;
 
         Ok(())
     }
 
-    pub async fn check_if_parked_blocks_ready(
+    pub async fn update_parked_blocks(
         &self,
         current_height: NodeHeight,
         transaction_id: &TransactionId,
     ) -> Result<(), HotStuffError> {
-        debug!(
-            target: LOG_TARGET,
-            "🚀 Consensus (height={}) READY for new transaction with id: {}",current_height,
-            transaction_id
-        );
         let maybe_unparked_block = self
             .store
             .with_write_tx(|tx| tx.missing_transactions_remove(current_height, transaction_id))?;
@@ -191,12 +180,23 @@ where TConsensusSpec: ConsensusSpec
                 .get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by())
                 .await?;
 
-            self.send_ready_block(vn.address, unparked_block)?;
+            self.report_message_ready(
+                vn.address,
+                HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }),
+            )?;
         }
         self.pacemaker.beat();
         Ok(())
     }
 
+    fn report_message_ready(&self, from: TConsensusSpec::Addr, msg: HotstuffMessage) -> Result<(), HotStuffError> {
+        self.tx_msg_ready
+            .send((from, msg))
+            .map_err(|_| HotStuffError::InternalChannelClosed {
+                context: "tx_msg_ready in InboundMessageWorker::handle_hotstuff_message",
+            })
+    }
+
     async fn handle_missing_transactions(&self, block: Block) -> Result<Option<Block>, HotStuffError> {
         let (missing_tx_ids, awaiting_execution) = self
             .store
@@ -276,14 +276,6 @@ where TConsensusSpec: ConsensusSpec
             );
         }
     }
-
-    fn send_ready_block(&self, dest_addr: TConsensusSpec::Addr, block: Block) -> Result<(), HotStuffError> {
-        self.tx_msg_ready
-            .send((dest_addr, HotstuffMessage::Proposal(ProposalMessage { block })))
-            .map_err(|_| HotStuffError::InternalChannelClosed {
-                context: "tx_msg_ready in InboundMessageWorker::process_proposal",
-            })
-    }
 }
 
 pub struct MessageBuffer<TAddr> {
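
Note: the heart of this refactor is that OnInboundMessage now owns the only path into consensus: handlers validate a message, push it into the internal tx_msg_ready channel, and the cancel-safe next_message future drains it, parking messages that arrive ahead of the current height. The sketch below is a minimal, self-contained illustration of that buffering pattern, not the crate's actual MessageBuffer; the Msg type, u64 heights, and String payloads are assumptions made for the example.

use std::collections::{BTreeMap, VecDeque};

use tokio::sync::mpsc;

#[derive(Debug)]
struct Msg {
    height: u64,
    payload: String,
}

struct MessageBuffer {
    // Messages from future heights, parked until consensus catches up.
    buffer: BTreeMap<u64, VecDeque<Msg>>,
    rx_msg_ready: mpsc::UnboundedReceiver<Msg>,
}

impl MessageBuffer {
    fn new(rx_msg_ready: mpsc::UnboundedReceiver<Msg>) -> Self {
        Self { buffer: BTreeMap::new(), rx_msg_ready }
    }

    // Cancel-safe: the only await point is recv(), which is itself cancel-safe,
    // and every received message is either returned or parked before awaiting again.
    async fn next(&mut self, current_height: u64) -> Option<Msg> {
        // First serve anything previously parked at or below the current height.
        if let Some(msg) = self.pop_ready(current_height) {
            return Some(msg);
        }
        while let Some(msg) = self.rx_msg_ready.recv().await {
            if msg.height > current_height {
                // Park messages from future heights.
                self.buffer.entry(msg.height).or_default().push_back(msg);
                continue;
            }
            return Some(msg);
        }
        None // channel closed
    }

    fn pop_ready(&mut self, current_height: u64) -> Option<Msg> {
        let height = *self.buffer.range(..=current_height).next()?.0;
        let queue = self.buffer.get_mut(&height)?;
        let msg = queue.pop_front();
        if queue.is_empty() {
            self.buffer.remove(&height);
        }
        msg
    }

    fn clear_buffer(&mut self) {
        self.buffer.clear();
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let mut buf = MessageBuffer::new(rx);

    tx.send(Msg { height: 5, payload: "future proposal".into() }).unwrap();
    tx.send(Msg { height: 1, payload: "current proposal".into() }).unwrap();

    // At height 1 the height-5 message is parked and the height-1 message is returned.
    let msg = buf.next(1).await.unwrap();
    assert_eq!(msg.height, 1);

    // At height 5 the parked message becomes ready.
    let msg = buf.next(5).await.unwrap();
    assert_eq!(msg.height, 5);
    println!("drained: {:?}", msg);
}

Draining parked messages before awaiting is what makes next cancel-safe: if the future is dropped inside a tokio::select!, any message already received has either been returned or parked, never lost.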
dan_layer/consensus/src/hotstuff/worker.rs (49 changes: 31 additions & 18 deletions)
@@ -49,8 +49,10 @@ pub struct HotstuffWorker<TConsensusSpec: ConsensusSpec> {
 
     tx_events: broadcast::Sender<HotstuffEvent>,
     tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>,
-    inbound_message_worker: OnInboundMessage<TConsensusSpec>,
+    rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
+    rx_new_transactions: mpsc::Receiver<TransactionId>,
 
+    on_inbound_message: OnInboundMessage<TConsensusSpec>,
     on_next_sync_view: OnNextSyncViewHandler<TConsensusSpec>,
     on_receive_local_proposal: OnReceiveProposalHandler<TConsensusSpec>,
     on_receive_foreign_proposal: OnReceiveForeignProposalHandler<TConsensusSpec>,
@@ -75,7 +77,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
     pub fn new(
         validator_addr: TConsensusSpec::Addr,
         rx_new_transactions: mpsc::Receiver<TransactionId>,
-        rx_hs_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
+        rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
         state_store: TConsensusSpec::StateStore,
         epoch_manager: TConsensusSpec::EpochManager,
         leader_strategy: TConsensusSpec::LeaderStrategy,
@@ -103,15 +105,16 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
             validator_addr: validator_addr.clone(),
             tx_events: tx_events.clone(),
             tx_leader: tx_leader.clone(),
-            inbound_message_worker: OnInboundMessage::new(
+            rx_hotstuff_message,
+            rx_new_transactions,
+
+            on_inbound_message: OnInboundMessage::new(
                 state_store.clone(),
                 epoch_manager.clone(),
                 leader_strategy.clone(),
                 pacemaker.clone_handle(),
                 signing_service.clone(),
-                rx_hs_message,
                 tx_leader.clone(),
-                rx_new_transactions,
                 shutdown.clone(),
             ),
 
@@ -222,21 +225,21 @@ where TConsensusSpec: ConsensusSpec
         );
 
         tokio::select! {
-            msg_or_sync = self.inbound_message_worker.message_buffer.next(current_height) => {
-                if let Err(e) = self.on_new_hs_message(msg_or_sync).await {
-                    self.on_failure("on_new_hs_message", &e).await;
-                    return Err(e);
+            Some((from, msg)) = self.rx_hotstuff_message.recv() => {
+                if let Err(err) = self.on_inbound_message.handle(current_height, from, msg).await {
+                    error!(target: LOG_TARGET, "Error handling message: {}", err);
                 }
             },
 
-            Some((from, msg)) = self.inbound_message_worker.rx_hotstuff_message.recv() => {
-                if let Err(err) = self.inbound_message_worker.handle_hotstuff_message(current_height, from, msg).await {
-                    error!(target: LOG_TARGET, "Error handling message: {}", err);
+            msg_or_sync = self.on_inbound_message.next_message(current_height) => {
+                if let Err(e) = self.dispatch_hotstuff_message(msg_or_sync).await {
+                    self.on_failure("on_new_hs_message", &e).await;
+                    return Err(e);
                 }
             },
 
-            Some(tx_id) = self.inbound_message_worker.rx_new_transactions.recv() => {
-                if let Err(err) = self.inbound_message_worker.check_if_parked_blocks_ready(current_height, &tx_id).await {
+            Some(tx_id) = self.rx_new_transactions.recv() => {
+                if let Err(err) = self.on_inbound_message.update_parked_blocks(current_height, &tx_id).await {
                    error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
                }
            },
@@ -274,7 +277,7 @@ where TConsensusSpec: ConsensusSpec
         }
 
         self.on_receive_new_view.clear_new_views();
-        self.inbound_message_worker.clear_buffer();
+        self.on_inbound_message.clear_buffer();
         // This only happens if we're shutting down.
         if let Err(err) = self.pacemaker.stop().await {
             debug!(target: LOG_TARGET, "Pacemaker channel dropped: {}", err);
@@ -341,12 +344,22 @@ where TConsensusSpec: ConsensusSpec
             error!(target: LOG_TARGET, "Error while stopping pacemaker: {}", e);
         }
         self.on_receive_new_view.clear_new_views();
-        self.inbound_message_worker.clear_buffer();
+        self.on_inbound_message.clear_buffer();
     }
 
     /// Read and discard messages. This should be used only when consensus is inactive.
     pub async fn discard_messages(&mut self) {
-        self.inbound_message_worker.discard().await;
+        loop {
+            tokio::select! {
+                biased;
+                _ = self.shutdown.wait() => {
+                    break;
+                },
+                _ = self.on_inbound_message.discard() => {},
+                _ = self.rx_hotstuff_message.recv() => {},
+                _ = self.rx_new_transactions.recv() => {}
+            }
+        }
     }
 
     async fn on_leader_timeout(&mut self, new_height: NodeHeight) -> Result<(), HotStuffError> {
@@ -404,7 +417,7 @@ where TConsensusSpec: ConsensusSpec
         Ok(())
     }
 
-    async fn on_new_hs_message(
+    async fn dispatch_hotstuff_message(
        &mut self,
        result: IncomingMessageResult<TConsensusSpec::Addr>,
    ) -> Result<(), HotStuffError> {
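
Note: after this refactor the worker owns both receivers directly and splits inbound processing into separate select! arms: one validates raw messages and forwards them to the ready queue, another dispatches buffered, ready messages. Below is a minimal sketch of that loop shape, using plain String messages and an mpsc shutdown signal in place of the real types (both assumptions for illustration):

use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (tx_inbound, mut rx_inbound) = mpsc::channel::<String>(16);
    let (tx_ready, mut rx_ready) = mpsc::unbounded_channel::<String>();
    let (tx_shutdown, mut rx_shutdown) = mpsc::channel::<()>(1);

    // Simulate the network: one raw message arrives, then a shutdown request.
    tokio::spawn(async move {
        tx_inbound.send("proposal".to_string()).await.unwrap();
        sleep(Duration::from_millis(50)).await;
        tx_shutdown.send(()).await.unwrap();
    });

    loop {
        tokio::select! {
            // biased: poll arms top-down so shutdown always wins when ready.
            biased;
            _ = rx_shutdown.recv() => break,

            // Arm 1: validate a raw inbound message and queue it for consensus
            // (the role of OnInboundMessage::handle / report_message_ready).
            Some(msg) = rx_inbound.recv() => {
                // ... validation and parking would happen here ...
                tx_ready.send(msg).expect("ready channel closed");
            },

            // Arm 2: dispatch a message that is ready for consensus
            // (the role of next_message / dispatch_hotstuff_message).
            Some(msg) = rx_ready.recv() => {
                println!("dispatching: {msg}");
            },
        }
    }
}

The biased keyword makes select! poll arms in declaration order instead of randomly, so the shutdown arm always wins when several arms are ready at once, the same trick the diff's discard loops use.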
