Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): minor foreign counters fixes #911

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, PeerAddress, Sub
use tari_dan_engine::fees::FeeTable;
use tari_dan_p2p::TariMessagingSpec;
use tari_dan_storage::{
consensus_models::{Block, BlockId, ExecutedTransaction, ForeignReceiveCounters, SubstateRecord},
consensus_models::{Block, BlockId, ExecutedTransaction, SubstateRecord},
global::GlobalDb,
StateStore,
StateStoreReadTransaction,
Expand Down Expand Up @@ -225,7 +225,6 @@ pub async fn spawn_services(

// Consensus
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let foreign_receive_counter = state_store.with_read_tx(|tx| ForeignReceiveCounters::get(tx))?;

let local_address = PeerAddress::from(keypair.public_key().clone());
let (loopback_sender, loopback_receiver) = mpsc::unbounded_channel();
Expand All @@ -246,7 +245,6 @@ pub async fn spawn_services(
inbound_messaging,
outbound_messaging.clone(),
validator_node_client_factory.clone(),
foreign_receive_counter,
shutdown.clone(),
)
.await;
Expand Down
4 changes: 1 addition & 3 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker};
use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool};
use tari_dan_storage::consensus_models::TransactionPool;
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
Expand Down Expand Up @@ -45,7 +45,6 @@ pub async fn spawn(
inbound_messaging: ConsensusInboundMessaging<SqliteMessageLogger>,
outbound_messaging: ConsensusOutboundMessaging<SqliteMessageLogger>,
client_factory: TariValidatorNodeRpcClientFactory,
foreign_receive_counter: ForeignReceiveCounters,
shutdown_signal: ShutdownSignal,
) -> (
JoinHandle<Result<(), anyhow::Error>>,
Expand Down Expand Up @@ -74,7 +73,6 @@ pub async fn spawn(
transaction_pool,
tx_hotstuff_events.clone(),
tx_mempool,
foreign_receive_counter,
shutdown_signal.clone(),
);

Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ tari_mmr = { workspace = true }
tari_shutdown = { workspace = true }

anyhow = { workspace = true }
indexmap = { workspace = true }
async-trait = { workspace = true }
log = { workspace = true }
serde = { workspace = true, default-features = true }
Expand Down
8 changes: 6 additions & 2 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ pub enum ProposalValidationError {
NotSafeBlock { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} is missing foreign index")]
MissingForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters")]
InvalidForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters: {details}")]
InvalidForeignCounters {
proposed_by: String,
hash: BlockId,
details: String,
},
#[error("Node proposed by {proposed_by} with hash {hash} is the genesis block")]
ProposingGenesisBlock { proposed_by: String, hash: BlockId },
#[error("Justification block {justify_block} for proposed block {block_description} by {proposed_by} not found")]
Expand Down
54 changes: 25 additions & 29 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::{collections::BTreeSet, num::NonZeroU64, ops::DerefMut};

use indexmap::IndexMap;
use log::*;
use tari_common_types::types::PublicKey;
use tari_dan_common_types::{
Expand All @@ -26,7 +27,6 @@ use tari_dan_storage::{
TransactionPoolStage,
},
StateStore,
StateStoreWriteTransaction,
};
use tari_epoch_manager::EpochManagerReader;

Expand Down Expand Up @@ -107,16 +107,12 @@ where TConsensusSpec: ConsensusSpec

let validator = self.epoch_manager.get_our_validator_node(epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(epoch).await?;
// The scope here is due to a shortcoming of rust. The tx is dropped at tx.commit() but it still complains that
// the non-Send tx could be used after the await point, which is not possible.
let next_block;
{
let mut tx = self.store.create_write_tx()?;
let high_qc = HighQc::get(&mut *tx)?;
let high_qc = high_qc.get_quorum_certificate(&mut *tx)?;
let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), leaf_block.block_id())?;
next_block = self.build_next_block(
&mut tx,

let next_block = self.store.with_write_tx(|tx| {
let high_qc = HighQc::get(tx.deref_mut())?;
let high_qc = high_qc.get_quorum_certificate(tx.deref_mut())?;
let next_block = self.build_next_block(
tx,
epoch,
&leaf_block,
high_qc,
Expand All @@ -125,13 +121,11 @@ where TConsensusSpec: ConsensusSpec
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this
// is a good idea.
is_newview_propose,
&mut foreign_counters,
)?;

next_block.as_last_proposed().set(&mut tx)?;

tx.commit()?;
}
next_block.as_last_proposed().set(tx)?;
Ok::<_, HotStuffError>(next_block)
})?;

info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -182,7 +176,6 @@ where TConsensusSpec: ConsensusSpec
proposed_by: PublicKey,
local_committee_shard: &CommitteeShard,
empty_block: bool,
foreign_counters: &mut ForeignSendCounters,
) -> Result<Block, HotStuffError> {
// TODO: Configure
const TARGET_BLOCK_SIZE: usize = 1000;
Expand All @@ -197,17 +190,16 @@ where TConsensusSpec: ConsensusSpec
let pending_proposals = ForeignProposal::get_all_pending(tx, locked_block.block_id(), parent_block.block_id())?;
let commands = ForeignProposal::get_all_new(tx)?
.into_iter()
.filter_map(|foreign_proposal| {
if pending_proposals.iter().any(|pending_proposal| {
.filter(|foreign_proposal| {
// If the foreign proposal is already pending, don't propose it again
!pending_proposals.iter().any(|pending_proposal| {
pending_proposal.bucket == foreign_proposal.bucket &&
pending_proposal.block_id == foreign_proposal.block_id
}) {
None
} else {
Some(Ok(Command::ForeignProposal(
foreign_proposal.set_mined_at(parent_block.height().saturating_add(NodeHeight(1))),
)))
}
})
})
.map(|mut foreign_proposal| {
foreign_proposal.set_proposed_height(parent_block.height().saturating_add(NodeHeight(1)));
Ok(Command::ForeignProposal(foreign_proposal))
})
.chain(batch.into_iter().map(|t| match t.current_stage() {
// If the transaction is New, propose to Prepare it
Expand Down Expand Up @@ -255,10 +247,14 @@ where TConsensusSpec: ConsensusSpec
local_committee_shard.shard(),
)?;

let foreign_indexes = non_local_buckets
let foreign_counters = ForeignSendCounters::get_or_default(tx, parent_block.block_id())?;
let mut foreign_indexes = non_local_buckets
.iter()
.map(|bucket| (*bucket, foreign_counters.increment_counter(*bucket)))
.collect();
.map(|bucket| (*bucket, foreign_counters.get_count(*bucket) + 1))
.collect::<IndexMap<_, _>>();

// Ensure that foreign indexes are canonically ordered
foreign_indexes.sort_keys();

let mut next_block = Block::new(
*parent_block.block_id(),
Expand Down
48 changes: 28 additions & 20 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub struct OnReceiveForeignProposalHandler<TConsensusSpec: ConsensusSpec> {
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
}

impl<TConsensusSpec> OnReceiveForeignProposalHandler<TConsensusSpec>
Expand All @@ -41,14 +40,12 @@ where TConsensusSpec: ConsensusSpec
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
) -> Self {
Self {
store,
epoch_manager,
transaction_pool,
pacemaker,
foreign_receive_counter,
}
}

Expand All @@ -64,6 +61,10 @@ where TConsensusSpec: ConsensusSpec
from,
);

let mut foreign_receive_counter = self
.store
.with_read_tx(|tx| ForeignReceiveCounters::get_or_default(tx))?;

let vn = self.epoch_manager.get_validator_node(block.epoch(), &from).await?;
let committee_shard = self
.epoch_manager
Expand All @@ -83,11 +84,16 @@ where TConsensusSpec: ConsensusSpec
}

let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.shard(), local_shard.shard())?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.shard());
self.validate_proposed_block(
&from,
&block,
committee_shard.shard(),
local_shard.shard(),
&foreign_receive_counter,
)?;
foreign_receive_counter.increment(&committee_shard.shard());
self.store.with_write_tx(|tx| {
self.foreign_receive_counter.save(tx)?;
foreign_receive_counter.save(tx)?;
foreign_proposal.upsert(tx)?;
self.on_receive_foreign_block(tx, &block, &committee_shard)
})?;
Expand Down Expand Up @@ -168,24 +174,26 @@ where TConsensusSpec: ConsensusSpec
candidate_block: &Block,
foreign_bucket: Shard,
local_bucket: Shard,
foreign_receive_counter: &ForeignReceiveCounters,
) -> Result<(), ProposalValidationError> {
let incoming_index = match candidate_block.get_foreign_index(&local_bucket) {
Some(i) => *i,
None => {
debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}");
return Err(ProposalValidationError::MissingForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
},
let Some(incoming_count) = candidate_block.get_foreign_counter(&local_bucket) else {
debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}");
return Err(ProposalValidationError::MissingForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
};
let current_index = self.foreign_receive_counter.get_index(&foreign_bucket);
if current_index + 1 != incoming_index {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was
{incoming_index}", expected_index = current_index + 1);
let current_count = foreign_receive_counter.get_count(&foreign_bucket);
if current_count + 1 != incoming_count {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_count}, but the index was {incoming_count}", expected_count = current_count + 1);
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
details: format!(
"Expected foreign receive count to be {} but it was {}",
current_count + 1,
incoming_count
),
});
}
if candidate_block.height() == NodeHeight::zero() || candidate_block.id().is_genesis() {
Expand Down
Loading
Loading