diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index b991999446d6f1..9a2c6fe3c0c688 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -186,6 +186,18 @@ pub struct Channels { } impl Channels { + #[cfg(feature = "dev-context-only-utils")] + pub fn unified_sender(&self) -> &BankingPacketSender { + let unified_sender = &self.non_vote_sender; + assert!(unified_sender + .sender + .same_channel(&self.tpu_vote_sender.sender)); + assert!(unified_sender + .sender + .same_channel(&self.gossip_vote_sender.sender)); + unified_sender + } + pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver { let unified_receiver = &self.non_vote_receiver; assert!(unified_receiver.same_channel(&self.tpu_vote_receiver)); diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs index ecd41dde68608a..45fb352cfe885b 100644 --- a/core/tests/unified_scheduler.rs +++ b/core/tests/unified_scheduler.rs @@ -1,8 +1,12 @@ use { + agave_banking_stage_ingress_types::BankingPacketBatch, + assert_matches::assert_matches, crossbeam_channel::unbounded, itertools::Itertools, log::*, solana_core::{ + banking_stage::unified_scheduler::ensure_banking_stage_setup, + banking_trace::BankingTracer, consensus::{ heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, progress_map::{ForkProgress, ProgressMap}, @@ -14,22 +18,36 @@ use { replay_stage::ReplayStage, unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, }, - solana_ledger::genesis_utils::create_genesis_config, + solana_entry::entry::Entry, + solana_gossip::cluster_info::{ClusterInfo, Node}, + solana_ledger::{ + blockstore::Blockstore, create_new_tmp_ledger_auto_delete, + genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache, + }, + solana_perf::packet::to_packet_batches, + solana_poh::poh_recorder::create_test_recorder, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, bank_forks::BankForks, 
genesis_utils::GenesisConfigInfo, installed_scheduler_pool::SchedulingContext, prioritization_fee_cache::PrioritizationFeeCache, }, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, - solana_sdk::{hash::Hash, pubkey::Pubkey, system_transaction, transaction::Result}, + solana_sdk::{ + hash::Hash, pubkey::Pubkey, signature::Signer, signer::keypair::Keypair, + system_transaction, transaction::Result, + }, + solana_streamer::socket::SocketAddrSpace, solana_timings::ExecuteTimings, - solana_unified_scheduler_logic::Task, + solana_unified_scheduler_logic::{SchedulingMode, Task}, solana_unified_scheduler_pool::{ - DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, TaskHandler, + DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, + SupportedSchedulingMode, TaskHandler, }, std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::{atomic::Ordering, Arc, Mutex}, + thread::sleep, + time::Duration, }, }; @@ -185,3 +203,106 @@ fn test_scheduler_waited_by_drop_bank_service() { // the scheduler used by the pruned_bank have been returned now. assert_eq!(pool_raw.pooled_scheduler_count(), 1); } + +#[test] +fn test_scheduler_producing_blocks() { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = create_genesis_config(10_000); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + + // Setup bank_forks with block-producing unified scheduler enabled + let genesis_bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new_rw_arc(genesis_bank); + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let genesis_bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + genesis_bank.set_fork_graph_in_program_cache(Arc::downgrade(&bank_forks)); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&genesis_bank)); + let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder( + genesis_bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + let pool = DefaultSchedulerPool::new( + SupportedSchedulingMode::Either(SchedulingMode::BlockProduction), + None, + None, + None, + None, + ignored_prioritization_fee_cache, + poh_recorder.read().unwrap().new_recorder(), + ); + let channels = { + let banking_tracer = BankingTracer::new_disabled(); + banking_tracer.create_channels_for_scheduler_pool(&pool) + }; + let cluster_info = { + let keypair = Arc::new(Keypair::new()); + let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); + Arc::new(ClusterInfo::new( + node.info, + keypair, + SocketAddrSpace::Unspecified, + )) + }; + ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder); + bank_forks.write().unwrap().install_scheduler_pool(pool); + + // Wait until genesis_bank reaches its tick height... 
+ while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + + // Create test tx + let tx = system_transaction::transfer( + &mint_keypair, + &solana_pubkey::new_rand(), + 1, + genesis_config.hash(), + ); + let banking_packet_batch = BankingPacketBatch::new(to_packet_batches(&vec![tx.clone(); 1], 1)); + let tx = RuntimeTransaction::from_transaction_for_tests(tx); + + // Create tpu_bank + let tpu_bank = Bank::new_from_parent(genesis_bank.clone(), &Pubkey::default(), 2); + let tpu_bank = bank_forks + .write() + .unwrap() + .insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank); + poh_recorder + .write() + .unwrap() + .set_bank(tpu_bank.clone_with_scheduler(), false); + tpu_bank.unblock_block_production(); + let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + assert_eq!(tpu_bank.transaction_count(), 0); + + // Now, send transaction + channels + .unified_sender() + .send(banking_packet_batch) + .unwrap(); + + // Wait until tpu_bank reaches its tick height... + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + assert_matches!(tpu_bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + + // Verify transactions are committed and poh-recorded + assert_eq!(tpu_bank.transaction_count(), 1); + assert_matches!( + signal_receiver.into_iter().find(|(_, (entry, _))| !entry.is_tick()), + Some((_, (Entry {transactions, ..}, _))) if transactions == [tx.to_versioned_transaction()] + ); + + // Stop things.
+ exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); +} diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index e11426e6a63cd4..b99d0214c84817 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -729,6 +729,7 @@ impl TaskHandler for DefaultTaskHandler { } = handler_context .transaction_recorder .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); + trace!("pre_commit_callback: poh: {result:?}"); match result { Ok(()) => Ok(starting_transaction_index), Err(_) => Err(TransactionError::CommitCancelled),