Skip to content

Commit

Permalink
Add test: test_scheduler_producing_blocks()
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jan 17, 2025
1 parent 8935353 commit 0d3de56
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 5 deletions.
12 changes: 12 additions & 0 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ pub struct Channels {
}

impl Channels {
#[cfg(feature = "dev-context-only-utils")]
pub fn unified_sender(&self) -> &BankingPacketSender {
let unified_sender = &self.non_vote_sender;
assert!(unified_sender
.sender
.same_channel(&self.tpu_vote_sender.sender));
assert!(unified_sender
.sender
.same_channel(&self.gossip_vote_sender.sender));
unified_sender
}

pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver {
let unified_receiver = &self.non_vote_receiver;
assert!(unified_receiver.same_channel(&self.tpu_vote_receiver));
Expand Down
131 changes: 126 additions & 5 deletions core/tests/unified_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use {
agave_banking_stage_ingress_types::BankingPacketBatch,
assert_matches::assert_matches,
crossbeam_channel::unbounded,
itertools::Itertools,
log::*,
solana_core::{
banking_stage::unified_scheduler::ensure_banking_stage_setup,
banking_trace::BankingTracer,
consensus::{
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
progress_map::{ForkProgress, ProgressMap},
Expand All @@ -14,22 +18,36 @@ use {
replay_stage::ReplayStage,
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
},
solana_ledger::genesis_utils::create_genesis_config,
solana_entry::entry::Entry,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
blockstore::Blockstore, create_new_tmp_ledger_auto_delete,
genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache,
},
solana_perf::packet::to_packet_batches,
solana_poh::poh_recorder::create_test_recorder,
solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks,
genesis_utils::GenesisConfigInfo, installed_scheduler_pool::SchedulingContext,
prioritization_fee_cache::PrioritizationFeeCache,
},
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction::Result},
solana_sdk::{
hash::Hash, pubkey::Pubkey, signature::Signer, signer::keypair::Keypair,
system_transaction, transaction::Result,
},
solana_streamer::socket::SocketAddrSpace,
solana_timings::ExecuteTimings,
solana_unified_scheduler_logic::Task,
solana_unified_scheduler_logic::{SchedulingMode, Task},
solana_unified_scheduler_pool::{
DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, TaskHandler,
DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool,
SupportedSchedulingMode, TaskHandler,
},
std::{
collections::HashMap,
sync::{Arc, Mutex},
sync::{atomic::Ordering, Arc, Mutex},
thread::sleep,
time::Duration,
},
};

Expand Down Expand Up @@ -185,3 +203,106 @@ fn test_scheduler_waited_by_drop_bank_service() {
// the scheduler used by the pruned_bank have been returned now.
assert_eq!(pool_raw.pooled_scheduler_count(), 1);
}

#[test]
fn test_scheduler_producing_blocks() {
solana_logger::setup();

let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10_000);
let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());

// Setup bank_forks with block-producing unified scheduler enabled
let genesis_bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new_rw_arc(genesis_bank);
let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank));
let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(
genesis_bank.clone(),
blockstore.clone(),
None,
Some(leader_schedule_cache),
);
let pool = DefaultSchedulerPool::new(
SupportedSchedulingMode::Either(SchedulingMode::BlockProduction),
None,
None,
None,
None,
ignored_prioritization_fee_cache,
poh_recorder.read().unwrap().new_recorder(),
);
let channels = {
let banking_tracer = BankingTracer::new_disabled();
banking_tracer.create_channels_for_scheduler_pool(&pool)
};
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
Arc::new(ClusterInfo::new(
node.info,
keypair,
SocketAddrSpace::Unspecified,
))
};
ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder);
bank_forks.write().unwrap().install_scheduler_pool(pool);

// Wait until genesis_bank reaches its tick height...
while poh_recorder.read().unwrap().bank().is_some() {
sleep(Duration::from_millis(100));
}

// Create test tx
let tx = system_transaction::transfer(
&mint_keypair,
&solana_pubkey::new_rand(),
1,
genesis_config.hash(),
);
let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&vec![tx.clone(); 1], 1));
let tx = RuntimeTransaction::from_transaction_for_tests(tx);

// Crate tpu_bank
let tpu_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2);
let tpu_bank = bank_forks
.write()
.unwrap()
.insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank);
poh_recorder
.write()
.unwrap()
.set_bank(tpu_bank.clone_with_scheduler(), false);
tpu_bank.unblock_block_production();
let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler();
assert_eq!(tpu_bank.transaction_count(), 0);

// Now, send transaction
channels
.unified_sender()
.send(banking_packet_batch)
.unwrap();

// Wait until tpu_bank reaches its tick height...
while poh_recorder.read().unwrap().bank().is_some() {
sleep(Duration::from_millis(100));
}
assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _)));

// Verify transactions are committed and poh-recorded
assert_eq!(tpu_bank.transaction_count(), 1);
assert_matches!(
signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()),
Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()]
);

// Stop things.
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
1 change: 1 addition & 0 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ impl TaskHandler for DefaultTaskHandler {
} = handler_context
.transaction_recorder
.record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]);
trace!("pre_commit_callback: poh: {result:?}");
match result {
Ok(()) => Ok(starting_transaction_index),
Err(_) => Err(TransactionError::CommitCancelled),
Expand Down

0 comments on commit 0d3de56

Please sign in to comment.