Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/starfish/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ mod tests {
context::Context,
core::{Core, CoreSignals},
core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::create_encoder,
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
Expand Down Expand Up @@ -2062,7 +2062,7 @@ mod tests {
async fn add_transactions(
&self,
_transactions: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
unimplemented!("Unimplemented")
}
Expand Down
4 changes: 2 additions & 2 deletions crates/starfish/core/src/block_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
},
block_manager::block_suspender::BlockSuspender,
context::Context,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
};

/// Block manager suspends incoming blocks until they are connected to the
Expand Down Expand Up @@ -143,7 +143,7 @@ impl BlockManager {
let mut write_guard = self.dag_state.write();
write_guard.accept_block_headers(block_headers);
for verified_transaction in transactions {
write_guard.add_transactions(verified_transaction, "Block streaming");
write_guard.add_transactions(verified_transaction, TransactionSource::BlockStreaming);
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/starfish/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
commit::{CertifiedCommits, PendingSubDag},
commit_observer::CommitObserver,
context::Context,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::{ShardEncoder, create_encoder},
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
Expand Down Expand Up @@ -383,7 +383,7 @@ impl Core {
pub(crate) fn add_transactions(
&mut self,
transactions: Vec<VerifiedTransactions>,
source: &str,
source: TransactionSource,
) -> ConsensusResult<()> {
let _scope = monitored_scope("Core::add_transactions");
let _s = self
Expand Down
14 changes: 9 additions & 5 deletions crates/starfish/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
context::Context,
core::Core,
core_thread::CoreError::Shutdown,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
error::{ConsensusError, ConsensusResult},
};

Expand Down Expand Up @@ -74,7 +74,11 @@ enum CoreThreadCommand {
/// that have these blocks.
GetMissingBlocks(oneshot::Sender<BTreeMap<BlockRef, BTreeSet<AuthorityIndex>>>),
/// Add transactions to be processed and accepted
AddTransactions(Vec<VerifiedTransactions>, oneshot::Sender<()>, &'static str),
AddTransactions(
Vec<VerifiedTransactions>,
oneshot::Sender<()>,
TransactionSource,
),
/// Add shards to the dag_state
AddShards(Vec<VerifiedOwnShard>, oneshot::Sender<()>),
/// Get missing transaction data that need to be synced
Expand Down Expand Up @@ -116,7 +120,7 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static {
async fn add_transactions(
&self,
transactions: Vec<VerifiedTransactions>,
source: &'static str,
source: TransactionSource,
) -> Result<(), CoreError>;

async fn add_shards(&self, shards: Vec<VerifiedOwnShard>) -> Result<(), CoreError>;
Expand Down Expand Up @@ -382,7 +386,7 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
async fn add_transactions(
&self,
transactions: Vec<VerifiedTransactions>,
source: &'static str,
source: TransactionSource,
) -> Result<(), CoreError> {
let (sender, receiver) = oneshot::channel();
self.send(CoreThreadCommand::AddTransactions(
Expand Down Expand Up @@ -581,7 +585,7 @@ pub(crate) mod tests {
async fn add_transactions(
&self,
_transactions: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
unimplemented!()
}
Expand Down
95 changes: 87 additions & 8 deletions crates/starfish/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,51 @@ use crate::{
threshold_clock::ThresholdClock,
};

/// Represents the source from which transactions were received and added to the
/// DAG state. This is used for metrics tracking and debugging.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum TransactionSource {
/// Transactions received via the transaction synchronizer component.
/// This synchronizer periodically fetches missing transactions to ensure
/// nodes stay up-to-date.
TransactionSynchronizer,

/// Data received via block streaming from peers in the network.
/// This is the primary method for receiving real-time blocks and
/// transactions as they're created.
BlockStreaming,

/// Transactions reconstructed from erasure-coded shards.
/// Used when full transaction data isn't available, but enough shards
/// have been collected to reconstruct it.
ShardReconstructor,

/// Data added during testing.
/// Only used in test code.
#[cfg(test)]
Test,
}

impl TransactionSource {
/// Returns the string label used for metrics reporting.
/// This ensures consistency with existing metrics that may be monitored.
pub(crate) fn as_str(&self) -> &'static str {
match self {
TransactionSource::TransactionSynchronizer => "Transactions synchronizer",
TransactionSource::BlockStreaming => "Block streaming",
TransactionSource::ShardReconstructor => "Shard reconstructor",
#[cfg(test)]
TransactionSource::Test => "test",
}
}
}

impl std::fmt::Display for TransactionSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}

/// If a shard from a block created by authority v1 is useful to authority v2 at
/// round r, then shards from v1 will be sent to v2 up to round r +
/// MAX_ROUND_GAP_FOR_USEFUL_SHARDS.
Expand Down Expand Up @@ -334,7 +379,11 @@ impl DagState {
.inc();
}

pub(crate) fn add_transactions(&mut self, transactions: VerifiedTransactions, source: &str) {
pub(crate) fn add_transactions(
&mut self,
transactions: VerifiedTransactions,
source: TransactionSource,
) {
let block_ref = transactions.block_ref();
if self
.recent_transactions
Expand All @@ -345,7 +394,7 @@ impl DagState {
.metrics
.node_metrics
.accepted_transactions
.with_label_values(&[source])
.with_label_values(&[source.as_str()])
.inc();
tracing::debug!("Adding transactions for block ref: {block_ref}");
let has_transactions = transactions.has_transactions();
Expand Down Expand Up @@ -2483,7 +2532,7 @@ mod test {
let later_commits = commits.split_off(5);
dag_state.accept_block_headers(dag_builder.block_headers(1..=5));
for verified_transactions in dag_builder.transactions(1..=5).into_iter() {
dag_state.add_transactions(verified_transactions, "test");
dag_state.add_transactions(verified_transactions, TransactionSource::Test);
}

for commit in commits.clone() {
Expand All @@ -2504,7 +2553,7 @@ mod test {
// Add the rest of the block headers, transaction, and commits to the dag state
dag_state.accept_block_headers(dag_builder.block_headers(6..=num_rounds));
for verified_transactions in dag_builder.transactions(6..=num_rounds).into_iter() {
dag_state.add_transactions(verified_transactions, "test");
dag_state.add_transactions(verified_transactions, TransactionSource::Test);
}
for commit in later_commits.clone() {
dag_state.add_commit(commit);
Expand Down Expand Up @@ -3098,7 +3147,8 @@ mod test {
)
.unwrap();
} else {
dag_state.add_transactions(block.verified_transactions.clone(), "test");
dag_state
.add_transactions(block.verified_transactions.clone(), TransactionSource::Test);
}
});

Expand Down Expand Up @@ -3196,7 +3246,7 @@ mod test {

dag_state.accept_block_headers(dag_builder.block_headers(1..=num_rounds));
for verified_transactions in dag_builder.transactions(1..=num_rounds).into_iter() {
dag_state.add_transactions(verified_transactions, "test");
dag_state.add_transactions(verified_transactions, TransactionSource::Test);
}

for commit in commits.clone() {
Expand Down Expand Up @@ -3368,7 +3418,7 @@ mod test {

// add transactions for all blocks
blocks.into_iter().for_each(|block| {
dag_state.add_transactions(block.verified_transactions, "test");
dag_state.add_transactions(block.verified_transactions, TransactionSource::Test);
});

assert!(dag_state.pending_acknowledgments.is_empty());
Expand Down Expand Up @@ -3410,7 +3460,7 @@ mod test {
transaction_commitment,
serialized,
);
dag_state.add_transactions(verified_transaction, "test");
dag_state.add_transactions(verified_transaction, TransactionSource::Test);
}
}
assert_eq!(
Expand All @@ -3421,4 +3471,33 @@ mod test {
assert!(dag_state.pending_acknowledgments.contains(block_ref));
}
}

#[test]
fn test_transaction_source_labels() {
assert_eq!(
TransactionSource::TransactionSynchronizer.as_str(),
"Transactions synchronizer"
);
assert_eq!(
TransactionSource::BlockStreaming.as_str(),
"Block streaming"
);
assert_eq!(
TransactionSource::ShardReconstructor.as_str(),
"Shard reconstructor"
);
assert_eq!(TransactionSource::Test.as_str(), "test");
}

#[test]
fn test_transaction_source_display() {
assert_eq!(
format!("{}", TransactionSource::TransactionSynchronizer),
"Transactions synchronizer"
);
assert_eq!(
format!("{}", TransactionSource::BlockStreaming),
"Block streaming"
);
}
}
22 changes: 17 additions & 5 deletions crates/starfish/core/src/data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ mod tests {
block_header::{BlockRef, genesis_block_headers, genesis_blocks},
commit::{CommitRef, PendingSubDag},
context::Context,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
test_dag_builder::DagBuilder,
};

Expand Down Expand Up @@ -286,7 +286,10 @@ mod tests {
for (i, block) in genesis_blocks.iter().enumerate() {
state.accept_block_header(block.verified_block_header.clone());
if !excluded_transactions.contains(&(0, i)) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
}
}
Expand All @@ -301,7 +304,10 @@ mod tests {
for (i, block) in blocks.iter().enumerate() {
state.accept_block_header(block.verified_block_header.clone());
if !excluded_transactions.contains(&(round, i)) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
}
}
Expand Down Expand Up @@ -333,12 +339,18 @@ mod tests {
if round == 0 {
let genesis_blocks = genesis_blocks(&self.context);
if let Some(block) = genesis_blocks.get(block_index) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
} else {
let blocks = self.dag_builder.blocks(round..=round);
if let Some(block) = blocks.get(block_index) {
state.add_transactions(block.verified_transactions.clone(), "test");
state.add_transactions(
block.verified_transactions.clone(),
TransactionSource::Test,
);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/starfish/core/src/shard_reconstructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
},
context::Context,
core_thread::CoreThreadDispatcher,
dag_state::DagState,
dag_state::{DagState, TransactionSource},
decoder::{ShardsDecoder, create_decoder},
encoder::{ShardEncoder, create_encoder},
error::{ConsensusError, ConsensusResult},
Expand Down Expand Up @@ -559,7 +559,7 @@ impl<C: CoreThreadDispatcher> ShardReconstructor<C> {

// Add the transactions to the core
self.core_dispatcher
.add_transactions(transactions, "Shard reconstructor")
.add_transactions(transactions, TransactionSource::ShardReconstructor)
.await
.map_err(|_| ConsensusError::Shutdown)?;
}
Expand Down Expand Up @@ -654,7 +654,7 @@ mod tests {
commit::CertifiedCommits,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
dag_state::DagState,
dag_state::{DagState, TransactionSource},
encoder::create_encoder,
shard_reconstructor::{
FullTransactionMessage, ShardMessage, ShardReconstructor, TransactionMessage,
Expand Down Expand Up @@ -683,7 +683,7 @@ mod tests {
async fn add_transactions(
&self,
txs: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
let mut guard = self.transactions.lock().await;
guard.extend(txs);
Expand Down
4 changes: 2 additions & 2 deletions crates/starfish/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,7 @@ mod tests {
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher, tests::MockCoreThreadDispatcher},
dag_state::DagState,
dag_state::{DagState, TransactionSource},
error::{ConsensusError, ConsensusResult},
network::{BlockBundleStream, NetworkClient},
storage::mem_store::MemStore,
Expand Down Expand Up @@ -2378,7 +2378,7 @@ mod tests {
async fn add_transactions(
&self,
_transactions: Vec<VerifiedTransactions>,
_source: &'static str,
_source: TransactionSource,
) -> Result<(), CoreError> {
unimplemented!("Unimplemented")
}
Expand Down
Loading
Loading