Skip to content

Commit

Permalink
fix(sync): calculate dummy blocks in sync
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jan 11, 2024
1 parent 95da1cf commit 673b5ae
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 68 deletions.
38 changes: 19 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ members = [
"clients/base_node_client",
"clients/validator_node_client",
"clients/wallet_daemon_client",
"dan_layer/comms_rpc_state_sync",
"dan_layer/consensus",
"dan_layer/consensus_tests",
"dan_layer/epoch_manager",
"dan_layer/indexer_lib",
"dan_layer/p2p",
"dan_layer/rpc_state_sync",
"dan_layer/state_store_sqlite",
"dan_layer/storage_lmdb",
"dan_layer/storage_sqlite",
Expand Down Expand Up @@ -64,9 +64,9 @@ libp2p-substream = { path = "networking/libp2p-substream" }
proto_builder = { path = "networking/proto_builder" }
sqlite_message_logger = { path = "networking/sqlite_message_logger" }
tari_base_node_client = { path = "clients/base_node_client" }
# avoid including default features so each crate can choose which ones to import
# avoid including default features so each crate can choose which ones to import
tari_bor = { version = "0.3.0", path = "dan_layer/tari_bor", default-features = false }
tari_comms_rpc_state_sync = { path = "dan_layer/comms_rpc_state_sync" }
tari_rpc_state_sync = { path = "dan_layer/rpc_state_sync" }
tari_consensus = { path = "dan_layer/consensus" }
tari_dan_app_utilities = { path = "applications/tari_dan_app_utilities" }
tari_dan_common_types = { path = "dan_layer/common_types" }
Expand Down
6 changes: 3 additions & 3 deletions applications/tari_validator_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ minotari_app_grpc = { workspace = true }
minotari_app_utilities = { workspace = true }
minotari_wallet_grpc_client = { workspace = true }
tari_common = { workspace = true }
tari_common_types = {workspace = true }
tari_common_types = { workspace = true }
tari_core = { workspace = true, default-features = false, features = ["transactions"] }
tari_crypto = { workspace = true }
tari_validator_node_rpc = { workspace = true }
Expand All @@ -30,7 +30,7 @@ tari_validator_node_client = { workspace = true }
tari_base_node_client = { workspace = true }
tari_epoch_manager = { workspace = true, features = ["base_layer"] }
tari_indexer_lib = { workspace = true }
tari_comms_rpc_state_sync = {workspace = true }
tari_rpc_state_sync = { workspace = true }
tari_bor = { workspace = true, default-features = true }
tari_consensus = { workspace = true }
tari_state_store_sqlite = { workspace = true }
Expand Down Expand Up @@ -72,7 +72,7 @@ tokio = { workspace = true, features = [
"time",
"sync",
"rt-multi-thread",
]}
] }
tonic = { workspace = true }
tower-http = { workspace = true, features = ["default", "cors"] }

