Skip to content

Commit

Permalink
feat: add foreign proposal validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Jan 12, 2024
1 parent b7ccff2 commit d0a0827
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 45 deletions.
71 changes: 28 additions & 43 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
leader_strategy: TConsensusSpec::LeaderStrategy,
pacemaker: PaceMakerHandle,
vote_signing_service: TConsensusSpec::SignatureService,
rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
pub rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
tx_outbound_message: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>,
tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
rx_new_transactions: mpsc::Receiver<TransactionId>,
message_buffer: MessageBuffer<TConsensusSpec::Addr>,
pub rx_new_transactions: mpsc::Receiver<TransactionId>,
pub message_buffer: MessageBuffer<TConsensusSpec::Addr>,
shutdown: ShutdownSignal,
}

Expand Down Expand Up @@ -73,32 +73,6 @@ where TConsensusSpec: ConsensusSpec
}
}

pub async fn next(&mut self, current_height: NodeHeight) -> IncomingMessageResult<TConsensusSpec::Addr> {
loop {
tokio::select! {
biased;

_ = self.shutdown.wait() => { break Ok(None); }

msg_or_sync = self.message_buffer.next(current_height) => {
break msg_or_sync;
},

Some((from, msg)) = self.rx_hotstuff_message.recv() => {
if let Err(err) = self.handle_hotstuff_message(current_height, from, msg).await {
error!(target: LOG_TARGET, "Error handling message: {}", err);
}
},

Some(tx_id) = self.rx_new_transactions.recv() => {
if let Err(err) = self.check_if_parked_blocks_ready(current_height, &tx_id).await {
error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
}
},
}
}
}

pub async fn discard(&mut self) {
loop {
tokio::select! {
Expand All @@ -123,7 +97,15 @@ where TConsensusSpec: ConsensusSpec
) -> Result<(), HotStuffError> {
match msg {
HotstuffMessage::Proposal(msg) => {
self.process_proposal(current_height, msg).await?;
self.process_local_proposal(current_height, msg).await?;
},
HotstuffMessage::ForeignProposal(ref proposal) => {
self.check_proposal(proposal.block.clone()).await?;
self.tx_msg_ready
.send((from, msg))
.map_err(|_| HotStuffError::InternalChannelClosed {
context: "tx_msg_ready in InboundMessageWorker::handle_hotstuff_message",
})?;
},
msg => self
.tx_msg_ready
Expand All @@ -135,7 +117,19 @@ where TConsensusSpec: ConsensusSpec
Ok(())
}

async fn process_proposal(
async fn check_proposal(&self, block: Block) -> Result<Option<Block>, HotStuffError> {
check_hash_and_height(&block)?;
let committee_for_block = self
.epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.await?;
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?;
check_signature(&block)?;
check_quorum_certificate::<TConsensusSpec>(&block, &self.vote_signing_service, &self.epoch_manager).await?;
self.handle_missing_transactions(block).await
}

async fn process_local_proposal(
&self,
current_height: NodeHeight,
proposal: ProposalMessage,
Expand All @@ -160,16 +154,7 @@ where TConsensusSpec: ConsensusSpec
return Ok(());
}

check_hash_and_height(&block)?;
let committee_for_block = self
.epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.await?;
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?;
check_signature(&block)?;
check_quorum_certificate::<TConsensusSpec>(&block, &self.vote_signing_service, &self.epoch_manager).await?;

let Some(ready_block) = self.handle_missing_transactions(block).await? else {
let Some(ready_block) = self.check_proposal(block).await? else {
// Block not ready
return Ok(());
};
Expand All @@ -184,7 +169,7 @@ where TConsensusSpec: ConsensusSpec
Ok(())
}

async fn check_if_parked_blocks_ready(
pub async fn check_if_parked_blocks_ready(
&self,
current_height: NodeHeight,
transaction_id: &TransactionId,
Expand Down Expand Up @@ -301,7 +286,7 @@ where TConsensusSpec: ConsensusSpec
}
}

struct MessageBuffer<TAddr> {
pub struct MessageBuffer<TAddr> {
buffer: BTreeMap<NodeHeight, VecDeque<(TAddr, HotstuffMessage)>>,
rx_msg_ready: mpsc::UnboundedReceiver<(TAddr, HotstuffMessage)>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ where TConsensusSpec: ConsensusSpec
// committees and within committees are not different in terms of size, speed, etc.
let diff_from_leader = (my_index + local_committee.len() - leader_index as usize) % local_committee.len();
// f+1 nodes (always including the leader) send the proposal to the foreign committee
// if diff_from_leader <= (local_committee.len() - 1) / 3 + 1 {
if diff_from_leader <= local_committee.len() / 3 {
self.proposer.broadcast_proposal_foreignly(&block).await?;
}
Expand Down
15 changes: 14 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,26 @@ where TConsensusSpec: ConsensusSpec
.epoch_manager
.get_committee_shard(block.epoch(), vn.shard_key)
.await?;
let foreign_proposal = ForeignProposal::new(committee_shard.bucket(), *block.id());
if self
.store
.with_read_tx(|tx| ForeignProposal::exists(tx, &foreign_proposal))?
{
warn!(
target: LOG_TARGET,
"🔥 FOREIGN PROPOSAL: Already received proposal for block {}",
block.id(),
);
return Ok(());
}

let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.bucket());
self.store.with_write_tx(|tx| {
self.foreign_receive_counter.save(tx)?;
ForeignProposal::new(committee_shard.bucket(), *block.id()).upsert(tx)?;
foreign_proposal.upsert(tx)?;
self.on_receive_foreign_block(tx, &block, &committee_shard)
})?;

Expand Down
14 changes: 13 additions & 1 deletion dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,25 @@ where TConsensusSpec: ConsensusSpec
);

tokio::select! {
msg_or_sync = self.inbound_message_worker.next(current_height) => {
msg_or_sync = self.inbound_message_worker.message_buffer.next(current_height) => {
if let Err(e) = self.on_new_hs_message(msg_or_sync).await {
self.on_failure("on_new_hs_message", &e).await;
return Err(e);
}
},

Some((from, msg)) = self.inbound_message_worker.rx_hotstuff_message.recv() => {
if let Err(err) = self.inbound_message_worker.handle_hotstuff_message(current_height, from, msg).await {
error!(target: LOG_TARGET, "Error handling message: {}", err);
}
},

Some(tx_id) = self.inbound_message_worker.rx_new_transactions.recv() => {
if let Err(err) = self.inbound_message_worker.check_if_parked_blocks_ready(current_height, &tx_id).await {
error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
}
},

Ok(event) = epoch_manager_events.recv() => {
self.handle_epoch_manager_event(event).await?;
},
Expand Down

0 comments on commit d0a0827

Please sign in to comment.