Skip to content

Commit

Permalink
fix(consensus): foreign counters minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jan 22, 2024
1 parent f85608a commit c9a32e1
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 164 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tari_dan_app_utilities::{
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, PeerAddress, SubstateAddress};
use tari_dan_engine::fees::FeeTable;
use tari_dan_storage::{
consensus_models::{Block, BlockId, ExecutedTransaction, ForeignReceiveCounters, SubstateRecord},
consensus_models::{Block, BlockId, ExecutedTransaction, SubstateRecord},
global::GlobalDb,
StateStore,
StateStoreReadTransaction,
Expand Down Expand Up @@ -229,7 +229,6 @@ pub async fn spawn_services(

// Consensus
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let foreign_receive_counter = state_store.with_read_tx(|tx| ForeignReceiveCounters::get(tx))?;
let (consensus_join_handle, consensus_handle, rx_consensus_to_mempool) = consensus::spawn(
state_store.clone(),
keypair.clone(),
Expand All @@ -238,7 +237,6 @@ pub async fn spawn_services(
rx_consensus_message,
outbound_messaging.clone(),
validator_node_client_factory.clone(),
foreign_receive_counter,
shutdown.clone(),
)
.await;
Expand Down
4 changes: 1 addition & 3 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tari_consensus::{
};
use tari_dan_common_types::committee::Committee;
use tari_dan_p2p::{Message, OutboundService};
use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool};
use tari_dan_storage::consensus_models::TransactionPool;
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
Expand Down Expand Up @@ -50,7 +50,6 @@ pub async fn spawn(
rx_hs_message: mpsc::Receiver<(PeerAddress, HotstuffMessage)>,
outbound_messaging: OutboundMessaging<PeerAddress, SqliteMessageLogger>,
client_factory: TariValidatorNodeRpcClientFactory,
foreign_receive_counter: ForeignReceiveCounters,
shutdown_signal: ShutdownSignal,
) -> (
JoinHandle<Result<(), anyhow::Error>>,
Expand Down Expand Up @@ -82,7 +81,6 @@ pub async fn spawn(
tx_leader,
tx_hotstuff_events.clone(),
tx_mempool,
foreign_receive_counter,
shutdown_signal.clone(),
);

Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tari_mmr = { workspace = true }
tari_shutdown = { workspace = true }

anyhow = { workspace = true }
indexmap = { workspace = true }
async-trait = { workspace = true }
log = { workspace = true }
serde = { workspace = true, default-features = true }
Expand Down
8 changes: 6 additions & 2 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ pub enum ProposalValidationError {
NotSafeBlock { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} is missing foreign index")]
MissingForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters")]
InvalidForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters: {details}")]
InvalidForeignCounters {
proposed_by: String,
hash: BlockId,
details: String,
},
#[error("Node proposed by {proposed_by} with hash {hash} is the genesis block")]
ProposingGenesisBlock { proposed_by: String, hash: BlockId },
#[error("Justification block {justify_block} for proposed block {block_description} by {proposed_by} not found")]
Expand Down
42 changes: 17 additions & 25 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::{collections::BTreeSet, num::NonZeroU64, ops::DerefMut};

use indexmap::IndexMap;
use log::*;
use tari_common_types::types::PublicKey;
use tari_dan_common_types::{
Expand All @@ -26,7 +27,6 @@ use tari_dan_storage::{
TransactionPoolStage,
},
StateStore,
StateStoreWriteTransaction,
};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -109,16 +109,12 @@ where TConsensusSpec: ConsensusSpec

let validator = self.epoch_manager.get_our_validator_node(epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(epoch).await?;
// The scope here works around a shortcoming of Rust. The tx is dropped at tx.commit(), but the compiler still
// complains that the non-Send tx could be used after the await point, which is not possible.
let next_block;
{
let mut tx = self.store.create_write_tx()?;
let high_qc = HighQc::get(&mut *tx)?;
let high_qc = high_qc.get_quorum_certificate(&mut *tx)?;
let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), leaf_block.block_id())?;
next_block = self.build_next_block(
&mut tx,

let next_block = self.store.with_write_tx(|tx| {
let high_qc = HighQc::get(tx.deref_mut())?;
let high_qc = high_qc.get_quorum_certificate(tx.deref_mut())?;
let next_block = self.build_next_block(
tx,
epoch,
&leaf_block,
high_qc,
Expand All @@ -127,18 +123,11 @@ where TConsensusSpec: ConsensusSpec
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this
// is a good idea.
is_newview_propose,
&mut foreign_counters,
)?;

next_block.as_last_proposed().set(&mut tx)?;

// Get involved shards for all LocalPrepared commands in the block.
// This allows us to broadcast the proposal only to the relevant committees that would be interested in the
// LocalPrepared.
// TODO: we should never broadcast to foreign shards here. The soonest we can broadcast is once we have
// locked the block
tx.commit()?;
}
next_block.as_last_proposed().set(tx)?;
Ok::<_, HotStuffError>(next_block)
})?;

info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -192,7 +181,6 @@ where TConsensusSpec: ConsensusSpec
proposed_by: PublicKey,
local_committee_shard: &CommitteeShard,
empty_block: bool,
foreign_counters: &mut ForeignSendCounters,
) -> Result<Block, HotStuffError> {
// TODO: Configure
const TARGET_BLOCK_SIZE: usize = 1000;
Expand Down Expand Up @@ -265,10 +253,14 @@ where TConsensusSpec: ConsensusSpec
local_committee_shard.shard(),
)?;

let foreign_indexes = non_local_buckets
let foreign_counters = ForeignSendCounters::get_or_default(tx, parent_block.block_id())?;
let mut foreign_indexes = non_local_buckets
.iter()
.map(|bucket| (*bucket, foreign_counters.increment_counter(*bucket)))
.collect();
.map(|bucket| (*bucket, foreign_counters.get_count(*bucket) + 1))
.collect::<IndexMap<_, _>>();

// Ensure that foreign indexes are canonically ordered
foreign_indexes.sort_keys();

let mut next_block = Block::new(
*parent_block.block_id(),
Expand Down
48 changes: 28 additions & 20 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub struct OnReceiveForeignProposalHandler<TConsensusSpec: ConsensusSpec> {
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
}

impl<TConsensusSpec> OnReceiveForeignProposalHandler<TConsensusSpec>
Expand All @@ -41,14 +40,12 @@ where TConsensusSpec: ConsensusSpec
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
) -> Self {
Self {
store,
epoch_manager,
transaction_pool,
pacemaker,
foreign_receive_counter,
}
}

Expand All @@ -64,6 +61,10 @@ where TConsensusSpec: ConsensusSpec
from,
);

let mut foreign_receive_counter = self
.store
.with_read_tx(|tx| ForeignReceiveCounters::get_or_default(tx))?;

let vn = self.epoch_manager.get_validator_node(block.epoch(), &from).await?;
let committee_shard = self
.epoch_manager
Expand All @@ -83,11 +84,16 @@ where TConsensusSpec: ConsensusSpec
}

let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.shard(), local_shard.shard())?;
// Is this OK? Can a foreign node send an invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.shard());
self.validate_proposed_block(
&from,
&block,
committee_shard.shard(),
local_shard.shard(),
&foreign_receive_counter,
)?;
foreign_receive_counter.increment(&committee_shard.shard());
self.store.with_write_tx(|tx| {
self.foreign_receive_counter.save(tx)?;
foreign_receive_counter.save(tx)?;
foreign_proposal.upsert(tx)?;
self.on_receive_foreign_block(tx, &block, &committee_shard)
})?;
Expand Down Expand Up @@ -168,24 +174,26 @@ where TConsensusSpec: ConsensusSpec
candidate_block: &Block,
foreign_bucket: Shard,
local_bucket: Shard,
foreign_receive_counter: &ForeignReceiveCounters,
) -> Result<(), ProposalValidationError> {
let incoming_index = match candidate_block.get_foreign_index(&local_bucket) {
Some(i) => *i,
None => {
debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}");
return Err(ProposalValidationError::MissingForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
},
let Some(incoming_count) = candidate_block.get_foreign_counter(&local_bucket) else {
debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}");
return Err(ProposalValidationError::MissingForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
};
let current_index = self.foreign_receive_counter.get_index(&foreign_bucket);
if current_index + 1 != incoming_index {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was
{incoming_index}", expected_index = current_index + 1);
let current_count = foreign_receive_counter.get_count(&foreign_bucket);
if current_count + 1 != incoming_count {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_count}, but the index was {incoming_count}", expected_count = current_count + 1);
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
details: format!(
"Expected foreign receive count to be {} but it was {}",
current_count + 1,
incoming_count
),
});
}
if candidate_block.height() == NodeHeight::zero() || candidate_block.id().is_genesis() {
Expand Down
Loading

0 comments on commit c9a32e1

Please sign in to comment.