Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: fix horizon sync after smt upgrade #6006

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl CommandContext {
let mut missing_headers = Vec::new();
print!("Searching for height: ");
// We need to check every header, but not every block.
let horizon_height = meta.horizon_block_height(height);
let horizon_height = meta.pruned_height_at_given_chain_tip(height);
while height > 0 {
print!("{}", height);
io::stdout().flush().await?;
Expand Down
92 changes: 44 additions & 48 deletions base_layer/common_types/src/chain_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use std::fmt::{Display, Error, Formatter};
use primitive_types::U256;
use serde::{Deserialize, Serialize};

use crate::types::{BlockHash, FixedHash};
use crate::types::BlockHash;

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct ChainMetadata {
/// The current chain height, or the block number of the longest valid chain, or `None` if there is no chain
/// The current chain height, or the block number of the longest valid chain
best_block_height: u64,
/// The block hash of the current tip of the longest valid chain
best_block_hash: BlockHash,
Expand All @@ -40,7 +40,7 @@ pub struct ChainMetadata {
pruning_horizon: u64,
/// The height of the pruning horizon. This indicates from what height a full block can be provided
/// (exclusive). If `pruned_height` is equal to the `best_block_height` no blocks can be
/// provided. Archival nodes wil always have an `pruned_height` of zero.
/// provided. Archival nodes wil always have a `pruned_height` of zero.
pruned_height: u64,
/// The total accumulated proof of work of the longest chain
accumulated_difficulty: U256,
Expand All @@ -67,37 +67,16 @@ impl ChainMetadata {
}
}

pub fn empty() -> ChainMetadata {
ChainMetadata {
best_block_height: 0,
best_block_hash: FixedHash::zero(),
pruning_horizon: 0,
pruned_height: 0,
accumulated_difficulty: 0.into(),
timestamp: 0,
}
}

/// The block height at the pruning horizon, given the chain height of the network. Typically database backends
/// cannot provide any block data earlier than this point.
/// Zero is returned if the blockchain still hasn't reached the pruning horizon.
pub fn horizon_block_height(&self, chain_tip: u64) -> u64 {
pub fn pruned_height_at_given_chain_tip(&self, chain_tip: u64) -> u64 {
match self.pruning_horizon {
0 => 0,
horizon => chain_tip.saturating_sub(horizon),
pruning_horizon => chain_tip.saturating_sub(pruning_horizon),
}
}

/// Set the pruning horizon to indicate that the chain is in archival mode (i.e. a pruning horizon of zero)
pub fn archival_mode(&mut self) {
self.pruning_horizon = 0;
}

/// Set the pruning horizon
pub fn set_pruning_horizon(&mut self, pruning_horizon: u64) {
self.pruning_horizon = pruning_horizon;
}

/// The configured number of blocks back from the tip that this database tracks. A value of 0 indicates that
/// pruning mode is disabled and the node will keep full blocks from the time it was set. If pruning horizon
/// was previously enabled, previously pruned blocks will remain pruned. If set from initial sync, full blocks
Expand All @@ -123,7 +102,7 @@ impl ChainMetadata {

/// The height of the pruning horizon. This indicates from what height a full block can be provided
/// (exclusive). If `pruned_height` is equal to the `best_block_height` no blocks can be
/// provided. Archival nodes wil always have an `pruned_height` of zero.
/// provided. Archival nodes wil always have a `pruned_height` of zero.
pub fn pruned_height(&self) -> u64 {
self.pruned_height
}
Expand All @@ -143,14 +122,11 @@ impl ChainMetadata {

impl Display for ChainMetadata {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
let height = self.best_block_height;
let best_block = self.best_block_hash;
let accumulated_difficulty = self.accumulated_difficulty;
writeln!(f, "Height of longest chain: {}", height)?;
writeln!(f, "Total accumulated difficulty: {}", accumulated_difficulty)?;
writeln!(f, "Best block: {}", best_block)?;
writeln!(f, "Best block height: {}", self.best_block_height)?;
writeln!(f, "Total accumulated difficulty: {}", self.accumulated_difficulty)?;
writeln!(f, "Best block hash: {}", self.best_block_hash)?;
writeln!(f, "Pruning horizon: {}", self.pruning_horizon)?;
writeln!(f, "Effective pruned height: {}", self.pruned_height)?;
writeln!(f, "Pruned height: {}", self.pruned_height)?;
Ok(())
}
}
Expand All @@ -161,33 +137,53 @@ mod test {

#[test]
fn horizon_block_on_default() {
let metadata = ChainMetadata::empty();
assert_eq!(metadata.horizon_block_height(0), 0);
let metadata = ChainMetadata {
best_block_height: 0,
best_block_hash: Default::default(),
pruning_horizon: 0,
pruned_height: 0,
accumulated_difficulty: Default::default(),
timestamp: 0,
};
assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
}

#[test]
fn pruned_mode() {
let mut metadata = ChainMetadata::empty();
let mut metadata = ChainMetadata {
best_block_height: 0,
best_block_hash: Default::default(),
pruning_horizon: 0,
pruned_height: 0,
accumulated_difficulty: Default::default(),
timestamp: 0,
};
assert!(!metadata.is_pruned_node());
assert!(metadata.is_archival_node());
metadata.set_pruning_horizon(2880);
metadata.pruning_horizon = 2880;
assert!(metadata.is_pruned_node());
assert!(!metadata.is_archival_node());
assert_eq!(metadata.horizon_block_height(0), 0);
assert_eq!(metadata.horizon_block_height(100), 0);
assert_eq!(metadata.horizon_block_height(2880), 0);
assert_eq!(metadata.horizon_block_height(2881), 1);
assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
assert_eq!(metadata.pruned_height_at_given_chain_tip(100), 0);
assert_eq!(metadata.pruned_height_at_given_chain_tip(2880), 0);
assert_eq!(metadata.pruned_height_at_given_chain_tip(2881), 1);
}

#[test]
fn archival_node() {
let mut metadata = ChainMetadata::empty();
metadata.archival_mode();
let metadata = ChainMetadata {
best_block_height: 0,
best_block_hash: Default::default(),
pruning_horizon: 0,
pruned_height: 0,
accumulated_difficulty: Default::default(),
timestamp: 0,
};
// Chain is still empty
assert_eq!(metadata.horizon_block_height(0), 0);
assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
// When pruning horizon is zero, the horizon block is always 0, the genesis block
assert_eq!(metadata.horizon_block_height(0), 0);
assert_eq!(metadata.horizon_block_height(100), 0);
assert_eq!(metadata.horizon_block_height(2881), 0);
assert_eq!(metadata.pruned_height_at_given_chain_tip(0), 0);
assert_eq!(metadata.pruned_height_at_given_chain_tip(100), 0);
assert_eq!(metadata.pruned_height_at_given_chain_tip(2881), 0);
}
}
16 changes: 10 additions & 6 deletions base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,20 @@ message SyncKernelsRequest {
}

message SyncUtxosRequest {
// Start header hash to sync UTXOs from
bytes start_header_hash = 1;
// End header hash to sync UTXOs to
bytes end_header_hash = 2;
}
message SyncUtxosResponse {
tari.types.TransactionOutput output = 1;
bytes mined_header = 2;
}

message PrunedOutput {
bytes hash = 1;
message SyncUtxosResponse {
oneof txo {
// The unspent transaction output
tari.types.TransactionOutput output = 1;
// If the TXO is spent, the commitment bytes are returned
bytes commitment = 2;
}
bytes mined_header = 3;
}

message SyncUtxosByBlockRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,33 @@ impl HorizonStateSync {
Err(err) => return err.into(),
};

let sync_peers = &mut self.sync_peers;
// Order sync peers according to accumulated difficulty
sync_peers.sort_by(|a, b| {
b.claimed_chain_metadata()
.accumulated_difficulty()
.cmp(&a.claimed_chain_metadata().accumulated_difficulty())
});

// Target horizon sync height based on the last header we have synced
let last_header = match shared.db.fetch_last_header().await {
Ok(h) => h,
Err(err) => return err.into(),
};
let target_horizon_sync_height = local_metadata.pruned_height_at_given_chain_tip(last_header.height);

let horizon_sync_height = local_metadata.horizon_block_height(last_header.height);
if local_metadata.pruned_height() >= horizon_sync_height {
info!(target: LOG_TARGET, "Horizon state was already synchronized.");
// Determine if we need to sync horizon state
if local_metadata.pruned_height() >= target_horizon_sync_height {
info!(target: LOG_TARGET, "Horizon state is already synchronized.");
return StateEvent::HorizonStateSynchronized;
}

// We're already synced because we have full blocks higher than our target pruned height
if local_metadata.best_block_height() >= horizon_sync_height {
if local_metadata.best_block_height() >= target_horizon_sync_height {
info!(
target: LOG_TARGET,
"Tip height is higher than our pruned height. Horizon state is already synchronized."
"Our tip height is higher than our target pruned height. Horizon state is already synchronized."
);
return StateEvent::HorizonStateSynchronized;
}
let sync_peers = &mut self.sync_peers;

let db = shared.db.clone();
let config = shared.config.blockchain_sync_config.clone();
Expand All @@ -90,7 +97,7 @@ impl HorizonStateSync {
connectivity,
rules,
sync_peers,
horizon_sync_height,
target_horizon_sync_height,
prover,
validator,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,63 +62,71 @@ impl DecideNextSync {
);

if local_metadata.pruning_horizon() > 0 {
let last_header = match shared.db.fetch_last_header().await {
Ok(h) => h,
Err(err) => return err.into(),
};

let horizon_sync_height = local_metadata.horizon_block_height(last_header.height);
// Filter sync peers that claim to be able to provide blocks up until our pruned height
let sync_peers = self
.sync_peers
debug!(target: LOG_TARGET, "Local metadata: {}", local_metadata);
let mut sync_peers = self.sync_peers.clone();
let sync_peers = sync_peers
.drain(..)
.filter(|sync_peer| {
let remote_metadata = sync_peer.claimed_chain_metadata();
remote_metadata.best_block_height() >= horizon_sync_height
debug!(target: LOG_TARGET, "Peer metadata: {}", remote_metadata);
let remote_is_archival_node = remote_metadata.pruned_height() == 0;
let general_sync_conditions =
// Must be able to provide the correct amount of full blocks past the pruned height (i.e. the
// pruning horizon), otherwise our horizon spec will not be met
remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >=
local_metadata.pruning_horizon() &&
// Must have a better blockchain tip than us
remote_metadata.best_block_height() > local_metadata.best_block_height() &&
// Must be able to provide full blocks from the height we need detailed information
remote_metadata.pruned_height() <= local_metadata.best_block_height();
let sync_from_prune_node = !remote_is_archival_node &&
// Must have done initial sync (to detect genesis TXO spends)
local_metadata.best_block_height() > 0;
general_sync_conditions && (remote_is_archival_node || sync_from_prune_node)
})
.collect::<Vec<_>>();

if sync_peers.is_empty() {
warn!(
target: LOG_TARGET,
"Unable to find any appropriate sync peers for horizon sync"
"Unable to find any appropriate sync peers for horizon sync, trying for block sync"
);
return Continue;
}

debug!(
target: LOG_TARGET,
"Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
ProceedToHorizonSync(sync_peers)
} else {
// Filter sync peers that are able to provide full blocks from our current tip
let sync_peers = self
.sync_peers
.drain(..)
.filter(|sync_peer| {
sync_peer.claimed_chain_metadata().pruned_height() <= local_metadata.best_block_height()
})
.collect::<Vec<_>>();

if sync_peers.is_empty() {
warn!(
} else {
debug!(
target: LOG_TARGET,
"Unable to find any appropriate sync peers for block sync"
"Proceeding to horizon sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
return Continue;
return ProceedToHorizonSync(sync_peers);
}
}

// This is not a pruned node or horizon sync is not possible, try for block sync

// Filter sync peers that are able to provide full blocks from our current tip
let sync_peers = self
.sync_peers
.drain(..)
.filter(|sync_peer| {
let remote_metadata = sync_peer.claimed_chain_metadata();
remote_metadata.pruned_height() <= local_metadata.best_block_height()
})
.collect::<Vec<_>>();

debug!(
target: LOG_TARGET,
"Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
ProceedToBlockSync(sync_peers)
if sync_peers.is_empty() {
warn!(target: LOG_TARGET, "Unable to find any appropriate sync peers for block sync");
return Continue;
}

debug!(
target: LOG_TARGET,
"Proceeding to block sync with {} sync peer(s) with a best latency of {:.2?}",
sync_peers.len(),
sync_peers.first().map(|p| p.latency()).unwrap_or_default()
);
ProceedToBlockSync(sync_peers)
}
}

Expand Down
12 changes: 11 additions & 1 deletion base_layer/core/src/base_node/sync/horizon_state_sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use tari_comms::{
};
use tari_crypto::errors::RangeProofError;
use tari_mmr::{error::MerkleMountainRangeError, sparse_merkle_tree::SMTError};
use tari_utilities::ByteArrayError;
use thiserror::Error;
use tokio::task;

Expand Down Expand Up @@ -97,6 +98,14 @@ pub enum HorizonSyncError {
PeerNotFound,
#[error("Sparse Merkle Tree error: {0}")]
SMTError(#[from] SMTError),
#[error("ByteArrayError error: {0}")]
ByteArrayError(String),
}

impl From<ByteArrayError> for HorizonSyncError {
fn from(e: ByteArrayError) -> Self {
HorizonSyncError::ByteArrayError(e.to_string())
}
}

impl From<TryFromIntError> for HorizonSyncError {
Expand Down Expand Up @@ -142,7 +151,8 @@ impl HorizonSyncError {
err @ HorizonSyncError::ConversionError(_) |
err @ HorizonSyncError::MerkleMountainRangeError(_) |
err @ HorizonSyncError::FixedHashSizeError(_) |
err @ HorizonSyncError::TransactionError(_) => Some(BanReason {
err @ HorizonSyncError::TransactionError(_) |
err @ HorizonSyncError::ByteArrayError(_) => Some(BanReason {
reason: format!("{}", err),
ban_duration: BanPeriod::Long,
}),
Expand Down
Loading
Loading