From d0a08275d1c02ec13b69e67a11bab3c90d07af39 Mon Sep 17 00:00:00 2001 From: Cifko Date: Thu, 11 Jan 2024 21:45:37 +0100 Subject: [PATCH] feat: add foreign proposal validation --- .../src/hotstuff/on_inbound_message.rs | 71 ++++++++----------- .../on_ready_to_vote_on_local_block.rs | 1 + .../hotstuff/on_receive_foreign_proposal.rs | 15 +++- dan_layer/consensus/src/hotstuff/worker.rs | 14 +++- 4 files changed, 56 insertions(+), 45 deletions(-) diff --git a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs index 740f65d0c..d42de8c20 100644 --- a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs +++ b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs @@ -35,11 +35,11 @@ pub struct OnInboundMessage { leader_strategy: TConsensusSpec::LeaderStrategy, pacemaker: PaceMakerHandle, vote_signing_service: TConsensusSpec::SignatureService, - rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>, + pub rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>, tx_outbound_message: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>, tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>, - rx_new_transactions: mpsc::Receiver, - message_buffer: MessageBuffer, + pub rx_new_transactions: mpsc::Receiver, + pub message_buffer: MessageBuffer, shutdown: ShutdownSignal, } @@ -73,32 +73,6 @@ where TConsensusSpec: ConsensusSpec } } - pub async fn next(&mut self, current_height: NodeHeight) -> IncomingMessageResult { - loop { - tokio::select! { - biased; - - _ = self.shutdown.wait() => { break Ok(None); } - - msg_or_sync = self.message_buffer.next(current_height) => { - break msg_or_sync; - }, - - Some((from, msg)) = self.rx_hotstuff_message.recv() => { - if let Err(err) = self.handle_hotstuff_message(current_height, from, msg).await { - error!(target: LOG_TARGET, "Error handling message: {}", err); - } - }, - - Some(tx_id) = self.rx_new_transactions.recv() => { - if let Err(err) = self.check_if_parked_blocks_ready(current_height, &tx_id).await { - error!(target: LOG_TARGET, "Error checking parked blocks: {}", err); - } - }, - } - } - } - pub async fn discard(&mut self) { loop { tokio::select! { @@ -123,7 +97,15 @@ where TConsensusSpec: ConsensusSpec ) -> Result<(), HotStuffError> { match msg { HotstuffMessage::Proposal(msg) => { - self.process_proposal(current_height, msg).await?; + self.process_local_proposal(current_height, msg).await?; + }, + HotstuffMessage::ForeignProposal(ref proposal) => { + self.check_proposal(proposal.block.clone()).await?; + self.tx_msg_ready + .send((from, msg)) + .map_err(|_| HotStuffError::InternalChannelClosed { + context: "tx_msg_ready in InboundMessageWorker::handle_hotstuff_message", + })?; }, msg => self .tx_msg_ready @@ -135,7 +117,19 @@ where TConsensusSpec: ConsensusSpec Ok(()) } - async fn process_proposal( + async fn check_proposal(&self, block: Block) -> Result, HotStuffError> { + check_hash_and_height(&block)?; + let committee_for_block = self + .epoch_manager + .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) + .await?; + check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?; + check_signature(&block)?; + check_quorum_certificate::(&block, &self.vote_signing_service, &self.epoch_manager).await?; + self.handle_missing_transactions(block).await + } + + async fn process_local_proposal( &self, current_height: NodeHeight, proposal: ProposalMessage, @@ -160,16 +154,7 @@ where TConsensusSpec: ConsensusSpec return Ok(()); } - check_hash_and_height(&block)?; - let committee_for_block = self - .epoch_manager - .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) - .await?; - check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?; - check_signature(&block)?; - check_quorum_certificate::(&block, &self.vote_signing_service, &self.epoch_manager).await?; - - let Some(ready_block) = self.handle_missing_transactions(block).await? else { + let Some(ready_block) = self.check_proposal(block).await? else { // Block not ready return Ok(()); }; @@ -184,7 +169,7 @@ where TConsensusSpec: ConsensusSpec Ok(()) } - async fn check_if_parked_blocks_ready( + pub async fn check_if_parked_blocks_ready( &self, current_height: NodeHeight, transaction_id: &TransactionId, @@ -301,7 +286,7 @@ where TConsensusSpec: ConsensusSpec } } -struct MessageBuffer { +pub struct MessageBuffer { buffer: BTreeMap>, rx_msg_ready: mpsc::UnboundedReceiver<(TAddr, HotstuffMessage)>, } diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 3af236249..4670054d4 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -950,6 +950,7 @@ where TConsensusSpec: ConsensusSpec // committees and within committees are not different in terms of size, speed, etc. let diff_from_leader = (my_index + local_committee.len() - leader_index as usize) % local_committee.len(); // f+1 nodes (always including the leader) send the proposal to the foreign committee + // if diff_from_leader <= (local_committee.len() - 1) / 3 + 1 { if diff_from_leader <= local_committee.len() / 3 { self.proposer.broadcast_proposal_foreignly(&block).await?; } diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index a628bd66b..ac0ca5610 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -69,13 +69,26 @@ where TConsensusSpec: ConsensusSpec .epoch_manager .get_committee_shard(block.epoch(), vn.shard_key) .await?; + let foreign_proposal = ForeignProposal::new(committee_shard.bucket(), *block.id()); + if self + .store + .with_read_tx(|tx| ForeignProposal::exists(tx, &foreign_proposal))? + { + warn!( + target: LOG_TARGET, + "🔥 FOREIGN PROPOSAL: Already received proposal for block {}", + block.id(), + ); + return Ok(()); + } + let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?; // Is this ok? Can foreign node send invalid block that should still increment the counter? self.foreign_receive_counter.increment(&committee_shard.bucket()); self.store.with_write_tx(|tx| { self.foreign_receive_counter.save(tx)?; - ForeignProposal::new(committee_shard.bucket(), *block.id()).upsert(tx)?; + foreign_proposal.upsert(tx)?; self.on_receive_foreign_block(tx, &block, &committee_shard) })?; diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index d5bbddf6f..6754f7270 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -222,13 +222,25 @@ where TConsensusSpec: ConsensusSpec ); tokio::select! { - msg_or_sync = self.inbound_message_worker.next(current_height) => { + msg_or_sync = self.inbound_message_worker.message_buffer.next(current_height) => { if let Err(e) = self.on_new_hs_message(msg_or_sync).await { self.on_failure("on_new_hs_message", &e).await; return Err(e); } }, + Some((from, msg)) = self.inbound_message_worker.rx_hotstuff_message.recv() => { + if let Err(err) = self.inbound_message_worker.handle_hotstuff_message(current_height, from, msg).await { + error!(target: LOG_TARGET, "Error handling message: {}", err); + } + }, + + Some(tx_id) = self.inbound_message_worker.rx_new_transactions.recv() => { + if let Err(err) = self.inbound_message_worker.check_if_parked_blocks_ready(current_height, &tx_id).await { + error!(target: LOG_TARGET, "Error checking parked blocks: {}", err); + } + }, + Ok(event) = epoch_manager_events.recv() => { self.handle_epoch_manager_event(event).await?; },