Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add foreign proposal validation #889

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 28 additions & 43 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
leader_strategy: TConsensusSpec::LeaderStrategy,
pacemaker: PaceMakerHandle,
vote_signing_service: TConsensusSpec::SignatureService,
rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
pub rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
tx_outbound_message: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>,
tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
rx_new_transactions: mpsc::Receiver<TransactionId>,
message_buffer: MessageBuffer<TConsensusSpec::Addr>,
pub rx_new_transactions: mpsc::Receiver<TransactionId>,
pub message_buffer: MessageBuffer<TConsensusSpec::Addr>,
shutdown: ShutdownSignal,
}

Expand Down Expand Up @@ -73,32 +73,6 @@ where TConsensusSpec: ConsensusSpec
}
}

pub async fn next(&mut self, current_height: NodeHeight) -> IncomingMessageResult<TConsensusSpec::Addr> {
loop {
tokio::select! {
biased;

_ = self.shutdown.wait() => { break Ok(None); }

msg_or_sync = self.message_buffer.next(current_height) => {
break msg_or_sync;
},

Some((from, msg)) = self.rx_hotstuff_message.recv() => {
if let Err(err) = self.handle_hotstuff_message(current_height, from, msg).await {
error!(target: LOG_TARGET, "Error handling message: {}", err);
}
},

Some(tx_id) = self.rx_new_transactions.recv() => {
if let Err(err) = self.check_if_parked_blocks_ready(current_height, &tx_id).await {
error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
}
},
}
}
}

pub async fn discard(&mut self) {
loop {
tokio::select! {
Expand All @@ -123,7 +97,15 @@ where TConsensusSpec: ConsensusSpec
) -> Result<(), HotStuffError> {
match msg {
HotstuffMessage::Proposal(msg) => {
self.process_proposal(current_height, msg).await?;
self.process_local_proposal(current_height, msg).await?;
},
HotstuffMessage::ForeignProposal(ref proposal) => {
self.check_proposal(proposal.block.clone()).await?;
self.tx_msg_ready
.send((from, msg))
.map_err(|_| HotStuffError::InternalChannelClosed {
context: "tx_msg_ready in InboundMessageWorker::handle_hotstuff_message",
})?;
},
msg => self
.tx_msg_ready
Expand All @@ -135,7 +117,19 @@ where TConsensusSpec: ConsensusSpec
Ok(())
}

async fn process_proposal(
async fn check_proposal(&self, block: Block) -> Result<Option<Block>, HotStuffError> {
check_hash_and_height(&block)?;
let committee_for_block = self
.epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.await?;
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?;
check_signature(&block)?;
check_quorum_certificate::<TConsensusSpec>(&block, &self.vote_signing_service, &self.epoch_manager).await?;
self.handle_missing_transactions(block).await
}

async fn process_local_proposal(
&self,
current_height: NodeHeight,
proposal: ProposalMessage,
Expand All @@ -160,16 +154,7 @@ where TConsensusSpec: ConsensusSpec
return Ok(());
}

check_hash_and_height(&block)?;
let committee_for_block = self
.epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.await?;
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?;
check_signature(&block)?;
check_quorum_certificate::<TConsensusSpec>(&block, &self.vote_signing_service, &self.epoch_manager).await?;

let Some(ready_block) = self.handle_missing_transactions(block).await? else {
let Some(ready_block) = self.check_proposal(block).await? else {
// Block not ready
return Ok(());
};
Expand All @@ -184,7 +169,7 @@ where TConsensusSpec: ConsensusSpec
Ok(())
}

async fn check_if_parked_blocks_ready(
pub async fn check_if_parked_blocks_ready(
&self,
current_height: NodeHeight,
transaction_id: &TransactionId,
Expand Down Expand Up @@ -301,7 +286,7 @@ where TConsensusSpec: ConsensusSpec
}
}

struct MessageBuffer<TAddr> {
pub struct MessageBuffer<TAddr> {
buffer: BTreeMap<NodeHeight, VecDeque<(TAddr, HotstuffMessage)>>,
rx_msg_ready: mpsc::UnboundedReceiver<(TAddr, HotstuffMessage)>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,7 @@ where TConsensusSpec: ConsensusSpec
// committees and within committees are not different in terms of size, speed, etc.
let diff_from_leader = (my_index + local_committee.len() - leader_index as usize) % local_committee.len();
// f+1 nodes (always including the leader) send the proposal to the foreign committee
// if diff_from_leader <= (local_committee.len() - 1) / 3 + 1 {
if diff_from_leader <= local_committee.len() / 3 {
self.proposer.broadcast_proposal_foreignly(&block).await?;
}
Expand Down
15 changes: 14 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,26 @@ where TConsensusSpec: ConsensusSpec
.epoch_manager
.get_committee_shard(block.epoch(), vn.shard_key)
.await?;
let foreign_proposal = ForeignProposal::new(committee_shard.bucket(), *block.id());
if self
.store
.with_read_tx(|tx| ForeignProposal::exists(tx, &foreign_proposal))?
{
warn!(
target: LOG_TARGET,
"🔥 FOREIGN PROPOSAL: Already received proposal for block {}",
block.id(),
);
return Ok(());
}

let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.bucket());
self.store.with_write_tx(|tx| {
self.foreign_receive_counter.save(tx)?;
ForeignProposal::new(committee_shard.bucket(), *block.id()).upsert(tx)?;
foreign_proposal.upsert(tx)?;
self.on_receive_foreign_block(tx, &block, &committee_shard)
})?;

Expand Down
14 changes: 13 additions & 1 deletion dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,25 @@ where TConsensusSpec: ConsensusSpec
);

tokio::select! {
msg_or_sync = self.inbound_message_worker.next(current_height) => {
msg_or_sync = self.inbound_message_worker.message_buffer.next(current_height) => {
if let Err(e) = self.on_new_hs_message(msg_or_sync).await {
self.on_failure("on_new_hs_message", &e).await;
return Err(e);
}
},

Some((from, msg)) = self.inbound_message_worker.rx_hotstuff_message.recv() => {
if let Err(err) = self.inbound_message_worker.handle_hotstuff_message(current_height, from, msg).await {
error!(target: LOG_TARGET, "Error handling message: {}", err);
}
},

Some(tx_id) = self.inbound_message_worker.rx_new_transactions.recv() => {
if let Err(err) = self.inbound_message_worker.check_if_parked_blocks_ready(current_height, &tx_id).await {
error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
}
},

Ok(event) = epoch_manager_events.recv() => {
self.handle_epoch_manager_event(event).await?;
},
Expand Down
Loading