Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/starfish/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ mod tests {
context::Context,
core::{Core, CoreSignals},
core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::create_encoder,
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
Expand Down Expand Up @@ -2062,7 +2062,7 @@ mod tests {
async fn add_transactions(
&self,
_transactions: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
unimplemented!("Unimplemented")
}
Expand Down
4 changes: 2 additions & 2 deletions crates/starfish/core/src/block_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
},
block_manager::block_suspender::BlockSuspender,
context::Context,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
};

/// Block manager suspends incoming blocks until they are connected to the
Expand Down Expand Up @@ -143,7 +143,7 @@ impl BlockManager {
let mut write_guard = self.dag_state.write();
write_guard.accept_block_headers(block_headers);
for verified_transaction in transactions {
write_guard.add_transactions(verified_transaction, "Block streaming");
write_guard.add_transactions(verified_transaction, TransactionSource::BlockStreaming);
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/starfish/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
commit::{CertifiedCommits, PendingSubDag},
commit_observer::CommitObserver,
context::Context,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::{ShardEncoder, create_encoder},
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
Expand Down Expand Up @@ -383,7 +383,7 @@ impl Core {
pub(crate) fn add_transactions(
&mut self,
transactions: Vec<VerifiedTransactions>,
source: &str,
source: TransactionSource,
) -> ConsensusResult<()> {
let _scope = monitored_scope("Core::add_transactions");
let _s = self
Expand Down
14 changes: 9 additions & 5 deletions crates/starfish/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
context::Context,
core::Core,
core_thread::CoreError::Shutdown,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
error::{ConsensusError, ConsensusResult},
};

Expand Down Expand Up @@ -74,7 +74,11 @@ enum CoreThreadCommand {
/// that have these blocks.
GetMissingBlocks(oneshot::Sender<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>),
/// Add transactions to be processed and accepted
AddTransactions(Vec<VerifiedTransactions>, oneshot::Sender<()>, &'static str),
AddTransactions(
Vec<VerifiedTransactions>,
oneshot::Sender<()>,
TransactionSource,
),
/// Add shards to the dag_state
AddShards(Vec<VerifiedOwnShard>, oneshot::Sender<()>),
/// Get missing transaction data that need to be synced
Expand Down Expand Up @@ -116,7 +120,7 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static {
async fn add_transactions(
&self,
transactions: Vec<VerifiedTransactions>,
source: &'static str,
source: TransactionSource,
) -> Result<(), CoreError>;

async fn add_shards(&self, shards: Vec<VerifiedOwnShard>) -> Result<(), CoreError>;
Expand Down Expand Up @@ -382,7 +386,7 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
async fn add_transactions(
&self,
transactions: Vec<VerifiedTransactions>,
source: &'static str,
source: TransactionSource,
) -> Result<(), CoreError> {
let (sender, receiver) = oneshot::channel();
self.send(CoreThreadCommand::AddTransactions(
Expand Down Expand Up @@ -581,7 +585,7 @@ pub(crate) mod tests {
async fn add_transactions(
&self,
_transactions: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
unimplemented!()
}
Expand Down
66 changes: 58 additions & 8 deletions crates/starfish/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,51 @@ use crate::{
threshold_clock::ThresholdClock,
};

/// Represents the source from which transactions were received and added to the
/// DAG state. This is used for metrics tracking and debugging.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum TransactionSource {
/// Transactions received via the transaction synchronizer component.
/// This synchronizer periodically fetches missing transactions to ensure
/// nodes stay up-to-date.
TransactionSynchronizer,

/// Data received via block streaming from peers in the network.
/// This is the primary method for receiving real-time blocks and
/// transactions as they're created.
BlockStreaming,

/// Transactions reconstructed from erasure-coded shards.
/// Used when full transaction data isn't available, but enough shards
/// have been collected to reconstruct it.
ShardReconstructor,

/// Data added during testing.
/// Only used in test code.
#[cfg(test)]
Test,
}

impl TransactionSource {
/// Returns the string label used for metrics reporting.
/// This ensures consistency with existing metrics that may be monitored.
pub(crate) fn as_str(&self) -> &'static str {
match self {
TransactionSource::TransactionSynchronizer => "Transactions synchronizer",
TransactionSource::BlockStreaming => "Block streaming",
TransactionSource::ShardReconstructor => "Shard reconstructor",
#[cfg(test)]
TransactionSource::Test => "test",
}
}
}

impl std::fmt::Display for TransactionSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}

/// If a shard from a block created by authority v1 is useful to authority v2 at
/// round r, then shards from v1 will be sent to v2 up to round r +
/// MAX_ROUND_GAP_FOR_USEFUL_SHARDS.
Expand Down Expand Up @@ -334,7 +379,11 @@ impl DagState {
.inc();
}

pub(crate) fn add_transactions(&mut self, transactions: VerifiedTransactions, source: &str) {
pub(crate) fn add_transactions(
&mut self,
transactions: VerifiedTransactions,
source: TransactionSource,
) {
let block_ref = transactions.block_ref();
if self
.recent_transactions
Expand All @@ -345,7 +394,7 @@ impl DagState {
.metrics
.node_metrics
.accepted_transactions
.with_label_values(&[source])
.with_label_values(&[source.as_str()])
.inc();
tracing::debug!("Adding transactions for block ref: {block_ref}");
let has_transactions = transactions.has_transactions();
Expand Down Expand Up @@ -2483,7 +2532,7 @@ mod test {
let later_commits = commits.split_off(5);
dag_state.accept_block_headers(dag_builder.block_headers(1..=5));
for verified_transactions in dag_builder.transactions(1..=5).into_iter() {
dag_state.add_transactions(verified_transactions, "test");
dag_state.add_transactions(verified_transactions, TransactionSource::Test);
}

for commit in commits.clone() {
Expand All @@ -2504,7 +2553,7 @@ mod test {
// Add the rest of the block headers, transaction, and commits to the dag state
dag_state.accept_block_headers(dag_builder.block_headers(6..=num_rounds));
for verified_transactions in dag_builder.transactions(6..=num_rounds).into_iter() {
dag_state.add_transactions(verified_transactions, "test");
dag_state.add_transactions(verified_transactions, TransactionSource::Test);
}
for commit in later_commits.clone() {
dag_state.add_commit(commit);
Expand Down Expand Up @@ -3098,7 +3147,8 @@ mod test {
)
.unwrap();
} else {
dag_state.add_transactions(block.verified_transactions.clone(), "test");
dag_state
.add_transactions(block.verified_transactions.clone(), TransactionSource::Test);
}
});

Expand Down Expand Up @@ -3196,7 +3246,7 @@ mod test {

dag_state.accept_block_headers(dag_builder.block_headers(1..=num_rounds));
for verified_transactions in dag_builder.transactions(1..=num_rounds).into_iter() {
dag_state.add_transactions(verified_transactions, "test");
dag_state.add_transactions(verified_transactions, TransactionSource::Test);
}

for commit in commits.clone() {
Expand Down Expand Up @@ -3368,7 +3418,7 @@ mod test {

// add transactions for all blocks
blocks.into_iter().for_each(|block| {
dag_state.add_transactions(block.verified_transactions, "test");
dag_state.add_transactions(block.verified_transactions, TransactionSource::Test);
});

assert!(dag_state.pending_acknowledgments.is_empty());
Expand Down Expand Up @@ -3410,7 +3460,7 @@ mod test {
transaction_commitment,
serialized,
);
dag_state.add_transactions(verified_transaction, "test");
dag_state.add_transactions(verified_transaction, TransactionSource::Test);
}
}
assert_eq!(
Expand Down
22 changes: 17 additions & 5 deletions crates/starfish/core/src/data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ mod tests {
block_header::{BlockRef, genesis_block_headers, genesis_blocks},
commit::{CommitRef, PendingSubDag},
context::Context,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
test_dag_builder::DagBuilder,
};

Expand Down Expand Up @@ -286,7 +286,10 @@ mod tests {
for (i, block) in genesis_blocks.iter().enumerate() {
state.accept_block_header(block.verified_block_header.clone());
if !excluded_transactions.contains(&(0, i)) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
}
}
Expand All @@ -301,7 +304,10 @@ mod tests {
for (i, block) in blocks.iter().enumerate() {
state.accept_block_header(block.verified_block_header.clone());
if !excluded_transactions.contains(&(round, i)) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
}
}
Expand Down Expand Up @@ -333,12 +339,18 @@ mod tests {
if round == 0 {
let genesis_blocks = genesis_blocks(&self.context);
if let Some(block) = genesis_blocks.get(block_index) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
} else {
let blocks = self.dag_builder.blocks(round..=round);
if let Some(block) = blocks.get(block_index) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/starfish/core/src/shard_reconstructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
},
context::Context,
core_thread::CoreThreadDispatcher,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
decoder::{ShardsDecoder, create_decoder},
encoder::{ShardEncoder, create_encoder},
error::{ConsensusError, ConsensusResult},
Expand Down Expand Up @@ -559,7 +559,7 @@ impl<C: CoreThreadDispatcher> ShardReconstructor<C> {

// Add the transactions to the core
self.core_dispatcher
.add_transactions(transactions, "Shard reconstructor")
.add_transactions(transactions, TransactionSource::ShardReconstructor)
.await
.map_err(|_| ConsensusError::Shutdown)?;
}
Expand Down Expand Up @@ -654,7 +654,7 @@ mod tests {
commit::CertifiedCommits,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::create_encoder,
shard_reconstructor::{
FullTransactionMessage, ShardMessage, ShardReconstructor, TransactionMessage,
Expand Down Expand Up @@ -683,7 +683,7 @@ mod tests {
async fn add_transactions(
&self,
txs: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
let mut guard = self.transactions.lock().await;
guard.extend(txs);
Expand Down
4 changes: 2 additions & 2 deletions crates/starfish/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,7 @@ mod tests {
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
dag_state::DagState,
dag_state::{DagState, TransactionSource},
error::{ConsensusError, ConsensusResult},
network::{BlockBundleStream, NetworkClient},
storage::mem_store::MemStore,
Expand Down Expand Up @@ -2378,7 +2378,7 @@ mod tests {
async fn add_transactions(
&self,
_transactions: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
unimplemented!("Unimplemented")
}
Expand Down
6 changes: 3 additions & 3 deletions crates/starfish/core/src/test_dag_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
},
commit::{CertifiedCommit, CommitDigest, TrustedCommit, WAVE_LENGTH},
context::Context,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::{ShardEncoder, create_encoder},
leader_schedule::{LeaderSchedule, LeaderSwapTable},
linearizer::{BlockStoreAPI, Linearizer},
Expand Down Expand Up @@ -419,7 +419,7 @@ impl DagBuilder {
for block_transactions in self.transactions.values() {
dag_state
.write()
.add_transactions(block_transactions.clone(), "test");
.add_transactions(block_transactions.clone(), TransactionSource::Test);
}
}

Expand Down Expand Up @@ -895,7 +895,7 @@ impl<'a> LayerBuilder<'a> {
let mut dag_state = dag_state.write();
dag_state.accept_block_headers(self.block_headers.clone());
for transactions in self.transactions.clone() {
dag_state.add_transactions(transactions, "test");
dag_state.add_transactions(transactions, TransactionSource::Test);
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/starfish/core/src/transactions_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
block_verifier::BlockVerifier,
context::Context,
core_thread::CoreThreadDispatcher,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::create_encoder,
error::{ConsensusError, ConsensusResult},
network::{NetworkClient, SerializedTransactions},
Expand Down Expand Up @@ -969,7 +969,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher>

// Add the transactions to the core
core_dispatcher
.add_transactions(transactions, "Transactions synchronizer")
.add_transactions(transactions, TransactionSource::TransactionSynchronizer)
.await
.map_err(|_| ConsensusError::Shutdown)?;

Expand Down Expand Up @@ -2047,7 +2047,7 @@ mod tests {
async fn add_transactions(
&self,
transactions: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
let mut txns = self.transactions.lock().await;

Expand Down
Loading