From ac444e2d2fd5e609dfdcfbe1a4e2b821ef419b41 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Thu, 11 Jan 2024 12:51:26 +0400 Subject: [PATCH] fix(consensus): dont send dummy blocks on catch up --- dan_layer/consensus/src/hotstuff/error.rs | 2 +- .../src/hotstuff/on_receive_local_proposal.rs | 4 +- .../consensus/src/hotstuff/on_sync_request.rs | 21 +---- .../src/hotstuff/on_sync_response.rs | 82 ------------------- dan_layer/consensus/src/hotstuff/worker.rs | 6 +- dan_layer/state_store_sqlite/src/reader.rs | 10 ++- .../storage/src/consensus_models/block.rs | 3 +- dan_layer/storage/src/state_store/mod.rs | 1 + networking/libp2p-peersync/src/error.rs | 2 - 9 files changed, 19 insertions(+), 112 deletions(-) delete mode 100644 dan_layer/consensus/src/hotstuff/on_sync_response.rs diff --git a/dan_layer/consensus/src/hotstuff/error.rs b/dan_layer/consensus/src/hotstuff/error.rs index 8f2962d141..b5d71f2403 100644 --- a/dan_layer/consensus/src/hotstuff/error.rs +++ b/dan_layer/consensus/src/hotstuff/error.rs @@ -148,7 +148,7 @@ pub enum ProposalValidationError { }, #[error("Proposed block {block_id} {height} already has been processed")] BlockAlreadyProcessed { block_id: BlockId, height: NodeHeight }, - #[error("Proposed block {block_id} {height} doesn't have signature")] + #[error("Proposed block {block_id} {height} doesn't have a signature")] MissingSignature { block_id: BlockId, height: NodeHeight }, #[error("Proposed block {block_id} {height} has invalid signature")] InvalidSignature { block_id: BlockId, height: NodeHeight }, diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index 7e6f032998..7492833161 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -36,7 +36,7 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_receive_local_proposal"; -pub struct OnReceiveProposalHandler { +pub struct OnReceiveLocalProposalHandler { store: TConsensusSpec::StateStore, epoch_manager: TConsensusSpec::EpochManager, leader_strategy: TConsensusSpec::LeaderStrategy, @@ -44,7 +44,7 @@ pub struct OnReceiveProposalHandler { on_ready_to_vote_on_local_block: OnReadyToVoteOnLocalBlock, } -impl OnReceiveProposalHandler { +impl OnReceiveLocalProposalHandler { pub fn new( validator_addr: TConsensusSpec::Addr, store: TConsensusSpec::StateStore, diff --git a/dan_layer/consensus/src/hotstuff/on_sync_request.rs b/dan_layer/consensus/src/hotstuff/on_sync_request.rs index c76b3c43d0..4ffe5dbb96 100644 --- a/dan_layer/consensus/src/hotstuff/on_sync_request.rs +++ b/dan_layer/consensus/src/hotstuff/on_sync_request.rs @@ -48,7 +48,7 @@ impl OnSyncRequest { msg.high_qc, last_voted ); - let blocks = Block::get_all_blocks_between(tx, msg.high_qc.block_id(), last_voted.block_id())?; + let blocks = Block::get_all_blocks_between(tx, msg.high_qc.block_id(), last_voted.block_id(), false)?; debug!( target: LOG_TARGET, @@ -58,25 +58,6 @@ impl OnSyncRequest { ); Ok::<_, HotStuffError>(blocks) - - // let mut full_blocks = Vec::with_capacity(blocks.len()); - // for block in blocks { - // let all_qcs = block - // .commands() - // .iter() - // .flat_map(|cmd| cmd.evidence().qc_ids_iter()) - // .collect::>(); - // let qcs = QuorumCertificate::get_all(tx, all_qcs)?; - // let transactions = block.get_transactions(tx)?; - // - // full_blocks.push(FullBlock { - // block, - // qcs, - // transactions: transactions.into_iter().map(|t| t.into_transaction()).collect(), - // }); - // } - // - // Ok::<_, HotStuffError>(full_blocks) }); let blocks = match result { diff --git a/dan_layer/consensus/src/hotstuff/on_sync_response.rs b/dan_layer/consensus/src/hotstuff/on_sync_response.rs deleted file mode 100644 index 7b89d8a4e8..0000000000 --- a/dan_layer/consensus/src/hotstuff/on_sync_response.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2023 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -// Copyright 2023 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -use std::collections::HashSet; - -use log::*; -use tari_dan_storage::{consensus_models::Block, StateStore}; -use tari_transaction::Transaction; -use tokio::sync::mpsc; - -use crate::{hotstuff::HotStuffError, messages::SyncResponseMessage, traits::ConsensusSpec}; - -const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_sync_request"; - -#[derive(Debug)] -pub struct OnSyncResponse { - store: TConsensusSpec::StateStore, - inflight_requests: HashSet, - tx_mempool: mpsc::UnboundedSender, -} - -impl OnSyncResponse { - pub fn new(store: TConsensusSpec::StateStore, tx_mempool: mpsc::UnboundedSender) -> Self { - Self { - store, - inflight_requests: HashSet::new(), - tx_mempool, - } - } - - pub fn add_inflight_request(&mut self, addr: TConsensusSpec::Addr) { - self.inflight_requests.insert(addr); - } - - pub fn handle( - &mut self, - from: TConsensusSpec::Addr, - msg: SyncResponseMessage, - ) -> Result>, HotStuffError> { - if !self.inflight_requests.remove(&from) { - warn!( - target: LOG_TARGET, - "⚠️ Ignoring unrequested SyncResponse from {}", from - ); - return Ok(vec![]); - } - - if msg.blocks.is_empty() { - warn!( - target: LOG_TARGET, - "⚠️ Ignoring empty SyncResponse from {}", from - ); - return Ok(vec![]); - } - - let mut blocks = Vec::with_capacity(msg.blocks.len()); - for full_block in msg.blocks { - for transaction in full_block.transactions { - if self.tx_mempool.send(transaction).is_err() { - warn!(target: LOG_TARGET, "Mempool channel closed while sending transactions from SyncResponse"); - return Ok(vec![]); - } - } - self.store.with_write_tx(|tx| { - // TODO: validate - for qc in full_block.qcs { - qc.save(tx)?; - } - Ok::<_, HotStuffError>(()) - })?; - - blocks.push(full_block.block); - } - - blocks.sort_by_key(|b| b.height()); - - Ok(blocks) - } -} diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 42fdb40134..a3579a5651 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -29,7 +29,7 @@ use crate::{ on_next_sync_view::OnNextSyncViewHandler, on_propose::OnPropose, on_receive_foreign_proposal::OnReceiveForeignProposalHandler, - on_receive_local_proposal::OnReceiveProposalHandler, + on_receive_local_proposal::OnReceiveLocalProposalHandler, on_receive_new_view::OnReceiveNewViewHandler, on_receive_request_missing_transactions::OnReceiveRequestMissingTransactions, on_receive_vote::OnReceiveVoteHandler, @@ -52,7 +52,7 @@ pub struct HotstuffWorker { inbound_message_worker: OnInboundMessage, on_next_sync_view: OnNextSyncViewHandler, - on_receive_local_proposal: OnReceiveProposalHandler, + on_receive_local_proposal: OnReceiveLocalProposalHandler, on_receive_foreign_proposal: OnReceiveForeignProposalHandler, on_receive_vote: OnReceiveVoteHandler, on_receive_new_view: OnReceiveNewViewHandler, @@ -120,7 +120,7 @@ impl HotstuffWorker { leader_strategy.clone(), epoch_manager.clone(), ), - on_receive_local_proposal: OnReceiveProposalHandler::new( + on_receive_local_proposal: OnReceiveLocalProposalHandler::new( validator_addr, state_store.clone(), epoch_manager.clone(), diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 85fac6bbd3..fd9f3dabfd 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -609,6 +609,7 @@ impl StateStoreReadTransa &mut self, start_block_id_exclusive: &BlockId, end_block_id_inclusive: &BlockId, + include_dummy_blocks: bool, ) -> Result, StorageError> { use crate::schema::{blocks, quorum_certificates}; @@ -620,10 +621,17 @@ impl StateStoreReadTransa // Exclude start block block_ids.pop(); - let results = blocks::table + let mut query = blocks::table .left_join(quorum_certificates::table.on(blocks::qc_id.eq(quorum_certificates::qc_id))) .select((blocks::all_columns, quorum_certificates::all_columns.nullable())) .filter(blocks::block_id.eq_any(block_ids)) + .boxed(); + + if include_dummy_blocks { + query = query.filter(blocks::is_dummy.eq(false)).boxed(); + } + + let results = query .order_by(blocks::height.asc()) .get_results::<(sql_models::Block, Option)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index f30c4c97b2..93159ffadd 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -358,8 +358,9 @@ impl Block { tx: &mut TTx, start_block_id_exclusive: &BlockId, end_block_id_inclusive: &BlockId, + include_dummy_blocks: bool, ) -> Result, StorageError> { - tx.blocks_get_all_between(start_block_id_exclusive, end_block_id_inclusive) + tx.blocks_get_all_between(start_block_id_exclusive, end_block_id_inclusive, include_dummy_blocks) } pub fn exists(&self, tx: &mut TTx) -> Result { diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index a226eabf16..119e747f2e 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -121,6 +121,7 @@ pub trait StateStoreReadTransaction { &mut self, start_block_id_exclusive: &BlockId, end_block_id_inclusive: &BlockId, + include_dummy_blocks: bool, ) -> Result, StorageError>; fn blocks_exists(&mut self, block_id: &BlockId) -> Result; fn blocks_is_ancestor(&mut self, descendant: &BlockId, ancestor: &BlockId) -> Result; diff --git a/networking/libp2p-peersync/src/error.rs b/networking/libp2p-peersync/src/error.rs index f8c9e325a4..24bbaeb539 100644 --- a/networking/libp2p-peersync/src/error.rs +++ b/networking/libp2p-peersync/src/error.rs @@ -30,8 +30,6 @@ pub enum Error { InvalidMessage { peer_id: PeerId, details: String }, #[error("Failed to decode multiaddr: {0}")] DecodeMultiaddr(#[from] multiaddr::Error), - #[error("ProtoBuf peer record has no signature")] - ProtoBufMissingSignature, #[error("Invalid signed peer receord from peer `{peer_id}`: {details}")] InvalidSignedPeer { peer_id: PeerId, details: String },