Expand Down
4 changes: 2 additions & 2 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::{
hotstuff::{ConsensusWorker, ConsensusWorkerContext, HotstuffWorker},
messages::HotstuffMessage,
Expand Down Expand Up @@ -39,6 +38,7 @@ pub use handle::*;
use sqlite_message_logger::SqliteMessageLogger;
use tari_dan_app_utilities::keypair::RistrettoKeypair;
use tari_dan_common_types::PeerAddress;
use tari_rpc_state_sync::RpcStateSyncManager;

use crate::p2p::services::message_dispatcher::OutboundMessaging;

Expand Down Expand Up @@ -90,7 +90,7 @@ pub async fn spawn(
let context = ConsensusWorkerContext {
epoch_manager: epoch_manager.clone(),
hotstuff: hotstuff_worker,
state_sync: CommsRpcStateSyncManager::new(epoch_manager, store, client_factory),
state_sync: RpcStateSyncManager::new(epoch_manager, store, leader_strategy, client_factory),
tx_current_state,
};

Expand Down
4 changes: 2 additions & 2 deletions applications/tari_validator_node/src/consensus/spec.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_comms_rpc_state_sync::CommsRpcStateSyncManager;
use tari_consensus::traits::ConsensusSpec;
use tari_dan_common_types::PeerAddress;
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_rpc_state_sync::RpcStateSyncManager;
use tari_state_store_sqlite::SqliteStateStore;

use crate::consensus::{
Expand All @@ -23,5 +23,5 @@ impl ConsensusSpec for TariConsensusSpec {
type SignatureService = TariSignatureService;
type StateManager = TariStateManager;
type StateStore = SqliteStateStore<Self::Addr>;
type SyncManager = CommsRpcStateSyncManager<Self::EpochManager, Self::StateStore>;
type SyncManager = RpcStateSyncManager<Self>;
}
10 changes: 8 additions & 2 deletions applications/tari_validator_node/src/p2p/rpc/sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,19 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
fn fetch_next_batch(&self, buffer: &mut BlockBuffer, current_block_id: &BlockId) -> Result<BlockId, StorageError> {
self.store.with_read_tx(|tx| {
let mut current_block_id = *current_block_id;
let mut last_block_id = current_block_id;
loop {
let children = tx.blocks_get_all_by_parent(&current_block_id)?;
let Some(child) = children.into_iter().find(|b| b.is_committed()) else {
break;
};

current_block_id = *child.id();
if child.is_dummy() {
continue;
}

last_block_id = current_block_id;
let all_qcs = child
.commands()
.iter()
Expand All @@ -130,15 +136,15 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
break;
}
}
Ok::<_, StorageError>(current_block_id)
Ok::<_, StorageError>(last_block_id)
})
}

fn fetch_last_blocks(&self, buffer: &mut BlockBuffer, current_block_id: &BlockId) -> Result<(), StorageError> {
self.store.with_read_tx(|tx| {
// TODO: if there are any transactions this will break the syncing node.
let leaf_block = LeafBlock::get(tx)?;
let blocks = Block::get_all_blocks_between(tx, current_block_id, leaf_block.block_id())?;
let blocks = Block::get_all_blocks_between(tx, current_block_id, leaf_block.block_id(), false)?;
for block in blocks {
debug!(
target: LOG_TARGET,
Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod state_machine;
mod vote_receiver;
mod worker;

pub use common::*;
pub use error::*;
pub use event::*;
pub use state_machine::*;
Expand Down
8 changes: 6 additions & 2 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ where TConsensusSpec: ConsensusSpec
);

if block.height() < current_height {
debug!(
info!(
target: LOG_TARGET,
"🔥 Block {} is lower than current height {}. Ignoring.",
block,
Expand Down Expand Up @@ -334,7 +334,11 @@ impl<TAddr: NodeAddressable> MessageBuffer<TAddr> {
},
// Buffer message for future height
Some(h) if h > current_height => {
debug!(target: LOG_TARGET, "Message {} is for future block {}. Current height {}", msg, h, current_height);
if msg.proposal().is_some() {
info!(target: LOG_TARGET, "Proposal {} is for future block {}. Current height {}", msg, h, current_height);
} else {
debug!(target: LOG_TARGET, "Message {} is for future height {}. Current height {}", msg, h, current_height);
}
self.push_to_buffer(h, from, msg);
continue;
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "tari_comms_rpc_state_sync"
name = "tari_rpc_state_sync"
description = "Tari template runtime engine"
version.workspace = true
edition.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::HotStuffError;
use tari_consensus::hotstuff::{HotStuffError, ProposalValidationError};
use tari_dan_storage::{
consensus_models::{BlockId, TransactionPoolError},
StorageError,
Expand All @@ -28,6 +28,8 @@ pub enum CommsRpcConsensusSyncError {
BlockNotSafe { block_id: BlockId },
#[error("No peers available. The committee size is {committee_size}")]
NoPeersAvailable { committee_size: usize },
#[error("Proposal validation error: {0}")]
ProposalValidationError(#[from] ProposalValidationError),
}

impl From<CommsRpcConsensusSyncError> for HotStuffError {
Expand Down
File renamed without changes.
Loading

0 comments on commit 673b5ae

Please sign in to comment.