Skip to content

Commit ce1f235

Browse files
authored
Merge branch 'development' into consensus-sync-no-dummy-blocks
2 parents 673b5ae + 63a1563 commit ce1f235

File tree

9 files changed

+122
-50
lines changed

9 files changed

+122
-50
lines changed

applications/tari_validator_node/src/json_rpc/handlers.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use tari_dan_storage::{
3737
consensus_models::{Block, ExecutedTransaction, LeafBlock, QuorumDecision, SubstateRecord, TransactionRecord},
3838
Ordering,
3939
StateStore,
40+
StateStoreReadTransaction,
4041
};
4142
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
4243
use tari_networking::{is_supported_multiaddr, NetworkingHandle, NetworkingService};
@@ -292,6 +293,16 @@ impl JsonRpcHandlers {
292293
Ok(JsonRpcResponse::success(answer_id, res))
293294
}
294295

296+
pub async fn get_tx_pool(&self, value: JsonRpcExtractor) -> JrpcResult {
297+
let answer_id = value.get_answer_id();
298+
let tx_pool = self
299+
.state_store
300+
.with_read_tx(|tx| tx.transaction_pool_get_all())
301+
.map_err(internal_error(answer_id))?;
302+
let res = json!({ "tx_pool": tx_pool });
303+
Ok(JsonRpcResponse::success(answer_id, res))
304+
}
305+
295306
pub async fn get_transaction_result(&self, value: JsonRpcExtractor) -> JrpcResult {
296307
let answer_id = value.get_answer_id();
297308
let request: GetTransactionResultRequest = value.parse_params()?;

applications/tari_validator_node/src/json_rpc/server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ async fn handler(Extension(handlers): Extension<Arc<JsonRpcHandlers>>, value: Js
6868
"get_substates_created_by_transaction" => handlers.get_substates_created_by_transaction(value).await,
6969
"get_substates_destroyed_by_transaction" => handlers.get_substates_destroyed_by_transaction(value).await,
7070
"list_blocks" => handlers.list_blocks(value).await,
71+
"get_tx_pool" => handlers.get_tx_pool(value).await,
7172
// Blocks
7273
"get_block" => handlers.get_block(value).await,
7374
"get_blocks_count" => handlers.get_blocks_count(value).await,

dan_layer/consensus/src/hotstuff/on_inbound_message.rs

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
3535
leader_strategy: TConsensusSpec::LeaderStrategy,
3636
pacemaker: PaceMakerHandle,
3737
vote_signing_service: TConsensusSpec::SignatureService,
38-
rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
38+
pub rx_hotstuff_message: mpsc::Receiver<(TConsensusSpec::Addr, HotstuffMessage)>,
3939
tx_outbound_message: mpsc::Sender<(TConsensusSpec::Addr, HotstuffMessage)>,
4040
tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
41-
rx_new_transactions: mpsc::Receiver<TransactionId>,
42-
message_buffer: MessageBuffer<TConsensusSpec::Addr>,
41+
pub rx_new_transactions: mpsc::Receiver<TransactionId>,
42+
pub message_buffer: MessageBuffer<TConsensusSpec::Addr>,
4343
shutdown: ShutdownSignal,
4444
}
4545

@@ -73,32 +73,6 @@ where TConsensusSpec: ConsensusSpec
7373
}
7474
}
7575

