Skip to content

Commit

Permalink
feat: timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Jan 25, 2024
1 parent 7448b4f commit 3033f41
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 18 deletions.
41 changes: 31 additions & 10 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tari_dan_storage::{
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
use tari_transaction::TransactionId;

use crate::{
hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle, ProposalValidationError},
Expand Down Expand Up @@ -70,7 +71,36 @@ where TConsensusSpec: ConsensusSpec
.epoch_manager
.get_committee_shard(block.epoch(), vn.shard_key)
.await?;
let foreign_proposal = ForeignProposal::new(committee_shard.shard(), *block.id());

let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(
&from,
&block,
committee_shard.shard(),
local_shard.shard(),
&foreign_receive_counter,
)?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
foreign_receive_counter.increment(&committee_shard.shard());

let tx_ids = block
.commands()
.iter()
.filter_map(|command| {
if let Some(tx) = command.local_prepared() {
if !committee_shard.includes_any_shard(command.evidence().shards_iter()) {
return None;
}
// We are interested in the commands that are for us, they will be in local prepared and one of the
// evidence shards will be ours
Some(tx.id)
} else {
None
}
})
.collect::<Vec<TransactionId>>();

let foreign_proposal = ForeignProposal::new(committee_shard.shard(), *block.id(), tx_ids);
if self
.store
.with_read_tx(|tx| ForeignProposal::exists(tx, &foreign_proposal))?
Expand All @@ -83,15 +113,6 @@ where TConsensusSpec: ConsensusSpec
return Ok(());
}

let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(
&from,
&block,
committee_shard.shard(),
local_shard.shard(),
&foreign_receive_counter,
)?;
foreign_receive_counter.increment(&committee_shard.shard());
self.store.with_write_tx(|tx| {
foreign_receive_counter.save(tx)?;
foreign_proposal.upsert(tx)?;
Expand Down
42 changes: 41 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@ use tari_dan_common_types::{
NodeHeight,
};
use tari_dan_storage::{
consensus_models::{Block, BlockId, ForeignSendCounters, HighQc, TransactionPool, ValidBlock},
consensus_models::{
Block,
BlockId,
Decision,
ForeignProposal,
ForeignSendCounters,
HighQc,
TransactionPool,
TransactionPoolStage,
ValidBlock,
},
StateStore,
StateStoreReadTransaction,
};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::broadcast;
Expand All @@ -41,6 +52,7 @@ pub struct OnReceiveLocalProposalHandler<TConsensusSpec: ConsensusSpec> {
epoch_manager: TConsensusSpec::EpochManager,
leader_strategy: TConsensusSpec::LeaderStrategy,
pacemaker: PaceMakerHandle,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
on_ready_to_vote_on_local_block: OnReadyToVoteOnLocalBlock<TConsensusSpec>,
}

Expand All @@ -63,6 +75,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
epoch_manager: epoch_manager.clone(),
leader_strategy: leader_strategy.clone(),
pacemaker,
transaction_pool: transaction_pool.clone(),
on_ready_to_vote_on_local_block: OnReadyToVoteOnLocalBlock::new(
validator_addr,
store,
Expand Down Expand Up @@ -242,6 +255,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec

/// Perform final block validations (TODO: implement all validations)
/// We assume at this point that initial stateless validations have been done (in inbound messages)
#[allow(clippy::too_many_lines)]
fn validate_local_proposed_block(
&self,
tx: &mut <TConsensusSpec::StateStore as StateStore>::ReadTransaction<'_>,
Expand Down Expand Up @@ -354,6 +368,32 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
.into());
}

// TODO: Move this to consensus constants
const TIMEOUT: u64 = 1000;
let all_mined_proposals = ForeignProposal::get_all_mined(
tx.deref_mut(),
candidate_block.height().saturating_sub(NodeHeight(TIMEOUT)),
)?;
for proposal in all_mined_proposals {
let mut has_unresolved_transactions = false;
for tx_id in proposal.transactions.clone() {
let transaction = tx.transactions_get(&tx_id).optional()?;
if transaction.map_or(false, |t| t.final_decision().is_some()) {
// We don't know the transaction at all, or we know it but it's not finalised.
let mut tx_rec = self.transaction_pool.get(tx, candidate_block.as_leaf_block(), &tx_id)?;
// If the transaction is still in the pool we have to check if it was at least localy prepared,
// otherwise abort it.
if tx_rec.stage() == TransactionPoolStage::New || tx_rec.stage() == TransactionPoolStage::Prepared {
tx_rec.update_local_decision(tx, Decision::Abort)?;
has_unresolved_transactions = true;
}
}
}
if !has_unresolved_transactions {
proposal.delete(tx)?;
}
}

Ok(ValidBlock::new(candidate_block))
}
}
1 change: 1 addition & 0 deletions dan_layer/p2p/proto/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ message ForeignProposal {
bytes block_id = 2;
ForeignProposalState state = 3;
uint64 mined_at = 4;
repeated bytes transactions = 5;
}

message TransactionAtom {
Expand Down
6 changes: 6 additions & 0 deletions dan_layer/p2p/src/conversions/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ impl From<&ForeignProposal> for proto::consensus::ForeignProposal {
block_id: value.block_id.as_bytes().to_vec(),
state: proto::consensus::ForeignProposalState::from(value.state).into(),
mined_at: value.proposed_height.map(|a| a.0).unwrap_or(0),
transactions: value.transactions.iter().map(|tx| tx.as_bytes().to_vec()).collect(),
}
}
}
Expand All @@ -410,6 +411,11 @@ impl TryFrom<proto::consensus::ForeignProposal> for ForeignProposal {
} else {
Some(NodeHeight(value.mined_at))
},
transactions: value
.transactions
.into_iter()
.map(|tx| tx.try_into())
.collect::<Result<_, _>>()?,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,12 @@ CREATE TABLE missing_transactions

