diff --git a/Cargo.lock b/Cargo.lock index c9b0f77e3..512965d33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8920,6 +8920,7 @@ version = "0.3.0" dependencies = [ "anyhow", "async-trait", + "indexmap 2.1.0", "log", "serde", "tari_common_types", diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 4dd920ee9..b87f795a3 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -50,7 +50,7 @@ use tari_dan_app_utilities::{ use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, PeerAddress, SubstateAddress}; use tari_dan_engine::fees::FeeTable; use tari_dan_storage::{ - consensus_models::{Block, BlockId, ExecutedTransaction, ForeignReceiveCounters, SubstateRecord}, + consensus_models::{Block, BlockId, ExecutedTransaction, SubstateRecord}, global::GlobalDb, StateStore, StateStoreReadTransaction, @@ -229,7 +229,6 @@ pub async fn spawn_services( // Consensus let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10); - let foreign_receive_counter = state_store.with_read_tx(|tx| ForeignReceiveCounters::get(tx))?; let (consensus_join_handle, consensus_handle, rx_consensus_to_mempool) = consensus::spawn( state_store.clone(), keypair.clone(), @@ -238,7 +237,6 @@ pub async fn spawn_services( rx_consensus_message, outbound_messaging.clone(), validator_node_client_factory.clone(), - foreign_receive_counter, shutdown.clone(), ) .await; diff --git a/applications/tari_validator_node/src/consensus/mod.rs b/applications/tari_validator_node/src/consensus/mod.rs index 983d16f57..3fa0bacff 100644 --- a/applications/tari_validator_node/src/consensus/mod.rs +++ b/applications/tari_validator_node/src/consensus/mod.rs @@ -7,7 +7,7 @@ use tari_consensus::{ }; use tari_dan_common_types::committee::Committee; use tari_dan_p2p::{Message, OutboundService}; -use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool}; +use tari_dan_storage::consensus_models::TransactionPool; use tari_epoch_manager::base_layer::EpochManagerHandle; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; @@ -50,7 +50,6 @@ pub async fn spawn( rx_hs_message: mpsc::Receiver<(PeerAddress, HotstuffMessage)>, outbound_messaging: OutboundMessaging, client_factory: TariValidatorNodeRpcClientFactory, - foreign_receive_counter: ForeignReceiveCounters, shutdown_signal: ShutdownSignal, ) -> ( JoinHandle>, @@ -82,7 +81,6 @@ pub async fn spawn( tx_leader, tx_hotstuff_events.clone(), tx_mempool, - foreign_receive_counter, shutdown_signal.clone(), ); diff --git a/dan_layer/consensus/Cargo.toml b/dan_layer/consensus/Cargo.toml index c962fbd54..526b19dbb 100644 --- a/dan_layer/consensus/Cargo.toml +++ b/dan_layer/consensus/Cargo.toml @@ -19,6 +19,7 @@ tari_mmr = { workspace = true } tari_shutdown = { workspace = true } anyhow = { workspace = true } +indexmap = { workspace = true } async-trait = { workspace = true } log = { workspace = true } serde = { workspace = true, default-features = true } diff --git a/dan_layer/consensus/src/hotstuff/error.rs b/dan_layer/consensus/src/hotstuff/error.rs index bd02bb0f2..55dd47cb1 100644 --- a/dan_layer/consensus/src/hotstuff/error.rs +++ b/dan_layer/consensus/src/hotstuff/error.rs @@ -96,8 +96,12 @@ pub enum ProposalValidationError { NotSafeBlock { proposed_by: String, hash: BlockId }, #[error("Node proposed by {proposed_by} with hash {hash} is missing foreign index")] MissingForeignCounters { proposed_by: String, hash: BlockId }, - #[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters")] - InvalidForeignCounters { proposed_by: String, hash: BlockId }, + #[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters: {details}")] + InvalidForeignCounters { + proposed_by: String, + hash: BlockId, + details: String, + }, #[error("Node proposed by {proposed_by} with hash {hash} is the genesis block")] ProposingGenesisBlock { proposed_by: String, hash: BlockId }, #[error("Justification block {justify_block} for proposed block {block_description} by {proposed_by} not found")] diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 743df2449..2df0ae999 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -3,6 +3,7 @@ use std::{collections::BTreeSet, num::NonZeroU64, ops::DerefMut}; +use indexmap::IndexMap; use log::*; use tari_common_types::types::PublicKey; use tari_dan_common_types::{ @@ -26,7 +27,6 @@ use tari_dan_storage::{ TransactionPoolStage, }, StateStore, - StateStoreWriteTransaction, }; use tari_epoch_manager::EpochManagerReader; use tokio::sync::mpsc; @@ -109,16 +109,12 @@ where TConsensusSpec: ConsensusSpec let validator = self.epoch_manager.get_our_validator_node(epoch).await?; let local_committee_shard = self.epoch_manager.get_local_committee_shard(epoch).await?; - // The scope here is due to a shortcoming of rust. The tx is dropped at tx.commit() but it still complains that - // the non-Send tx could be used after the await point, which is not possible. - let next_block; - { - let mut tx = self.store.create_write_tx()?; - let high_qc = HighQc::get(&mut *tx)?; - let high_qc = high_qc.get_quorum_certificate(&mut *tx)?; - let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), leaf_block.block_id())?; - next_block = self.build_next_block( - &mut tx, + + let next_block = self.store.with_write_tx(|tx| { + let high_qc = HighQc::get(tx.deref_mut())?; + let high_qc = high_qc.get_quorum_certificate(tx.deref_mut())?; + let next_block = self.build_next_block( + tx, epoch, &leaf_block, high_qc, @@ -127,18 +123,11 @@ where TConsensusSpec: ConsensusSpec // TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this // is a good idea. is_newview_propose, - &mut foreign_counters, )?; - next_block.as_last_proposed().set(&mut tx)?; - - // Get involved shards for all LocalPrepared commands in the block. - // This allows us to broadcast the proposal only to the relevant committees that would be interested in the - // LocalPrepared. - // TODO: we should never broadcast to foreign shards here. The soonest we can broadcast is once we have - // locked the block - tx.commit()?; - } + next_block.as_last_proposed().set(tx)?; + Ok::<_, HotStuffError>(next_block) + })?; info!( target: LOG_TARGET, @@ -192,7 +181,6 @@ where TConsensusSpec: ConsensusSpec proposed_by: PublicKey, local_committee_shard: &CommitteeShard, empty_block: bool, - foreign_counters: &mut ForeignSendCounters, ) -> Result { // TODO: Configure const TARGET_BLOCK_SIZE: usize = 1000; @@ -265,10 +253,14 @@ where TConsensusSpec: ConsensusSpec local_committee_shard.shard(), )?; - let foreign_indexes = non_local_buckets + let foreign_counters = ForeignSendCounters::get_or_default(tx, parent_block.block_id())?; + let mut foreign_indexes = non_local_buckets .iter() - .map(|bucket| (*bucket, foreign_counters.increment_counter(*bucket))) - .collect(); + .map(|bucket| (*bucket, foreign_counters.get_count(*bucket) + 1)) + .collect::>(); + + // Ensure that foreign indexes are canonically ordered + foreign_indexes.sort_keys(); let mut next_block = Block::new( *parent_block.block_id(), diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 7ad3f92bd..ce7314d8d 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -30,7 +30,6 @@ pub struct OnReceiveForeignProposalHandler { epoch_manager: TConsensusSpec::EpochManager, transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, - foreign_receive_counter: ForeignReceiveCounters, } impl OnReceiveForeignProposalHandler @@ -41,14 +40,12 @@ where TConsensusSpec: ConsensusSpec epoch_manager: TConsensusSpec::EpochManager, transaction_pool: TransactionPool, pacemaker: PaceMakerHandle, - foreign_receive_counter: ForeignReceiveCounters, ) -> Self { Self { store, epoch_manager, transaction_pool, pacemaker, - foreign_receive_counter, } } @@ -64,6 +61,10 @@ where TConsensusSpec: ConsensusSpec from, ); + let mut foreign_receive_counter = self + .store + .with_read_tx(|tx| ForeignReceiveCounters::get_or_default(tx))?; + let vn = self.epoch_manager.get_validator_node(block.epoch(), &from).await?; let committee_shard = self .epoch_manager @@ -83,11 +84,16 @@ where TConsensusSpec: ConsensusSpec } let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; - self.validate_proposed_block(&from, &block, committee_shard.shard(), local_shard.shard())?; - // Is this ok? Can foreign node send invalid block that should still increment the counter? - self.foreign_receive_counter.increment(&committee_shard.shard()); + self.validate_proposed_block( + &from, + &block, + committee_shard.shard(), + local_shard.shard(), + &foreign_receive_counter, + )?; + foreign_receive_counter.increment(&committee_shard.shard()); self.store.with_write_tx(|tx| { - self.foreign_receive_counter.save(tx)?; + foreign_receive_counter.save(tx)?; foreign_proposal.upsert(tx)?; self.on_receive_foreign_block(tx, &block, &committee_shard) })?; @@ -168,24 +174,26 @@ where TConsensusSpec: ConsensusSpec candidate_block: &Block, foreign_bucket: Shard, local_bucket: Shard, + foreign_receive_counter: &ForeignReceiveCounters, ) -> Result<(), ProposalValidationError> { - let incoming_index = match candidate_block.get_foreign_index(&local_bucket) { - Some(i) => *i, - None => { - debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}"); - return Err(ProposalValidationError::MissingForeignCounters { - proposed_by: from.to_string(), - hash: *candidate_block.id(), - }); - }, + let Some(incoming_count) = candidate_block.get_foreign_counter(&local_bucket) else { + debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}"); + return Err(ProposalValidationError::MissingForeignCounters { + proposed_by: from.to_string(), + hash: *candidate_block.id(), + }); }; - let current_index = self.foreign_receive_counter.get_index(&foreign_bucket); - if current_index + 1 != incoming_index { - debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was - {incoming_index}", expected_index = current_index + 1); + let current_count = foreign_receive_counter.get_count(&foreign_bucket); + if current_count + 1 != incoming_count { + debug!(target:LOG_TARGET, "We were expecting the index to be {expected_count}, but the index was {incoming_count}", expected_count = current_count + 1); return Err(ProposalValidationError::InvalidForeignCounters { proposed_by: from.to_string(), hash: *candidate_block.id(), + details: format!( + "Expected foreign receive count to be {} but it was {}", + current_count + 1, + incoming_count + ), }); } if candidate_block.height() == NodeHeight::zero() || candidate_block.id().is_genesis() { diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index 95657901f..362968e65 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -5,7 +5,7 @@ // ----[foreign:LocalPrepared]--->(LocalPrepared, true) ----cmd:AllPrepare ---> (AllPrepared, true) ---cmd:Accept ---> // Complete -use std::{collections::HashMap, ops::DerefMut}; +use std::ops::DerefMut; use log::*; use tari_dan_common_types::{ @@ -101,10 +101,26 @@ impl OnReceiveLocalProposalHandler(Some((high_qc, valid_block))) + })?; + + if let Some((high_qc, valid_block)) = maybe_high_qc_and_block { self.pacemaker .update_view(valid_block.height(), high_qc.block_height()) .await?; @@ -117,74 +133,123 @@ impl OnReceiveLocalProposalHandler Result { - self.store.with_write_tx(|tx| { - valid_block.block().justify().save(tx)?; - valid_block.save_all_dummy_blocks(tx)?; - valid_block.block().save(tx)?; - let high_qc = valid_block.block().justify().update_high_qc(tx)?; - Ok(high_qc) - }) + fn save_block( + &self, + tx: &mut ::WriteTransaction<'_>, + valid_block: &ValidBlock, + ) -> Result { + valid_block.block().save_foreign_send_counters(tx)?; + valid_block.block().justify().save(tx)?; + valid_block.save_all_dummy_blocks(tx)?; + valid_block.block().save(tx)?; + let high_qc = valid_block.block().justify().update_high_qc(tx)?; + Ok(high_qc) } - async fn validate_block(&self, block: Block) -> Result, HotStuffError> { - let local_committee = self - .epoch_manager - .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) - .await?; - let local_committee_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?; - // First save the block in one db transaction - self.store.with_write_tx(|tx| { - match self.validate_local_proposed_block(tx, block, &local_committee, &local_committee_shard) { - Ok(validated) => Ok(Some(validated)), - // Propagate this error out as sync is needed in the case where we have a valid QC but do not know the - // block - Err( - err @ HotStuffError::ProposalValidationError(ProposalValidationError::JustifyBlockNotFound { - .. - }), - ) => Err(err), - // Validation errors should not cause a FAILURE state transition - Err(HotStuffError::ProposalValidationError(err)) => { - warn!(target: LOG_TARGET, "❌ Block failed validation: {}", err); - // A bad block should not cause a FAILURE state transition - Ok(None) - }, - Err(e) => Err(e), - } - }) + fn validate_block( + &self, + tx: &mut ::ReadTransaction<'_>, + block: Block, + local_committee: &Committee, + local_committee_shard: &CommitteeShard, + ) -> Result, HotStuffError> { + match self.validate_local_proposed_block(tx, block, local_committee, local_committee_shard) { + Ok(validated) => Ok(Some(validated)), + // Propagate this error out as sync is needed in the case where we have a valid QC but do not know the + // block + Err(err @ HotStuffError::ProposalValidationError(ProposalValidationError::JustifyBlockNotFound { .. })) => { + Err(err) + }, + // Validation errors should not cause a FAILURE state transition + Err(HotStuffError::ProposalValidationError(err)) => { + warn!(target: LOG_TARGET, "❌ Block failed validation: {}", err); + // A bad block should not cause a FAILURE state transition + Ok(None) + }, + Err(e) => Err(e), + } } fn check_foreign_indexes( &self, - tx: &mut ::WriteTransaction<'_>, + tx: &mut ::ReadTransaction<'_>, num_committees: u32, - local_bucket: Shard, + local_shard: Shard, block: &Block, justify_block: &BlockId, - ) -> Result { - let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), justify_block)?; - let non_local_buckets = proposer::get_non_local_shards(tx.deref_mut(), block, num_committees, local_bucket)?; - let mut foreign_indexes = HashMap::new(); - for non_local_bucket in non_local_buckets { - foreign_indexes - .entry(non_local_bucket) - .or_insert(foreign_counters.increment_counter(non_local_bucket)); + ) -> Result<(), HotStuffError> { + let non_local_shards = proposer::get_non_local_shards(tx, block, num_committees, local_shard)?; + let block_foreign_indexes = block.foreign_indexes(); + if block_foreign_indexes.len() != non_local_shards.len() { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!( + "Foreign indexes length ({}) does not match non-local shards length ({})", + block_foreign_indexes.len(), + non_local_shards.len() + ), + } + .into()); + } + + let mut foreign_counters = ForeignSendCounters::get_or_default(tx, justify_block)?; + let mut current_shard = None; + for (shard, foreign_count) in block_foreign_indexes { + if let Some(current_shard) = current_shard { + // Check ordering + if current_shard > shard { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!( + "Foreign indexes are not sorted by shard. Current shard: {}, shard: {}", + current_shard, shard + ), + } + .into()); + } + } + + current_shard = Some(shard); + // Check that each shard is correct + if !non_local_shards.contains(shard) { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!("Shard {} is not a non-local shard", shard), + } + .into()); + } + + // Check that foreign counters are correct + let expected_count = foreign_counters.increment_counter(*shard); + if *foreign_count != expected_count { + return Err(ProposalValidationError::InvalidForeignCounters { + proposed_by: block.proposed_by().to_string(), + hash: *block.id(), + details: format!( + "Foreign counter for shard {} is incorrect. Expected {}, got {}", + shard, expected_count, foreign_count + ), + } + .into()); + } } - foreign_counters.set(tx, block.id())?; - Ok(foreign_indexes == *block.get_foreign_indexes()) + + Ok(()) } /// Perform final block validations (TODO: implement all validations) /// We assume at this point that initial stateless validations have been done (in inbound messages) fn validate_local_proposed_block( &self, - tx: &mut ::WriteTransaction<'_>, + tx: &mut ::ReadTransaction<'_>, candidate_block: Block, local_committee: &Committee, local_committee_shard: &CommitteeShard, ) -> Result { - if Block::has_been_processed(tx.deref_mut(), candidate_block.id())? { + if Block::has_been_processed(tx, candidate_block.id())? { return Err(ProposalValidationError::BlockAlreadyProcessed { block_id: *candidate_block.id(), height: candidate_block.height(), @@ -193,7 +258,7 @@ impl OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler OnReceiveLocalProposalHandler HotstuffWorker { tx_leader: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>, tx_events: broadcast::Sender, tx_mempool: mpsc::UnboundedSender, - foreign_receive_counter: ForeignReceiveCounters, shutdown: ShutdownSignal, ) -> Self { let pacemaker = PaceMaker::new(); @@ -101,6 +100,7 @@ impl HotstuffWorker { ); let proposer = Proposer::::new(state_store.clone(), epoch_manager.clone(), tx_broadcast.clone()); + Self { validator_addr: validator_addr.clone(), tx_events: tx_events.clone(), @@ -141,7 +141,6 @@ impl HotstuffWorker { epoch_manager.clone(), transaction_pool.clone(), pacemaker.clone_handle(), - foreign_receive_counter, ), on_receive_vote: OnReceiveVoteHandler::new(vote_receiver.clone()), on_receive_new_view: OnReceiveNewViewHandler::new( diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index fcf5cb940..4f22d9d93 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -4,7 +4,7 @@ use tari_common_types::types::PublicKey; use tari_consensus::hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker}; use tari_dan_common_types::{shard::Shard, SubstateAddress}; -use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool}; +use tari_dan_storage::consensus_models::TransactionPool; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; use tokio::sync::{broadcast, mpsc, watch}; @@ -107,7 +107,6 @@ impl ValidatorBuilder { tx_leader, tx_events.clone(), tx_mempool, - ForeignReceiveCounters::default(), shutdown_signal.clone(), ); diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index e3dbf7543..a6967bc6a 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -17,7 +17,6 @@ use diesel::{ ExpressionMethods, JoinOnDsl, NullableExpressionMethods, - OptionalExtension, QueryDsl, QueryableByName, RunQueryDsl, @@ -444,17 +443,12 @@ impl StateStoreReadTransa let counter = foreign_send_counters::table .filter(foreign_send_counters::block_id.eq(serialize_hex(block_id))) .first::(self.connection()) - .optional() .map_err(|e| SqliteStorageError::DieselError { operation: "foreign_send_counters_get", source: e, })?; - if let Some(counter) = counter { - counter.try_into() - } else { - Ok(ForeignSendCounters::default()) - } + counter.try_into() } fn foreign_receive_counters_get(&mut self) -> Result { @@ -463,17 +457,12 @@ impl StateStoreReadTransa let counter = foreign_receive_counters::table .order_by(foreign_receive_counters::id.desc()) .first::(self.connection()) - .optional() .map_err(|e| SqliteStorageError::DieselError { operation: "foreign_receive_counters_get", source: e, })?; - if let Some(counter) = counter { - counter.try_into() - } else { - Ok(ForeignReceiveCounters::default()) - } + counter.try_into() } fn transactions_get(&mut self, tx_id: &TransactionId) -> Result { diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 87ebd777a..2fd43fe70 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -146,7 +146,7 @@ impl<'a, TAddr: NodeAddressable> SqliteStateStoreWriteTransaction<'a, TAddr> { parked_blocks::commands.eq(serialize_json(block.commands())?), parked_blocks::total_leader_fee.eq(block.total_leader_fee() as i64), parked_blocks::justify.eq(serialize_json(block.justify())?), - parked_blocks::foreign_indexes.eq(serialize_json(block.get_foreign_indexes())?), + parked_blocks::foreign_indexes.eq(serialize_json(block.foreign_indexes())?), ); diesel::insert_into(parked_blocks::table) @@ -192,7 +192,7 @@ impl StateStoreWriteTransaction for SqliteStateStoreWrit blocks::is_dummy.eq(block.is_dummy()), blocks::is_processed.eq(block.is_processed()), blocks::signature.eq(block.get_signature().map(serialize_json).transpose()?), - blocks::foreign_indexes.eq(serialize_json(block.get_foreign_indexes())?), + blocks::foreign_indexes.eq(serialize_json(block.foreign_indexes())?), ); diesel::insert_into(blocks::table) diff --git a/dan_layer/state_store_sqlite/tests/tests.rs b/dan_layer/state_store_sqlite/tests/tests.rs index 83ec4a553..56c069462 100644 --- a/dan_layer/state_store_sqlite/tests/tests.rs +++ b/dan_layer/state_store_sqlite/tests/tests.rs @@ -30,8 +30,6 @@ fn create_tx_atom() -> TransactionAtom { mod confirm_all_transitions { - use std::collections::HashMap; - use super::*; #[test] @@ -57,7 +55,7 @@ mod confirm_all_transitions { // cannot cause a state change without any commands [Command::Prepare(atom1.clone())].into_iter().collect(), Default::default(), - HashMap::new(), + Default::default(), None, ); block1.insert(&mut tx).unwrap(); diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index 49fd9c0ac..3830d64e8 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: BSD-3-Clause use std::{ - collections::{BTreeSet, HashMap, HashSet}, + collections::{BTreeSet, HashSet}, fmt::{Debug, Display, Formatter}, hash::Hash, ops::{DerefMut, RangeInclusive}, }; +use indexmap::IndexMap; use log::*; use serde::{Deserialize, Serialize}; use tari_common_types::types::{FixedHash, FixedHashSizeError, PublicKey}; @@ -23,7 +24,7 @@ use tari_dan_common_types::{ use tari_transaction::TransactionId; use time::PrimitiveDateTime; -use super::{ForeignProposal, QuorumCertificate, ValidatorSchnorrSignature}; +use super::{ForeignProposal, ForeignSendCounters, QuorumCertificate, ValidatorSchnorrSignature}; use crate::{ consensus_models::{ Command, @@ -69,7 +70,7 @@ pub struct Block { /// Flag that indicates that the block has been committed. is_committed: bool, /// Counter for each foreign shard for reliable broadcast. - foreign_indexes: HashMap, + foreign_indexes: IndexMap, /// Timestamp when was this stored. stored_at: Option, /// Signature of block by the proposer. @@ -85,7 +86,7 @@ impl Block { proposed_by: PublicKey, commands: BTreeSet, total_leader_fee: u64, - foreign_indexes: HashMap, + sorted_foreign_indexes: IndexMap, signature: Option, ) -> Self { let mut block = Self { @@ -102,7 +103,7 @@ impl Block { is_dummy: false, is_processed: false, is_committed: false, - foreign_indexes, + foreign_indexes: sorted_foreign_indexes, stored_at: None, signature, }; @@ -122,7 +123,7 @@ impl Block { is_dummy: bool, is_processed: bool, is_committed: bool, - foreign_indexes: HashMap, + sorted_foreign_indexes: IndexMap, signature: Option, created_at: PrimitiveDateTime, ) -> Self { @@ -140,7 +141,7 @@ impl Block { is_dummy, is_processed, is_committed, - foreign_indexes, + foreign_indexes: sorted_foreign_indexes, stored_at: Some(created_at), signature, } @@ -155,7 +156,7 @@ impl Block { PublicKey::default(), Default::default(), 0, - HashMap::new(), + IndexMap::new(), None, ) } @@ -175,7 +176,7 @@ impl Block { is_dummy: false, is_processed: false, is_committed: true, - foreign_indexes: HashMap::new(), + foreign_indexes: IndexMap::new(), stored_at: None, signature: None, } @@ -196,7 +197,7 @@ impl Block { proposed_by, Default::default(), 0, - HashMap::new(), + IndexMap::new(), None, ); block.is_dummy = true; @@ -213,7 +214,7 @@ impl Block { .chain(&self.proposed_by) .chain(&self.merkle_root) .chain(&self.commands) - .chain(&self.foreign_indexes.iter().collect::>().sort()) + .chain(&self.foreign_indexes) .result() } } @@ -326,11 +327,11 @@ impl Block { self.is_committed } - pub fn get_foreign_index(&self, bucket: &Shard) -> Option<&u64> { - self.foreign_indexes.get(bucket) + pub fn get_foreign_counter(&self, bucket: &Shard) -> Option { + self.foreign_indexes.get(bucket).copied() } - pub fn get_foreign_indexes(&self) -> &HashMap { + pub fn foreign_indexes(&self) -> &IndexMap { &self.foreign_indexes } @@ -660,6 +661,22 @@ impl Block { ); Ok(false) } + + pub fn save_foreign_send_counters(&self, tx: &mut TTx) -> Result<(), StorageError> + where + TTx: StateStoreWriteTransaction + DerefMut + ?Sized, + TTx::Target: StateStoreReadTransaction, + { + let mut counters = ForeignSendCounters::get_or_default(tx.deref_mut(), self.justify().block_id())?; + // Add counters for this block and carry over the counters from the justify block, if any + for shard in self.foreign_indexes.keys() { + counters.increment_counter(*shard); + } + if !counters.is_empty() { + counters.set(tx, self.id())?; + } + Ok(()) + } } impl Display for Block { diff --git a/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs index 67a465a92..6c1efab49 100644 --- a/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs +++ b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; -use tari_dan_common_types::shard::Shard; +use tari_dan_common_types::{optional::Optional, shard::Shard}; use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; @@ -29,8 +29,8 @@ impl ForeignReceiveCounters { *self.counters.entry(*bucket).or_default() += 1; } - // If we haven't received any messages from this shard yet, return 0 - pub fn get_index(&self, bucket: &Shard) -> u64 { + /// Returns the counter for the provided shard. If the count does not exist, 0 is returned. + pub fn get_count(&self, bucket: &Shard) -> u64 { self.counters.get(bucket).copied().unwrap_or_default() } } @@ -41,7 +41,7 @@ impl ForeignReceiveCounters { Ok(()) } - pub fn get(tx: &mut TTx) -> Result { - tx.foreign_receive_counters_get() + pub fn get_or_default(tx: &mut TTx) -> Result { + Ok(tx.foreign_receive_counters_get().optional()?.unwrap_or_default()) } } diff --git a/dan_layer/storage/src/consensus_models/foreign_send_counters.rs b/dan_layer/storage/src/consensus_models/foreign_send_counters.rs index 0b9d083bd..c763a3a22 100644 --- a/dan_layer/storage/src/consensus_models/foreign_send_counters.rs +++ b/dan_layer/storage/src/consensus_models/foreign_send_counters.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; -use tari_dan_common_types::shard::Shard; +use tari_dan_common_types::{optional::Optional, shard::Shard}; use super::BlockId; use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; @@ -26,8 +26,20 @@ impl ForeignSendCounters { } } - pub fn increment_counter(&mut self, bucket: Shard) -> u64 { - *self.counters.entry(bucket).and_modify(|v| *v += 1).or_insert(1) + pub fn increment_counter(&mut self, shard: Shard) -> u64 { + *self.counters.entry(shard).and_modify(|v| *v += 1).or_insert(1) + } + + pub fn get_count(&self, shard: Shard) -> u64 { + self.counters.get(&shard).copied().unwrap_or_default() + } + + pub fn len(&self) -> usize { + self.counters.len() + } + + pub fn is_empty(&self) -> bool { + self.counters.is_empty() } } @@ -41,10 +53,10 @@ impl ForeignSendCounters { Ok(()) } - pub fn get( + pub fn get_or_default( tx: &mut TTx, block_id: &BlockId, ) -> Result { - tx.foreign_send_counters_get(block_id) + Ok(tx.foreign_send_counters_get(block_id).optional()?.unwrap_or_default()) } } diff --git a/dan_layer/validator_node_rpc/src/conversions/consensus.rs b/dan_layer/validator_node_rpc/src/conversions/consensus.rs index 59c23112d..d466e7a03 100644 --- a/dan_layer/validator_node_rpc/src/conversions/consensus.rs +++ b/dan_layer/validator_node_rpc/src/conversions/consensus.rs @@ -258,7 +258,7 @@ impl From<&tari_dan_storage::consensus_models::Block> for proto::consensus::Bloc justify: Some(value.justify().into()), total_leader_fee: value.total_leader_fee(), commands: value.commands().iter().map(Into::into).collect(), - foreign_indexes: encode(value.get_foreign_indexes()).unwrap(), + foreign_indexes: encode(value.foreign_indexes()).unwrap(), signature: value.get_signature().map(Into::into), } }