76-
pub async fn next(&mut self, current_height: NodeHeight) -> IncomingMessageResult<TConsensusSpec::Addr> {
77-
loop {
78-
tokio::select! {
79-
biased;
80-
81-
_ = self.shutdown.wait() => { break Ok(None); }
82-
83-
msg_or_sync = self.message_buffer.next(current_height) => {
84-
break msg_or_sync;
85-
},
86-
87-
Some((from, msg)) = self.rx_hotstuff_message.recv() => {
88-
if let Err(err) = self.handle_hotstuff_message(current_height, from, msg).await {
89-
error!(target: LOG_TARGET, "Error handling message: {}", err);
90-
}
91-
},
92-
93-
Some(tx_id) = self.rx_new_transactions.recv() => {
94-
if let Err(err) = self.check_if_parked_blocks_ready(current_height, &tx_id).await {
95-
error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
96-
}
97-
},
98-
}
99-
}
100-
}
101-
10276
pub async fn discard(&mut self) {
10377
loop {
10478
tokio::select! {
@@ -123,7 +97,15 @@ where TConsensusSpec: ConsensusSpec
12397
) -> Result<(), HotStuffError> {
12498
match msg {
12599
HotstuffMessage::Proposal(msg) => {
126-
self.process_proposal(current_height, msg).await?;
100+
self.process_local_proposal(current_height, msg).await?;
101+
},
102+
HotstuffMessage::ForeignProposal(ref proposal) => {
103+
self.check_proposal(proposal.block.clone()).await?;
104+
self.tx_msg_ready
105+
.send((from, msg))
106+
.map_err(|_| HotStuffError::InternalChannelClosed {
107+
context: "tx_msg_ready in InboundMessageWorker::handle_hotstuff_message",
108+
})?;
127109
},
128110
msg => self
129111
.tx_msg_ready
@@ -135,7 +117,19 @@ where TConsensusSpec: ConsensusSpec
135117
Ok(())
136118
}
137119

138-
async fn process_proposal(
120+
async fn check_proposal(&self, block: Block) -> Result<Option<Block>, HotStuffError> {
121+
check_hash_and_height(&block)?;
122+
let committee_for_block = self
123+
.epoch_manager
124+
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
125+
.await?;
126+
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?;
127+
check_signature(&block)?;
128+
check_quorum_certificate::<TConsensusSpec>(&block, &self.vote_signing_service, &self.epoch_manager).await?;
129+
self.handle_missing_transactions(block).await
130+
}
131+
132+
async fn process_local_proposal(
139133
&self,
140134
current_height: NodeHeight,
141135
proposal: ProposalMessage,
@@ -160,16 +154,7 @@ where TConsensusSpec: ConsensusSpec
160154
return Ok(());
161155
}
162156

163-
check_hash_and_height(&block)?;
164-
let committee_for_block = self
165-
.epoch_manager
166-
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
167-
.await?;
168-
check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?;
169-
check_signature(&block)?;
170-
check_quorum_certificate::<TConsensusSpec>(&block, &self.vote_signing_service, &self.epoch_manager).await?;
171-
172-
let Some(ready_block) = self.handle_missing_transactions(block).await? else {
157+
let Some(ready_block) = self.check_proposal(block).await? else {
173158
// Block not ready
174159
return Ok(());
175160
};
@@ -184,7 +169,7 @@ where TConsensusSpec: ConsensusSpec
184169
Ok(())
185170
}
186171

187-
async fn check_if_parked_blocks_ready(
172+
pub async fn check_if_parked_blocks_ready(
188173
&self,
189174
current_height: NodeHeight,
190175
transaction_id: &TransactionId,
@@ -301,7 +286,7 @@ where TConsensusSpec: ConsensusSpec
301286
}
302287
}
303288

304-
struct MessageBuffer<TAddr> {
289+
pub struct MessageBuffer<TAddr> {
305290
buffer: BTreeMap<NodeHeight, VecDeque<(TAddr, HotstuffMessage)>>,
306291
rx_msg_ready: mpsc::UnboundedReceiver<(TAddr, HotstuffMessage)>,
307292
}

dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,7 @@ where TConsensusSpec: ConsensusSpec
950950
// committees and within committees are not different in terms of size, speed, etc.
951951
let diff_from_leader = (my_index + local_committee.len() - leader_index as usize) % local_committee.len();
952952
// f+1 nodes (always including the leader) send the proposal to the foreign committee
953+
// if diff_from_leader <= (local_committee.len() - 1) / 3 + 1 {
953954
if diff_from_leader <= local_committee.len() / 3 {
954955
self.proposer.broadcast_proposal_foreignly(&block).await?;
955956
}

dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,26 @@ where TConsensusSpec: ConsensusSpec
6969
.epoch_manager
7070
.get_committee_shard(block.epoch(), vn.shard_key)
7171
.await?;
72+
let foreign_proposal = ForeignProposal::new(committee_shard.bucket(), *block.id());
73+
if self
74+
.store
75+
.with_read_tx(|tx| ForeignProposal::exists(tx, &foreign_proposal))?
76+
{
77+
warn!(
78+
target: LOG_TARGET,
79+
"🔥 FOREIGN PROPOSAL: Already received proposal for block {}",
80+
block.id(),
81+
);
82+
return Ok(());
83+
}
84+
7285
let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
7386
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?;
7487
// Is this ok? Can foreign node send invalid block that should still increment the counter?
7588
self.foreign_receive_counter.increment(&committee_shard.bucket());
7689
self.store.with_write_tx(|tx| {
7790
self.foreign_receive_counter.save(tx)?;
78-
ForeignProposal::new(committee_shard.bucket(), *block.id()).upsert(tx)?;
91+
foreign_proposal.upsert(tx)?;
7992
self.on_receive_foreign_block(tx, &block, &committee_shard)
8093
})?;
8194

dan_layer/consensus/src/hotstuff/worker.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,25 @@ where TConsensusSpec: ConsensusSpec
222222
);
223223

224224
tokio::select! {
225-
msg_or_sync = self.inbound_message_worker.next(current_height) => {
225+
msg_or_sync = self.inbound_message_worker.message_buffer.next(current_height) => {
226226
if let Err(e) = self.on_new_hs_message(msg_or_sync).await {
227227
self.on_failure("on_new_hs_message", &e).await;
228228
return Err(e);
229229
}
230230
},
231231

232+
Some((from, msg)) = self.inbound_message_worker.rx_hotstuff_message.recv() => {
233+
if let Err(err) = self.inbound_message_worker.handle_hotstuff_message(current_height, from, msg).await {
234+
error!(target: LOG_TARGET, "Error handling message: {}", err);
235+
}
236+
},
237+
238+
Some(tx_id) = self.inbound_message_worker.rx_new_transactions.recv() => {
239+
if let Err(err) = self.inbound_message_worker.check_if_parked_blocks_ready(current_height, &tx_id).await {
240+
error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
241+
}
242+
},
243+
232244
Ok(event) = epoch_manager_events.recv() => {
233245
self.handle_epoch_manager_event(event).await?;
234246
},

dan_layer/state_store_sqlite/src/reader.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use tari_dan_storage::{
3232
Block,
3333
BlockId,
3434
Command,
35+
Decision,
36+
Evidence,
3537
ForeignProposal,
3638
ForeignReceiveCounters,
3739
ForeignSendCounters,
@@ -61,7 +63,7 @@ use tari_utilities::ByteArray;
6163

6264
use crate::{
6365
error::SqliteStorageError,
64-
serialization::{deserialize_hex_try_from, deserialize_json, serialize_hex},
66+
serialization::{deserialize_hex_try_from, deserialize_json, parse_from_string, serialize_hex},
6567
sql_models,
6668
sqlite_transaction::SqliteTransaction,
6769
};
@@ -95,7 +97,7 @@ impl<'a, TAddr> SqliteStateStoreReadTransaction<'a, TAddr> {
9597
}
9698

9799
impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned> SqliteStateStoreReadTransaction<'a, TAddr> {
98-
pub(super) fn get_transaction_atom_state_updates_between_blocks<'i, ITx>(
100+
pub fn get_transaction_atom_state_updates_between_blocks<'i, ITx>(
99101
&mut self,
100102
from_block_id: &BlockId,
101103
to_block_id: &BlockId,
@@ -1144,6 +1146,17 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
11441146
Ok(count > 0)
11451147
}
11461148

1149+
fn transaction_pool_get_all(&mut self) -> Result<Vec<TransactionPoolRecord>, StorageError> {
1150+
use crate::schema::transaction_pool;
1151+
let txs = transaction_pool::table
1152+
.get_results::<sql_models::TransactionPoolRecord>(self.connection())
1153+
.map_err(|e| SqliteStorageError::DieselError {
1154+
operation: "transaction_pool_get_all",
1155+
source: e,
1156+
})?;
1157+
txs.into_iter().map(|tx| tx.try_convert(None)).collect()
1158+
}
1159+
11471160
fn transaction_pool_get_many_ready(&mut self, max_txs: usize) -> Result<Vec<TransactionPoolRecord>, StorageError> {
11481161
use crate::schema::transaction_pool;
11491162

@@ -1178,14 +1191,48 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
11781191
updates.len()
11791192
);
11801193

1194+
let mut used_substates = HashSet::<ShardId>::new();
1195+
let mut processed_substates = HashMap::<TransactionId, HashSet<ShardId>>::new();
1196+
for (tx_id, update) in &updates {
1197+
if let Some(Decision::Abort) = update
1198+
.local_decision
1199+
.as_ref()
1200+
.map(|decision| parse_from_string(decision.as_str()))
1201+
.transpose()?
1202+
{
1203+
// The aborted transaction don't lock any substates
1204+
continue;
1205+
}
1206+
let evidence = deserialize_json::<Evidence>(&update.evidence)?;
1207+
let evidence = evidence.shards_iter().copied().collect::<HashSet<_>>();
1208+
processed_substates.insert(deserialize_hex_try_from(tx_id)?, evidence);
1209+
}
1210+
11811211
ready_txs
11821212
.into_iter()
11831213
.filter_map(|rec| {
11841214
let maybe_update = updates.remove(&rec.transaction_id);
11851215
match rec.try_convert(maybe_update) {
11861216
Ok(rec) => {
11871217
if rec.is_ready() {
1188-
Some(Ok(rec))
1218+
let tx_substates: HashSet<ShardId> = rec
1219+
.transaction()
1220+
.evidence
1221+
.shards_iter()
1222+
.copied()
1223+
.collect::<HashSet<_>>();
1224+
if tx_substates.is_disjoint(&used_substates) &&
1225+
processed_substates.iter().all(|(tx_id, substates)| {
1226+
tx_id == rec.transaction_id() || tx_substates.is_disjoint(substates)
1227+
})
1228+
{
1229+
used_substates.extend(tx_substates);
1230+
Some(Ok(rec))
1231+
} else {
1232+
// TODO: If we don't switch to "no version" transaction, then we can abort these here.
1233+
// That also requires changes to the on_ready_to_vote_on_local_block
1234+
None
1235+
}
11891236
} else {
11901237
None
11911238
}

dan_layer/storage/src/consensus_models/transaction_pool.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
};
1010

1111
use log::*;
12+
use serde::Serialize;
1213
use tari_dan_common_types::{
1314
committee::CommitteeShard,
1415
optional::{IsNotFoundError, Optional},
@@ -25,7 +26,7 @@ use crate::{
2526

2627
const _LOG_TARGET: &str = "tari::dan::storage::transaction_pool";
2728

28-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
2930
pub enum TransactionPoolStage {
3031
/// Transaction has just come in and has never been proposed
3132
New,
@@ -215,7 +216,7 @@ impl<TStateStore: StateStore> TransactionPool<TStateStore> {
215216
}
216217
}
217218

218-
#[derive(Debug, Clone)]
219+
#[derive(Debug, Clone, Serialize)]
219220
pub struct TransactionPoolRecord {
220221
transaction: TransactionAtom,
221222
stage: TransactionPoolStage,

dan_layer/storage/src/state_store/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ pub trait StateStoreReadTransaction {
166166
transaction_id: &TransactionId,
167167
) -> Result<TransactionPoolRecord, StorageError>;
168168
fn transaction_pool_exists(&mut self, transaction_id: &TransactionId) -> Result<bool, StorageError>;
169+
fn transaction_pool_get_all(&mut self) -> Result<Vec<TransactionPoolRecord>, StorageError>;
169170
fn transaction_pool_get_many_ready(&mut self, max_txs: usize) -> Result<Vec<TransactionPoolRecord>, StorageError>;
170171
fn transaction_pool_count(
171172
&mut self,

0 commit comments

Comments
 (0)