CREATE TABLE foreign_proposals
(
id integer not NULL primary key AUTOINCREMENT,
bucket bigint not NULL,
block_id text not NULL,
state text not NULL,
mined_at bigint NULL,
id integer not NULL primary key AUTOINCREMENT,
bucket bigint not NULL,
block_id text not NULL,
state text not NULL,
mined_at bigint NULL,
transactions text not NULL,
created_at timestamp not NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (bucket, block_id)
);
Expand Down
18 changes: 17 additions & 1 deletion dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use tari_utilities::ByteArray;

use crate::{
error::SqliteStorageError,
serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex},
serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex, serialize_json},
sql_models,
sqlite_transaction::SqliteTransaction,
};
Expand Down Expand Up @@ -384,6 +384,7 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
let foreign_proposals = foreign_proposals::table
.filter(foreign_proposals::bucket.eq(foreign_proposal.bucket.as_u32() as i32))
.filter(foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id)))
.filter(foreign_proposals::transactions.eq(serialize_json(&foreign_proposal.transactions)?))
.count()
.limit(1)
.get_result::<i64>(self.connection())
Expand Down Expand Up @@ -438,6 +439,21 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
.collect::<Vec<ForeignProposal>>())
}

fn foreign_proposal_get_all_mined(&mut self, to_height: NodeHeight) -> Result<Vec<ForeignProposal>, StorageError> {
use crate::schema::foreign_proposals;

let foreign_proposals = foreign_proposals::table
.filter(foreign_proposals::state.eq("Mined"))
.filter(foreign_proposals::mined_at.le(to_height.0 as i64))
.load::<sql_models::ForeignProposal>(self.connection())
.map_err(|e| SqliteStorageError::DieselError {
operation: "foreign_proposal_get_all",
source: e,
})?;

foreign_proposals.into_iter().map(|p| p.try_into()).collect()
}

fn foreign_send_counters_get(&mut self, block_id: &BlockId) -> Result<ForeignSendCounters, StorageError> {
use crate::schema::foreign_send_counters;

Expand Down
1 change: 1 addition & 0 deletions dan_layer/state_store_sqlite/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ diesel::table! {
block_id -> Text,
state -> Text,
proposed_height -> Nullable<BigInt>,
transactions -> Text,
created_at -> Timestamp,
}
}
Expand Down
2 changes: 2 additions & 0 deletions dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct ForeignProposal {
pub block_id: String,
pub state: String,
pub mined_at: Option<i64>,
pub transactions: String,
pub created_at: PrimitiveDateTime,
}

Expand All @@ -54,6 +55,7 @@ impl TryFrom<ForeignProposal> for consensus_models::ForeignProposal {
block_id: deserialize_hex_try_from(&value.block_id)?,
state: parse_from_string(&value.state)?,
proposed_height: value.mined_at.map(|mined_at| NodeHeight(mined_at as u64)),
transactions: deserialize_json(&value.transactions)?,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions dan_layer/state_store_sqlite/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ impl<TAddr: NodeAddressable> StateStoreWriteTransaction for SqliteStateStoreWrit
foreign_proposals::block_id.eq(serialize_hex(foreign_proposal.block_id)),
foreign_proposals::state.eq(foreign_proposal.state.to_string()),
foreign_proposals::proposed_height.eq(foreign_proposal.proposed_height.map(|h| h.as_u64() as i64)),
foreign_proposals::transactions.eq(serialize_json(&foreign_proposal.transactions)?),
);

diesel::insert_into(foreign_proposals::table)
Expand Down
12 changes: 11 additions & 1 deletion dan_layer/storage/src/consensus_models/foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{

use serde::{Deserialize, Serialize};
use tari_dan_common_types::{shard::Shard, NodeHeight};
use tari_transaction::TransactionId;

use super::BlockId;
use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError};
Expand Down Expand Up @@ -49,15 +50,17 @@ pub struct ForeignProposal {
pub block_id: BlockId,
pub state: ForeignProposalState,
pub proposed_height: Option<NodeHeight>,
pub transactions: Vec<TransactionId>,
}

impl ForeignProposal {
pub fn new(bucket: Shard, block_id: BlockId) -> Self {
pub fn new(bucket: Shard, block_id: BlockId, transactions: Vec<TransactionId>) -> Self {
Self {
bucket,
block_id,
state: ForeignProposalState::New,
proposed_height: None,
transactions,
}
}

Expand Down Expand Up @@ -97,4 +100,11 @@ impl ForeignProposal {
) -> Result<Vec<Self>, StorageError> {
tx.foreign_proposal_get_all_pending(from_block_id, to_block_id)
}

pub fn get_all_mined<TTx: StateStoreReadTransaction + ?Sized>(
tx: &mut TTx,
to_height: NodeHeight,
) -> Result<Vec<Self>, StorageError> {
tx.foreign_proposal_get_all_mined(to_height)
}
}
1 change: 1 addition & 0 deletions dan_layer/storage/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub trait StateStoreReadTransaction {
from_block_id: &BlockId,
to_block_id: &BlockId,
) -> Result<Vec<ForeignProposal>, StorageError>;
fn foreign_proposal_get_all_mined(&mut self, to_height: NodeHeight) -> Result<Vec<ForeignProposal>, StorageError>;
fn foreign_send_counters_get(&mut self, block_id: &BlockId) -> Result<ForeignSendCounters, StorageError>;
fn foreign_receive_counters_get(&mut self) -> Result<ForeignReceiveCounters, StorageError>;
fn transactions_get(&mut self, tx_id: &TransactionId) -> Result<TransactionRecord, StorageError>;
Expand Down

0 comments on commit 3033f41

Please sign in to comment.