diff --git a/Cargo.lock b/Cargo.lock index 03900eab9..f37f52a4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8932,6 +8932,7 @@ version = "0.3.0" dependencies = [ "anyhow", "async-trait", + "indexmap 2.1.0", "log", "serde", "tari_common_types", diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 5940df98c..5dc037dfa 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -51,7 +51,7 @@ use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, PeerAddress, Sub use tari_dan_engine::fees::FeeTable; use tari_dan_p2p::TariMessagingSpec; use tari_dan_storage::{ - consensus_models::{Block, BlockId, ExecutedTransaction, ForeignReceiveCounters, SubstateRecord}, + consensus_models::{Block, BlockId, ExecutedTransaction, SubstateRecord}, global::GlobalDb, StateStore, StateStoreReadTransaction, @@ -225,7 +225,6 @@ pub async fn spawn_services( // Consensus let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10); - let foreign_receive_counter = state_store.with_read_tx(|tx| ForeignReceiveCounters::get(tx))?; let local_address = PeerAddress::from(keypair.public_key().clone()); let (loopback_sender, loopback_receiver) = mpsc::unbounded_channel(); @@ -246,7 +245,6 @@ pub async fn spawn_services( inbound_messaging, outbound_messaging.clone(), validator_node_client_factory.clone(), - foreign_receive_counter, shutdown.clone(), ) .await; diff --git a/applications/tari_validator_node/src/consensus/mod.rs b/applications/tari_validator_node/src/consensus/mod.rs index f150f5eb4..83d446b42 100644 --- a/applications/tari_validator_node/src/consensus/mod.rs +++ b/applications/tari_validator_node/src/consensus/mod.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use tari_consensus::hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker}; -use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool}; +use tari_dan_storage::consensus_models::TransactionPool; use tari_epoch_manager::base_layer::EpochManagerHandle; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; @@ -45,7 +45,6 @@ pub async fn spawn( inbound_messaging: ConsensusInboundMessaging, outbound_messaging: ConsensusOutboundMessaging, client_factory: TariValidatorNodeRpcClientFactory, - foreign_receive_counter: ForeignReceiveCounters, shutdown_signal: ShutdownSignal, ) -> ( JoinHandle>, @@ -74,7 +73,6 @@ pub async fn spawn( transaction_pool, tx_hotstuff_events.clone(), tx_mempool, - foreign_receive_counter, shutdown_signal.clone(), ); diff --git a/dan_layer/consensus/Cargo.toml b/dan_layer/consensus/Cargo.toml index c962fbd54..526b19dbb 100644 --- a/dan_layer/consensus/Cargo.toml +++ b/dan_layer/consensus/Cargo.toml @@ -19,6 +19,7 @@ tari_mmr = { workspace = true } tari_shutdown = { workspace = true } anyhow = { workspace = true } +indexmap = { workspace = true } async-trait = { workspace = true } log = { workspace = true } serde = { workspace = true, default-features = true } diff --git a/dan_layer/consensus/src/hotstuff/error.rs b/dan_layer/consensus/src/hotstuff/error.rs index b41da0989..45a681ef1 100644 --- a/dan_layer/consensus/src/hotstuff/error.rs +++ b/dan_layer/consensus/src/hotstuff/error.rs @@ -102,8 +102,12 @@ pub enum ProposalValidationError { NotSafeBlock { proposed_by: String, hash: BlockId }, #[error("Node proposed by {proposed_by} with hash {hash} is missing foreign index")] MissingForeignCounters { proposed_by: String, hash: BlockId }, - #[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters")] - InvalidForeignCounters { proposed_by: String, hash: BlockId }, + #[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters: {details}")] + InvalidForeignCounters { + proposed_by: String, + hash: BlockId, + details: String, + }, #[error("Node proposed by {proposed_by} with hash {hash} is the genesis block")] ProposingGenesisBlock { proposed_by: String, hash: BlockId }, #[error("Justification block {justify_block} for proposed block {block_description} by {proposed_by} not found")] diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 9440c4e3f..9b8efab8e 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -3,6 +3,7 @@ use std::{collections::BTreeSet, num::NonZeroU64, ops::DerefMut}; +use indexmap::IndexMap; use log::*; use tari_common_types::types::PublicKey; use tari_dan_common_types::{ @@ -26,7 +27,6 @@ use tari_dan_storage::{ TransactionPoolStage, }, StateStore, - StateStoreWriteTransaction, }; use tari_epoch_manager::EpochManagerReader; @@ -107,16 +107,12 @@ where TConsensusSpec: ConsensusSpec let validator = self.epoch_manager.get_our_validator_node(epoch).await?; let local_committee_shard = self.epoch_manager.get_local_committee_shard(epoch).await?; - // The scope here is due to a shortcoming of rust. The tx is dropped at tx.commit() but it still complains that - // the non-Send tx could be used after the await point, which is not possible. - let next_block; - { - let mut tx = self.store.create_write_tx()?; - let high_qc = HighQc::get(&mut *tx)?; - let high_qc = high_qc.get_quorum_certificate(&mut *tx)?; - let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), leaf_block.block_id())?; - next_block = self.build_next_block( - &mut tx, + + let next_block = self.store.with_write_tx(|tx| { + let high_qc = HighQc::get(tx.deref_mut())?; + let high_qc = high_qc.get_quorum_certificate(tx.deref_mut())?; + let next_block = self.build_next_block( + tx, epoch, &leaf_block, high_qc, @@ -125,13 +121,11 @@ where TConsensusSpec: ConsensusSpec // TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this // is a good idea. is_newview_propose, - &mut foreign_counters, )?; - next_block.as_last_proposed().set(&mut tx)?; - - tx.commit()?; - } + next_block.as_last_proposed().set(tx)?; + Ok::<_, HotStuffError>(next_block) + })?; info!( target: LOG_TARGET, @@ -182,7 +176,6 @@ where TConsensusSpec: ConsensusSpec proposed_by: PublicKey, local_committee_shard: &CommitteeShard, empty_block: bool, - foreign_counters: &mut ForeignSendCounters, ) -> Result { // TODO: Configure const TARGET_BLOCK_SIZE: usize = 1000; @@ -197,17 +190,16 @@ where TConsensusSpec: ConsensusSpec let pending_proposals = ForeignProposal::get_all_pending(tx, locked_block.block_id(), parent_block.block_id())?; let commands = ForeignProposal::get_all_new(tx)? .into_iter() - .filter_map(|foreign_proposal| { - if pending_proposals.iter().any(|pending_proposal| { + .filter(|foreign_proposal| { + // If the foreign proposal is already pending, don't propose it again + !pending_proposals.iter().any(|pending_proposal| { pending_proposal.bucket == foreign_proposal.bucket && pending_proposal.block_id == foreign_proposal.block_id - }) { - None - } else { - Some(Ok(Command::ForeignProposal( - foreign_proposal.set_mined_at(parent_block.height().saturating_add(NodeHeight(1))), - ))) - } + }) + }) + .map(|mut foreign_proposal| { + foreign_proposal.set_proposed_height(parent_block.height().saturating_add(NodeHeight(1))); + Ok(Command::ForeignProposal(foreign_proposal)) }) .chain(batch.into_iter().map(|t| match t.current_stage() { // If the transaction is New, propose to Prepare it @@ -255,10 +247,14 @@ where TConsensusSpec: ConsensusSpec local_committee_shard.shard(), )?; - let foreign_indexes = non_local_buckets + let foreign_counters = ForeignSendCounters::get_or_default(tx, parent_block.block_id())?; + let mut foreign_indexes = non_local_buckets .iter() - .map(|bucket| (*bucket, foreign_counters.increment_counter(*bucket))) - .collect(); + .map(|bucket| (*bucket, foreign_counters.get_count(*bucket) + 1)) + .collect::>(); + + // Ensure that foreign indexes are canonically ordered + foreign_indexes.sort_keys(); let mut next_block = Block::new( *parent_block.block_id(), diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 7ad3f92bd..ce7314d8d 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -30,7 +30,6 @@ pub struct OnReceiveForeignProposalHandler { epoch_manager: TConsensusSpec::EpochManager, transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, - foreign_receive_counter: ForeignReceiveCounters, } impl OnReceiveForeignProposalHandler @@ -41,14 +40,12 @@ where TConsensusSpec: ConsensusSpec epoch_manager: TConsensusSpec::EpochManager, transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, - foreign_receive_counter: ForeignReceiveCounters, ) -> Self { Self { store, epoch_manager, transaction_pool, pacemaker, - foreign_receive_counter, } } @@ -64,6 +61,10 @@ where TConsensusSpec: ConsensusSpec from, ); + let mut foreign_receive_counter = self + .store + .with_read_tx(|tx| ForeignReceiveCounters::get_or_default(tx))?; + let vn = self.epoch_manager.get_validator_node(block.epoch(), &from).await?; let committee_shard = self .epoch_manager @@ -83,11 +84,16 @@ where TConsensusSpec: ConsensusSpec } let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; - self.validate_proposed_block(&from, &block, committee_shard.shard(), local_shard.shard())?; - // Is this ok? Can foreign node send invalid block that should still increment the counter? - self.foreign_receive_counter.increment(&committee_shard.shard()); + self.validate_proposed_block( + &from, + &block, + committee_shard.shard(), + local_shard.shard(), + &foreign_receive_counter, + )?; + foreign_receive_counter.increment(&committee_shard.shard()); self.store.with_write_tx(|tx| { - self.foreign_receive_counter.save(tx)?; + foreign_receive_counter.save(tx)?; foreign_proposal.upsert(tx)?; self.on_receive_foreign_block(tx, &block, &committee_shard) })?; @@ -168,24 +174,26 @@ where TConsensusSpec: ConsensusSpec candidate_block: &Block, foreign_bucket: Shard, local_bucket: Shard, + foreign_receive_counter: &ForeignReceiveCounters, ) -> Result<(), ProposalValidationError> { - let incoming_index = match candidate_block.get_foreign_index(&local_bucket) { - Some(i) => *i, - None => { - debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}"); - return Err(ProposalValidationError::MissingForeignCounters { - proposed_by: from.to_string(), - hash: *candidate_block.id(), - }); - }, + let Some(incoming_count) = candidate_block.get_foreign_counter(&local_bucket) else { + debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}"); + return Err(ProposalValidationError::MissingForeignCounters { + proposed_by: from.to_string(), + hash: *candidate_block.id(), + }); }; - let current_index = self.foreign_receive_counter.get_index(&foreign_bucket); - if current_index + 1 != incoming_index { - debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was - {incoming_index}", expected_index = current_index + 1); + let current_count = foreign_receive_counter.get_count(&foreign_bucket); + if current_count + 1 != incoming_count { + debug!(target:LOG_TARGET, "We were expecting the index to be {expected_count}, but the index was {incoming_count}", expected_count = current_count + 1); return Err(ProposalValidationError::InvalidForeignCounters { proposed_by: from.to_string(), hash: *candidate_block.id(), + details: format!( + "Expected foreign receive count to be {} but it was {}", + current_count + 1, + incoming_count + ), }); } if candidate_block.height() == NodeHeight::zero() || candidate_block.id().is_genesis() { diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index 9205c30b4..c9ff78384 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -5,7 +5,7 @@ // ----[foreign:LocalPrepared]--->(LocalPrepared, true) ----cmd:AllPrepare ---> (AllPrepared, true) ---cmd:Accept ---> // Complete -use std::{collections::HashMap, ops::DerefMut}; +use std::ops::DerefMut; use log::*; use tari_dan_common_types::{ @@ -101,10 +101,26 @@ impl OnReceiveLocalProposalHandler(Some((high_qc, valid_block))) + })?; + + if let Some((high_qc, valid_block)) = maybe_high_qc_and_block { self.pacemaker .update_view(valid_block.height(), high_qc.block_height()) .await?; @@ -117,74 +133,123 @@ impl OnReceiveLocalProposalHandler Result { - self.store.with_write_tx(|tx| { - valid_block.block().justify().save(tx)?; - valid_block.save_all_dummy_blocks(tx)?; - valid_block.block().save(tx)?; - let high_qc = valid_block.block().justify().update_high_qc(tx)?; - Ok(high_qc) - }) + fn save_block( + &self, + tx: &mut ::WriteTransaction<'_>, + valid_block: &ValidBlock, + ) -> Result { + valid_block.block().save_foreign_send_counters(tx)?; + valid_block.block().justify().save(tx)?; + valid_block.save_all_dummy_blocks(tx)?; + valid_block.block().save(tx)?; + let high_qc = valid_block.block().justify().update_high_qc(tx)?; + Ok(high_qc) } - async fn validate_block(&self, block: Block) -> Result, HotStuffError> { - let local_committee = self - .epoch_manager - .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) - .await?; - let local_committee_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; - // First save the block in one db transaction - self.store.with_write_tx(|tx| { - match self.validate_local_proposed_block(tx, block, &local_committee, &local_committee_shard) { - Ok(validated) => Ok(Some(validated)), - // Propagate this error out as sync is needed in the case where we have a valid QC but do not know the - // block - Err( - err @ HotStuffError::ProposalValidationError(ProposalValidationError::JustifyBlockNotFound { - .. - }), - ) => Err(err), - // Validation errors should not cause a FAILURE state transition - Err(HotStuffError::ProposalValidationError(err)) => { - warn!(target: LOG_TARGET, "❌ Block failed validation: {}", err); - // A bad block should not cause a FAILURE state transition - Ok(None) - }, - Err(e) => Err(e), - } - }) + fn validate_block( + &self, + tx: &mut ::ReadTransaction<'_>, + block: Block, + local_committee: &Committee, + local_committee_shard: &CommitteeShard, + ) -> Result, HotStuffError> { + match self.validate_local_proposed_block(tx, block, local_committee, local_committee_shard) { + Ok(validated) => Ok(Some(validated)), + // Propagate this error out as sync is needed in the case where we have a valid QC but do not know the + // block + Err(err @ HotStuffError::ProposalValidationError(ProposalValidationError::JustifyBlockNotFound { .. })) => { + Err(err) + }, + // Validation errors should not cause a FAILURE state transition + Err(HotStuffError::ProposalValidationError(err)) => { + warn!(target: LOG_TARGET, "❌ Block failed validation: {}", err); + // A bad block should not cause a FAILURE state transition + Ok(None) + }, + Err(e) => Err(e), + } } fn check_foreign_indexes( &self, - tx: &mut ::WriteTransaction<'_>, + tx: &mut ::ReadTransaction<'_>, num_committees: u32, - local_bucket: Shard, + local_shard: Shard, block: &Block, justify_block: &BlockId, - ) -> Result { - let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), justify_block)?; - let non_local_buckets = proposer::get_non_local_shards(tx.deref_mut(), block, num_committees, local_bucket)?; - let mut foreign_indexes = HashMap::new(); - for non_local_bucket in non_local_buckets { - foreign_indexes - .entry(non_local_bucket) - .or_insert(foreign_counters.increment_counter(non_local_bucket)); + ) -> Result<(), HotStuffError> { + let non_local_shards = proposer::get_non_local_shards(tx, block, num_committees, local_shard)?; + let block_foreign_indexes = block.foreign_indexes(); + if block_foreign_indexes.len() != non_local_shards.len() { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!( + "Foreign indexes length ({}) does not match non-local shards length ({})", + block_foreign_indexes.len(), + non_local_shards.len() + ), + } + .into()); + } + + let mut foreign_counters = ForeignSendCounters::get_or_default(tx, justify_block)?; + let mut current_shard = None; + for (shard, foreign_count) in block_foreign_indexes { + if let Some(current_shard) = current_shard { + // Check ordering + if current_shard > shard { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!( + "Foreign indexes are not sorted by shard. Current shard: {}, shard: {}", + current_shard, shard + ), + } + .into()); + } + } + + current_shard = Some(shard); + // Check that each shard is correct + if !non_local_shards.contains(shard) { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!("Shard {} is not a non-local shard", shard), + } + .into()); + } + + // Check that foreign counters are correct + let expected_count = foreign_counters.increment_counter(*shard); + if *foreign_count != expected_count { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!( + "Foreign counter for shard {} is incorrect. Expected {}, got {}", + shard, expected_count, foreign_count + ), + } + .into()); + } } - foreign_counters.set(tx, block.id())?; - Ok(foreign_indexes == *block.get_foreign_indexes()) + + Ok(()) } /// Perform final block validations (TODO: implement all validations) /// We assume at this point that initial stateless validations have been done (in inbound messages) fn validate_local_proposed_block( &self, - tx: &mut ::WriteTransaction<'_>, + tx: &mut ::ReadTransaction<'_>, candidate_block: Block, local_committee: &Committee, local_committee_shard: &CommitteeShard, ) -> Result { - if Block::has_been_processed(tx.deref_mut(), candidate_block.id())? { + if Block::has_been_processed(tx, candidate_block.id())? { return Err(ProposalValidationError::BlockAlreadyProcessed { block_id: *candidate_block.id(), height: candidate_block.height(), @@ -193,7 +258,7 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler HotstuffWorker { transaction_pool: TransactionPool, tx_events: broadcast::Sender, tx_mempool: mpsc::UnboundedSender, - foreign_receive_counter: ForeignReceiveCounters, shutdown: ShutdownSignal, ) -> Self { let pacemaker = PaceMaker::new(); @@ -139,7 +138,6 @@ impl HotstuffWorker { epoch_manager.clone(), transaction_pool.clone(), pacemaker.clone_handle(), - foreign_receive_counter, ), on_receive_vote: OnReceiveVoteHandler::new(vote_receiver.clone()), on_receive_new_view: OnReceiveNewViewHandler::new( diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index 928d26e87..4052dee27 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -4,7 +4,7 @@ use tari_common_types::types::PublicKey; use tari_consensus::hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker}; use tari_dan_common_types::{shard::Shard, SubstateAddress}; -use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool}; +use tari_dan_storage::consensus_models::TransactionPool; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; use tokio::sync::{broadcast, mpsc, watch}; @@ -110,7 +110,6 @@ impl ValidatorBuilder { transaction_pool, tx_events.clone(), tx_mempool, - ForeignReceiveCounters::default(), shutdown_signal.clone(), ); diff --git a/dan_layer/p2p/src/conversions/consensus.rs b/dan_layer/p2p/src/conversions/consensus.rs index 59c23112d..1419e9091 100644 --- a/dan_layer/p2p/src/conversions/consensus.rs +++ b/dan_layer/p2p/src/conversions/consensus.rs @@ -258,7 +258,7 @@ impl From<&tari_dan_storage::consensus_models::Block> for proto::consensus::Bloc justify: Some(value.justify().into()), total_leader_fee: value.total_leader_fee(), commands: value.commands().iter().map(Into::into).collect(), - foreign_indexes: encode(value.get_foreign_indexes()).unwrap(), + foreign_indexes: encode(value.foreign_indexes()).unwrap(), signature: value.get_signature().map(Into::into), } } @@ -363,7 +363,7 @@ impl From for proto::consensus::ForeignProposalState { fn from(value: ForeignProposalState) -> Self { match value { ForeignProposalState::New => proto::consensus::ForeignProposalState::New, - ForeignProposalState::Mined => proto::consensus::ForeignProposalState::Mined, + ForeignProposalState::Proposed => proto::consensus::ForeignProposalState::Mined, ForeignProposalState::Deleted => proto::consensus::ForeignProposalState::Deleted, } } @@ -375,7 +375,7 @@ impl TryFrom for ForeignProposalState { fn try_from(value: proto::consensus::ForeignProposalState) -> Result { match value { proto::consensus::ForeignProposalState::New => Ok(ForeignProposalState::New), - proto::consensus::ForeignProposalState::Mined => Ok(ForeignProposalState::Mined), + proto::consensus::ForeignProposalState::Mined => Ok(ForeignProposalState::Proposed), proto::consensus::ForeignProposalState::Deleted => Ok(ForeignProposalState::Deleted), proto::consensus::ForeignProposalState::UnknownState => Err(anyhow!("Foreign proposal state not provided")), } @@ -390,7 +390,7 @@ impl From<&ForeignProposal> for proto::consensus::ForeignProposal { bucket: value.bucket.as_u32(), block_id: value.block_id.as_bytes().to_vec(), state: proto::consensus::ForeignProposalState::from(value.state).into(), - mined_at: value.mined_at.map(|a| a.0).unwrap_or(0), + mined_at: value.proposed_height.map(|a| a.0).unwrap_or(0), } } } @@ -405,7 +405,7 @@ impl TryFrom for ForeignProposal { state: proto::consensus::ForeignProposalState::try_from(value.state) .map_err(|_| anyhow!("Invalid foreign proposal state value {}", value.state))? .try_into()?, - mined_at: if value.mined_at == 0 { + proposed_height: if value.mined_at == 0 { None } else { Some(NodeHeight(value.mined_at)) diff --git a/dan_layer/state_store_sqlite/migrations/2024-01-22-123711_rename_mined_at_to_proposed_at/down.sql b/dan_layer/state_store_sqlite/migrations/2024-01-22-123711_rename_mined_at_to_proposed_at/down.sql new file mode 100644 index 000000000..291a97c5c --- /dev/null +++ b/dan_layer/state_store_sqlite/migrations/2024-01-22-123711_rename_mined_at_to_proposed_at/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` \ No newline at end of file diff --git a/dan_layer/state_store_sqlite/migrations/2024-01-22-123711_rename_mined_at_to_proposed_at/up.sql b/dan_layer/state_store_sqlite/migrations/2024-01-22-123711_rename_mined_at_to_proposed_at/up.sql new file mode 100644 index 000000000..b1a8dec56 --- /dev/null +++ b/dan_layer/state_store_sqlite/migrations/2024-01-22-123711_rename_mined_at_to_proposed_at/up.sql @@ -0,0 +1,6 @@ +ALTER TABLE foreign_proposals + RENAME COLUMN mined_at TO proposed_height; + +UPDATE foreign_proposals + SET state = 'Proposed' + WHERE state = 'Mined'; \ No newline at end of file diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index e3dbf7543..c91349ec1 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -17,7 +17,6 @@ use diesel::{ ExpressionMethods, JoinOnDsl, NullableExpressionMethods, - OptionalExtension, QueryDsl, QueryableByName, RunQueryDsl, @@ -35,6 +34,7 @@ use tari_dan_storage::{ Decision, Evidence, ForeignProposal, + ForeignProposalState, ForeignReceiveCounters, ForeignSendCounters, HighQc, @@ -399,7 +399,7 @@ impl StateStoreReadTransa use crate::schema::foreign_proposals; let foreign_proposals = foreign_proposals::table - .filter(foreign_proposals::state.eq("New")) + .filter(foreign_proposals::state.eq(ForeignProposalState::New.to_string())) .load::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "foreign_proposal_get_all", @@ -444,17 +444,12 @@ impl StateStoreReadTransa let counter = foreign_send_counters::table .filter(foreign_send_counters::block_id.eq(serialize_hex(block_id))) .first::(self.connection()) - .optional() .map_err(|e| SqliteStorageError::DieselError { operation: "foreign_send_counters_get", source: e, })?; - if let Some(counter) = counter { - counter.try_into() - } else { - Ok(ForeignSendCounters::default()) - } + counter.try_into() } fn foreign_receive_counters_get(&mut self) -> Result { @@ -463,17 +458,12 @@ impl StateStoreReadTransa let counter = foreign_receive_counters::table .order_by(foreign_receive_counters::id.desc()) .first::(self.connection()) - .optional() .map_err(|e| SqliteStorageError::DieselError { operation: "foreign_receive_counters_get", source: e, })?; - if let Some(counter) = counter { - counter.try_into() - } else { - Ok(ForeignReceiveCounters::default()) - } + counter.try_into() } fn transactions_get(&mut self, tx_id: &TransactionId) -> Result { diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 573b79e08..fd5643604 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -27,7 +27,7 @@ diesel::table! { bucket -> Integer, block_id -> Text, state -> Text, - mined_at -> Nullable, + proposed_height -> Nullable, created_at -> Timestamp, } } diff --git a/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs b/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs index 9e9d33db3..0d3cf17e1 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs @@ -53,7 +53,7 @@ impl TryFrom for consensus_models::ForeignProposal { bucket: Shard::from(value.bucket as u32), block_id: deserialize_hex_try_from(&value.block_id)?, state: parse_from_string(&value.state)?, - mined_at: value.mined_at.map(|mined_at| NodeHeight(mined_at as u64)), + proposed_height: value.mined_at.map(|mined_at| NodeHeight(mined_at as u64)), }) } } diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 87ebd777a..f17b081b2 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -146,7 +146,7 @@ impl<'a, TAddr: NodeAddressable> SqliteStateStoreWriteTransaction<'a, TAddr> { parked_blocks::commands.eq(serialize_json(block.commands())?), parked_blocks::total_leader_fee.eq(block.total_leader_fee() as i64), parked_blocks::justify.eq(serialize_json(block.justify())?), - parked_blocks::foreign_indexes.eq(serialize_json(block.get_foreign_indexes())?), + parked_blocks::foreign_indexes.eq(serialize_json(block.foreign_indexes())?), ); diesel::insert_into(parked_blocks::table) @@ -192,7 +192,7 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit blocks::is_dummy.eq(block.is_dummy()), blocks::is_processed.eq(block.is_processed()), blocks::signature.eq(block.get_signature().map(serialize_json).transpose()?), - blocks::foreign_indexes.eq(serialize_json(block.get_foreign_indexes())?), + blocks::foreign_indexes.eq(serialize_json(block.foreign_indexes())?), ); diesel::insert_into(blocks::table) @@ -577,7 +577,7 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit foreign_proposals::bucket.eq(foreign_proposal.bucket.as_u32() as i32), foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id)), foreign_proposals::state.eq(foreign_proposal.state.to_string()), - foreign_proposals::mined_at.eq(foreign_proposal.mined_at.map(|h| h.as_u64() as i64)), + foreign_proposals::proposed_height.eq(foreign_proposal.proposed_height.map(|h| h.as_u64() as i64)), ); diesel::insert_into(foreign_proposals::table) diff --git a/dan_layer/state_store_sqlite/tests/tests.rs b/dan_layer/state_store_sqlite/tests/tests.rs index 83ec4a553..56c069462 100644 --- a/dan_layer/state_store_sqlite/tests/tests.rs +++ b/dan_layer/state_store_sqlite/tests/tests.rs @@ -30,8 +30,6 @@ fn create_tx_atom() -> TransactionAtom { mod confirm_all_transitions { - use std::collections::HashMap; - use super::*; #[test] @@ -57,7 +55,7 @@ mod confirm_all_transitions { // cannot cause a state change without any commands [Command::Prepare(atom1.clone())].into_iter().collect(), Default::default(), - HashMap::new(), + Default::default(), None, ); block1.insert(&mut tx).unwrap(); diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index 49fd9c0ac..3830d64e8 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: BSD-3-Clause use std::{ - collections::{BTreeSet, HashMap, HashSet}, + collections::{BTreeSet, HashSet}, fmt::{Debug, Display, Formatter}, hash::Hash, ops::{DerefMut, RangeInclusive}, }; +use indexmap::IndexMap; use log::*; use serde::{Deserialize, Serialize}; use tari_common_types::types::{FixedHash, FixedHashSizeError, PublicKey}; @@ -23,7 +24,7 @@ use tari_dan_common_types::{ use tari_transaction::TransactionId; use time::PrimitiveDateTime; -use super::{ForeignProposal, QuorumCertificate, ValidatorSchnorrSignature}; +use super::{ForeignProposal, ForeignSendCounters, QuorumCertificate, ValidatorSchnorrSignature}; use crate::{ consensus_models::{ Command, @@ -69,7 +70,7 @@ pub struct Block { /// Flag that indicates that the block has been committed. is_committed: bool, /// Counter for each foreign shard for reliable broadcast. - foreign_indexes: HashMap, + foreign_indexes: IndexMap, /// Timestamp when was this stored. stored_at: Option, /// Signature of block by the proposer. @@ -85,7 +86,7 @@ impl Block { proposed_by: PublicKey, commands: BTreeSet, total_leader_fee: u64, - foreign_indexes: HashMap, + sorted_foreign_indexes: IndexMap, signature: Option, ) -> Self { let mut block = Self { @@ -102,7 +103,7 @@ impl Block { is_dummy: false, is_processed: false, is_committed: false, - foreign_indexes, + foreign_indexes: sorted_foreign_indexes, stored_at: None, signature, }; @@ -122,7 +123,7 @@ impl Block { is_dummy: bool, is_processed: bool, is_committed: bool, - foreign_indexes: HashMap, + sorted_foreign_indexes: IndexMap, signature: Option, created_at: PrimitiveDateTime, ) -> Self { @@ -140,7 +141,7 @@ impl Block { is_dummy, is_processed, is_committed, - foreign_indexes, + foreign_indexes: sorted_foreign_indexes, stored_at: Some(created_at), signature, } @@ -155,7 +156,7 @@ impl Block { PublicKey::default(), Default::default(), 0, - HashMap::new(), + IndexMap::new(), None, ) } @@ -175,7 +176,7 @@ impl Block { is_dummy: false, is_processed: false, is_committed: true, - foreign_indexes: HashMap::new(), + foreign_indexes: IndexMap::new(), stored_at: None, signature: None, } @@ -196,7 +197,7 @@ impl Block { proposed_by, Default::default(), 0, - HashMap::new(), + IndexMap::new(), None, ); block.is_dummy = true; @@ -213,7 +214,7 @@ impl Block { .chain(&self.proposed_by) .chain(&self.merkle_root) .chain(&self.commands) - .chain(&self.foreign_indexes.iter().collect::>().sort()) + .chain(&self.foreign_indexes) .result() } } @@ -326,11 +327,11 @@ impl Block { self.is_committed } - pub fn get_foreign_index(&self, bucket: &Shard) -> Option<&u64> { - self.foreign_indexes.get(bucket) + pub fn get_foreign_counter(&self, bucket: &Shard) -> Option { + self.foreign_indexes.get(bucket).copied() } - pub fn get_foreign_indexes(&self) -> &HashMap { + pub fn foreign_indexes(&self) -> &IndexMap { &self.foreign_indexes } @@ -660,6 +661,22 @@ impl Block { ); Ok(false) } + + pub fn save_foreign_send_counters(&self, tx: &mut TTx) -> Result<(), StorageError> + where + TTx: StateStoreWriteTransaction + DerefMut + ?Sized, + TTx::Target: StateStoreReadTransaction, + { + let mut counters = ForeignSendCounters::get_or_default(tx.deref_mut(), self.justify().block_id())?; + // Add counters for this block and carry over the counters from the justify block, if any + for shard in self.foreign_indexes.keys() { + counters.increment_counter(*shard); + } + if !counters.is_empty() { + counters.set(tx, self.id())?; + } + Ok(()) + } } impl Display for Block { diff --git a/dan_layer/storage/src/consensus_models/foreign_proposal.rs b/dan_layer/storage/src/consensus_models/foreign_proposal.rs index 4971bb368..71f0be579 100644 --- a/dan_layer/storage/src/consensus_models/foreign_proposal.rs +++ b/dan_layer/storage/src/consensus_models/foreign_proposal.rs @@ -16,7 +16,7 @@ use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError} #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)] pub enum ForeignProposalState { New, - Mined, + Proposed, Deleted, } @@ -24,7 +24,7 @@ impl Display for ForeignProposalState { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { ForeignProposalState::New => write!(f, "New"), - ForeignProposalState::Mined => write!(f, "Mined"), + ForeignProposalState::Proposed => write!(f, "Proposed"), ForeignProposalState::Deleted => write!(f, "Deleted"), } } @@ -36,7 +36,7 @@ impl FromStr for ForeignProposalState { fn from_str(s: &str) -> Result { match s { "New" => Ok(ForeignProposalState::New), - "Mined" => Ok(ForeignProposalState::Mined), + "Proposed" => Ok(ForeignProposalState::Proposed), "Deleted" => Ok(ForeignProposalState::Deleted), _ => Err(anyhow::anyhow!("Invalid foreign proposal state {}", s)), } @@ -48,7 +48,7 @@ pub struct ForeignProposal { pub bucket: Shard, pub block_id: BlockId, pub state: ForeignProposalState, - pub mined_at: Option, + pub proposed_height: Option, } impl ForeignProposal { @@ -57,13 +57,13 @@ impl ForeignProposal { bucket, block_id, state: ForeignProposalState::New, - mined_at: None, + proposed_height: None, } } - pub fn set_mined_at(mut self, mined_at: NodeHeight) -> Self { - self.mined_at = Some(mined_at); - self.state = ForeignProposalState::Mined; + pub fn set_proposed_height(&mut self, mined_at: NodeHeight) -> &mut Self { + self.proposed_height = Some(mined_at); + self.state = ForeignProposalState::Proposed; self } } diff --git a/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs index 67a465a92..6c1efab49 100644 --- a/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs +++ b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; -use tari_dan_common_types::shard::Shard; +use tari_dan_common_types::{optional::Optional, shard::Shard}; use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; @@ -29,8 +29,8 @@ impl ForeignReceiveCounters { *self.counters.entry(*bucket).or_default() += 1; } - // If we haven't received any messages from this shard yet, return 0 - pub fn get_index(&self, bucket: &Shard) -> u64 { + /// Returns the counter for the provided shard. If the count does not exist, 0 is returned. + pub fn get_count(&self, bucket: &Shard) -> u64 { self.counters.get(bucket).copied().unwrap_or_default() } } @@ -41,7 +41,7 @@ impl ForeignReceiveCounters { Ok(()) } - pub fn get(tx: &mut TTx) -> Result { - tx.foreign_receive_counters_get() + pub fn get_or_default(tx: &mut TTx) -> Result { + Ok(tx.foreign_receive_counters_get().optional()?.unwrap_or_default()) } } diff --git a/dan_layer/storage/src/consensus_models/foreign_send_counters.rs b/dan_layer/storage/src/consensus_models/foreign_send_counters.rs index 0b9d083bd..c763a3a22 100644 --- a/dan_layer/storage/src/consensus_models/foreign_send_counters.rs +++ b/dan_layer/storage/src/consensus_models/foreign_send_counters.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; -use tari_dan_common_types::shard::Shard; +use tari_dan_common_types::{optional::Optional, shard::Shard}; use super::BlockId; use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; @@ -26,8 +26,20 @@ impl ForeignSendCounters { } } - pub fn increment_counter(&mut self, bucket: Shard) -> u64 { - *self.counters.entry(bucket).and_modify(|v| *v += 1).or_insert(1) + pub fn increment_counter(&mut self, shard: Shard) -> u64 { + *self.counters.entry(shard).and_modify(|v| *v += 1).or_insert(1) + } + + pub fn get_count(&self, shard: Shard) -> u64 { + self.counters.get(&shard).copied().unwrap_or_default() + } + + pub fn len(&self) -> usize { + self.counters.len() + } + + pub fn is_empty(&self) -> bool { + self.counters.is_empty() } } @@ -41,10 +53,10 @@ impl ForeignSendCounters { Ok(()) } - pub fn get( + pub fn get_or_default( tx: &mut TTx, block_id: &BlockId, ) -> Result { - tx.foreign_send_counters_get(block_id) + Ok(tx.foreign_send_counters_get(block_id).optional()?.unwrap_or_default()) } }