diff --git a/Cargo.lock b/Cargo.lock index f423eb9992403e..5f43229248ae56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,7 @@ dependencies = [ "solana-log-collector", "solana-logger", "solana-measure", + "solana-poh", "solana-program-runtime", "solana-rpc", "solana-runtime", @@ -6349,6 +6350,8 @@ dependencies = [ "solana-sdk", "solana-streamer", "solana-tpu-client", + "solana-unified-scheduler-logic", + "solana-unified-scheduler-pool", "solana-version", ] @@ -7214,6 +7217,7 @@ dependencies = [ "chrono", "crossbeam-channel", "dashmap", + "derive_more 1.0.0", "etcd-client", "fs_extra", "futures 0.3.31", @@ -8534,6 +8538,7 @@ dependencies = [ "bincode", "bv", "caps", + "crossbeam-channel", "curve25519-dalek 4.1.3", "dlopen2", "fnv", @@ -10667,7 +10672,6 @@ dependencies = [ "dyn-clone", "lazy_static", "log", - "qualifier_attr", "scopeguard", "solana-clock", "solana-entry", @@ -10684,6 +10688,7 @@ dependencies = [ "solana-transaction", "solana-transaction-error", "solana-unified-scheduler-logic", + "solana-unified-scheduler-pool", "static_assertions", "test-case", "trait-set", diff --git a/banking-bench/Cargo.toml b/banking-bench/Cargo.toml index a86629552ddf84..4deaa61dc985c7 100644 --- a/banking-bench/Cargo.toml +++ b/banking-bench/Cargo.toml @@ -28,6 +28,8 @@ solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-sdk = { workspace = true } solana-streamer = { workspace = true } solana-tpu-client = { workspace = true } +solana-unified-scheduler-logic = { workspace = true } +solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } [features] diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 2c0a047212bbbc..0f95dc888e03d3 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -8,7 +8,10 @@ use { rand::{thread_rng, Rng}, rayon::prelude::*, solana_core::{ - banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage}, + banking_stage::{ + unified_scheduler::ensure_banking_stage_setup, + update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, + }, banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, validator::{BlockProductionMethod, TransactionStructure}, }, @@ -36,6 +39,8 @@ use { transaction::Transaction, }, solana_streamer::socket::SocketAddrSpace, + solana_unified_scheduler_logic::SchedulingMode, + solana_unified_scheduler_pool::{DefaultSchedulerPool, SupportedSchedulingMode}, std::{ sync::{atomic::Ordering, Arc, RwLock}, thread::sleep, @@ -452,6 +457,25 @@ fn main() { ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified) }; let cluster_info = Arc::new(cluster_info); + let banking_tracer_channels = if matches!( + block_production_method, + BlockProductionMethod::UnifiedScheduler + ) { + let pool = DefaultSchedulerPool::new( + SupportedSchedulingMode::Either(SchedulingMode::BlockProduction), + None, + None, + None, + Some(replay_vote_sender.clone()), + prioritization_fee_cache.clone(), + ); + let channels = banking_tracer.create_channels_for_scheduler_pool(&pool); + ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder); + bank_forks.write().unwrap().install_scheduler_pool(pool); + channels + } else { + banking_tracer.create_channels(false) + }; let Channels { non_vote_sender, non_vote_receiver, @@ -459,9 +483,9 @@ fn main() { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = banking_tracer.create_channels(false); + } = banking_tracer_channels; let 
banking_stage = BankingStage::new_num_threads( - block_production_method, + block_production_method.clone(), transaction_struct, &cluster_info, &poh_recorder, @@ -476,6 +500,18 @@ fn main() { &prioritization_fee_cache, ); + // This bench processes transactions, starting from the very first bank, so special-casing is + // needed for unified scheduler. + if matches!( + block_production_method, + BlockProductionMethod::UnifiedScheduler + ) { + bank = bank_forks + .write() + .unwrap() + .reinstall_block_production_scheduler_into_working_genesis_bank(); + } + // This is so that the signal_receiver does not go out of scope after the closure. // If it is dropped before poh_service, then poh_service will error when // calling send() on the channel. @@ -536,10 +572,11 @@ fn main() { tx_total_us += now.elapsed().as_micros() as u64; let mut poh_time = Measure::start("poh_time"); - poh_recorder + let cleared_bank = poh_recorder .write() .unwrap() .reset(bank.clone(), Some((bank.slot(), bank.slot() + 1))); + assert_matches!(cleared_bank, None); poh_time.stop(); let mut new_bank_time = Measure::start("new_bank"); diff --git a/core/Cargo.toml b/core/Cargo.toml index 7f89cdc930b225..025a9e7fcba88a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -27,6 +27,7 @@ bytes = { workspace = true } chrono = { workspace = true, features = ["default", "serde"] } crossbeam-channel = { workspace = true } dashmap = { workspace = true, features = ["rayon", "raw-api"] } +derive_more = { workspace = true } etcd-client = { workspace = true, features = ["tls"] } futures = { workspace = true } histogram = { workspace = true } @@ -93,6 +94,7 @@ solana-tls-utils = { workspace = true } solana-tpu-client = { workspace = true } solana-transaction-status = { workspace = true } solana-turbine = { workspace = true } +solana-unified-scheduler-logic = { workspace = true } solana-unified-scheduler-pool = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace = true } @@ -124,7 +126,6 @@ solana-rpc = { workspace = true, features = ["dev-context-only-utils"] } solana-sdk = { workspace = true, features = ["dev-context-only-utils"] } solana-stake-program = { workspace = true } solana-system-program = { workspace = true } -solana-unified-scheduler-logic = { workspace = true } solana-unified-scheduler-pool = { workspace = true, features = [ "dev-context-only-utils", ] } diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index 82c72f7711fbb6..7a64de10a3a84c 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -2,6 +2,7 @@ use { crate::{ banking_stage::{ + unified_scheduler::ensure_banking_stage_setup, update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo, }, banking_trace::{ @@ -45,6 +46,7 @@ use { }, solana_streamer::socket::SocketAddrSpace, solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType}, + solana_unified_scheduler_pool::DefaultSchedulerPool, std::{ collections::BTreeMap, fmt::Display, @@ -691,6 +693,7 @@ impl BankingSimulator { blockstore: Arc, block_production_method: BlockProductionMethod, transaction_struct: TransactionStructure, + unified_scheduler_pool: Option>, ) -> (SenderLoop, SimulatorLoop, SimulatorThreads) { let parent_slot = self.parent_slot().unwrap(); let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time; @@ -777,6 +780,19 @@ impl BankingSimulator { let cluster_info_for_banking = Arc::new(DummyClusterInfo { id: simulated_leader.into(), }); + let 
banking_tracer_channels = if let Some(pool) = unified_scheduler_pool { + let channels = retracer.create_channels_for_scheduler_pool(&pool); + ensure_banking_stage_setup( + &pool, + &bank_forks, + &channels, + &cluster_info_for_banking, + &poh_recorder, + ); + channels + } else { + retracer.create_channels(false) + }; let Channels { non_vote_sender, non_vote_receiver, @@ -784,7 +800,7 @@ impl BankingSimulator { tpu_vote_receiver, gossip_vote_sender, gossip_vote_receiver, - } = retracer.create_channels(false); + } = banking_tracer_channels; let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); @@ -907,6 +923,7 @@ impl BankingSimulator { blockstore: Arc, block_production_method: BlockProductionMethod, transaction_struct: TransactionStructure, + unified_scheduler_pool: Option>, ) -> Result<(), SimulateError> { let (sender_loop, simulator_loop, simulator_threads) = self.prepare_simulation( genesis_config, @@ -914,6 +931,7 @@ impl BankingSimulator { blockstore, block_production_method, transaction_struct, + unified_scheduler_pool, ); sender_loop.log_starting(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 4aab0260ee635b..467c8cbb7dfbfa 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -39,6 +39,7 @@ use { vote_sender_types::ReplayVoteSender, }, solana_sdk::{pubkey::Pubkey, timing::AtomicInterval}, + solana_unified_scheduler_logic::SchedulingMode, std::{ cmp, env, ops::Deref, @@ -391,6 +392,9 @@ impl BankingStage { prioritization_fee_cache, ) } + BlockProductionMethod::UnifiedScheduler => Self { + bank_thread_hdls: vec![], + }, } } @@ -729,11 +733,15 @@ pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank( tpu_bank: Bank, track_transaction_indexes: bool, ) { - let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); + let tpu_bank = bank_forks + .write() + .unwrap() + .insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank); poh_recorder .write() .unwrap() - .set_bank(tpu_bank, track_transaction_indexes); + .set_bank(tpu_bank.clone_with_scheduler(), track_transaction_indexes); + tpu_bank.unblock_block_production(); } #[cfg(test)] diff --git a/core/src/banking_stage/decision_maker.rs b/core/src/banking_stage/decision_maker.rs index bde4701233b522..3c5b50be465f14 100644 --- a/core/src/banking_stage/decision_maker.rs +++ b/core/src/banking_stage/decision_maker.rs @@ -7,8 +7,9 @@ use { }, pubkey::Pubkey, }, + solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus}, std::{ - sync::{Arc, RwLock}, + sync::{atomic::Ordering::Relaxed, Arc, RwLock}, time::{Duration, Instant}, }, }; @@ -31,9 +32,10 @@ impl BufferedPacketsDecision { } } -#[derive(Clone)] +#[derive(Clone, derive_more::Debug)] pub struct DecisionMaker { my_pubkey: Pubkey, + #[debug("{poh_recorder:p}")] poh_recorder: Arc>, cached_decision: Option, @@ -136,6 +138,21 @@ impl DecisionMaker { } } +impl BankingStageMonitor for DecisionMaker { + fn status(&mut self) -> BankingStageStatus { + if self.poh_recorder.read().unwrap().is_exited.load(Relaxed) { + BankingStageStatus::Exited + } else if matches!( + self.make_consume_or_forward_decision(), + BufferedPacketsDecision::Forward, + ) { + BankingStageStatus::Inactive + } else { + BankingStageStatus::Active + } + } +} + #[cfg(test)] mod tests { use { diff --git a/core/src/banking_stage/packet_deserializer.rs b/core/src/banking_stage/packet_deserializer.rs index 32e2dd591ec589..3e5688c0513e60 100644 --- 
a/core/src/banking_stage/packet_deserializer.rs +++ b/core/src/banking_stage/packet_deserializer.rs @@ -199,7 +199,6 @@ impl PacketDeserializer { }) } - #[allow(dead_code)] pub(crate) fn deserialize_packets_with_indexes( packet_batch: &PacketBatch, ) -> impl Iterator + '_ { diff --git a/core/src/banking_stage/unified_scheduler.rs b/core/src/banking_stage/unified_scheduler.rs index c05b2b7ce47a6c..ab1c29468f20a1 100644 --- a/core/src/banking_stage/unified_scheduler.rs +++ b/core/src/banking_stage/unified_scheduler.rs @@ -32,7 +32,7 @@ use { packet_deserializer::PacketDeserializer, LikeClusterInfo, }, - crate::banking_trace::Channels, + crate::{banking_stage::BankingStage, banking_trace::Channels}, agave_banking_stage_ingress_types::BankingPacketBatch, solana_poh::poh_recorder::PohRecorder, solana_runtime::{bank_forks::BankForks, root_bank_cache::RootBankCache}, @@ -49,16 +49,22 @@ pub(crate) fn ensure_banking_stage_setup( cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, ) { + if !pool.block_production_supported() { + return; + } + let mut root_bank_cache = RootBankCache::new(bank_forks.clone()); let unified_receiver = channels.unified_receiver().clone(); let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + let block_producing_scheduler_handler_threads = BankingStage::num_threads() as usize; + let banking_stage_monitor = Box::new(decision_maker.clone()); let transaction_recorder = poh_recorder.read().unwrap().new_recorder(); let banking_packet_handler = Box::new( move |helper: &BankingStageHelper, batches: BankingPacketBatch| { let decision = decision_maker.make_consume_or_forward_decision(); if matches!(decision, BufferedPacketsDecision::Forward) { - return; + return Ok(()); } let bank = root_bank_cache.root_bank(); for batch in batches.iter() { @@ -80,14 +86,17 @@ pub(crate) fn ensure_banking_stage_setup( let index = task_id_base + packet_index; let task = helper.create_new_task(transaction, index); - helper.send_new_task(task); + helper.send_new_task(task)? 
} } + Ok(()) }, ); pool.register_banking_stage( unified_receiver, + block_producing_scheduler_handler_threads, + banking_stage_monitor, banking_packet_handler, transaction_recorder, ); diff --git a/core/src/banking_trace.rs b/core/src/banking_trace.rs index 7fc8b85cd35b8f..f3a5cf5708330b 100644 --- a/core/src/banking_trace.rs +++ b/core/src/banking_trace.rs @@ -5,6 +5,7 @@ use { crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError}, rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender}, solana_sdk::{hash::Hash, slot_history::Slot}, + solana_unified_scheduler_pool::DefaultSchedulerPool, std::{ fs::{create_dir_all, remove_dir_all}, io::{self, Write}, @@ -184,7 +185,6 @@ pub struct Channels { pub gossip_vote_receiver: BankingPacketReceiver, } -#[allow(dead_code)] impl Channels { #[cfg(feature = "dev-context-only-utils")] pub fn unified_sender(&self) -> &BankingPacketSender { @@ -285,6 +285,11 @@ impl BankingTracer { } } + pub fn create_channels_for_scheduler_pool(&self, pool: &DefaultSchedulerPool) -> Channels { + let should_unify = pool.block_production_supported(); + self.create_channels(should_unify) + } + fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) { Self::channel(label, self.active_tracer.as_ref().cloned()) } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 048a10f14dd97f..5266abbf31d4c2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -77,6 +77,7 @@ use { transaction::Transaction, }, solana_timings::ExecuteTimings, + solana_unified_scheduler_logic::SchedulingMode, solana_vote::vote_transaction::VoteTransaction, std::{ collections::{HashMap, HashSet}, @@ -2828,6 +2829,28 @@ impl ReplayStage { } } + fn wait_for_cleared_bank(bank: BankWithScheduler) { + if matches!( + bank.scheduling_mode(), + Some(SchedulingMode::BlockProduction) + ) { + info!("Reaping cleared tpu_bank: {}...", bank.slot()); + if let Some((result, _completed_execute_timings)) = bank.wait_for_completed_scheduler() + { + info!( + "Reaped aborted tpu_bank with unified scheduler: {} {:?}", + bank.slot(), + result + ); + } else { + info!( + "Skipped to reap a tpu_bank (seems unified scheduler is disabled): {}", + bank.slot() + ); + } + } + } + fn reset_poh_recorder( my_pubkey: &Pubkey, blockstore: &Blockstore, @@ -2846,7 +2869,10 @@ impl ReplayStage { GRACE_TICKS_FACTOR * MAX_GRACE_SLOTS, ); - poh_recorder.write().unwrap().reset(bank, next_leader_slot); + let cleared_bank = poh_recorder.write().unwrap().reset(bank, next_leader_slot); + if let Some(cleared_bank) = cleared_bank { + Self::wait_for_cleared_bank(cleared_bank); + } let next_leader_msg = if let Some(next_leader_slot) = next_leader_slot { format!("My next leader slot is {}", next_leader_slot.0) diff --git a/core/src/validator.rs b/core/src/validator.rs index 372a3734004d05..a01e09878e774e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -5,6 +5,7 @@ use { crate::{ accounts_hash_verifier::AccountsHashVerifier, admin_rpc_post_init::AdminRpcRequestMetadataPostInit, + banking_stage::unified_scheduler::ensure_banking_stage_setup, banking_trace::{self, BankingTracer, TraceError}, cluster_info_vote_listener::VoteTracker, completed_data_sets_service::CompletedDataSetsService, @@ -129,7 +130,8 @@ use { DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC, DEFAULT_VOTE_USE_QUIC, }, solana_turbine::{self, broadcast_stage::BroadcastStageType}, - solana_unified_scheduler_pool::DefaultSchedulerPool, + 
solana_unified_scheduler_logic::SchedulingMode, + solana_unified_scheduler_pool::{DefaultSchedulerPool, SupportedSchedulingMode}, solana_vote_program::vote_state, solana_wen_restart::wen_restart::{wait_for_wen_restart, WenRestartConfig}, std::{ @@ -186,12 +188,15 @@ impl BlockVerificationMethod { } } -#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)] +#[derive( + Clone, EnumCount, EnumIter, EnumString, EnumVariantNames, Default, IntoStaticStr, Display, +)] #[strum(serialize_all = "kebab-case")] pub enum BlockProductionMethod { - #[default] CentralScheduler, CentralSchedulerGreedy, + #[default] + UnifiedScheduler, } impl BlockProductionMethod { @@ -236,6 +241,23 @@ impl TransactionStructure { } } +pub fn supported_scheduling_mode( + (verification, production): (&BlockVerificationMethod, &BlockProductionMethod), +) -> SupportedSchedulingMode { + match (verification, production) { + (BlockVerificationMethod::UnifiedScheduler, BlockProductionMethod::UnifiedScheduler) => { + SupportedSchedulingMode::Both + } + (BlockVerificationMethod::UnifiedScheduler, _) => { + SupportedSchedulingMode::Either(SchedulingMode::BlockVerification) + } + (_, BlockProductionMethod::UnifiedScheduler) => { + SupportedSchedulingMode::Either(SchedulingMode::BlockProduction) + } + _ => unreachable!("seems unified scheduler is disabled"), + } +} + /// Configuration for the block generator invalidator for replay. #[derive(Clone, Debug)] pub struct GeneratorConfig { @@ -984,32 +1006,46 @@ impl Validator { } else { info!("Disabled banking trace"); } - let banking_tracer_channels = banking_tracer.create_channels(false); - - match &config.block_verification_method { - BlockVerificationMethod::BlockstoreProcessor => { - info!("no scheduler pool is installed for block verification..."); - if let Some(count) = config.unified_scheduler_handler_threads { - warn!( - "--unified-scheduler-handler-threads={count} is ignored because unified \ - scheduler isn't enabled" - ); - } - } - BlockVerificationMethod::UnifiedScheduler => { - let scheduler_pool = DefaultSchedulerPool::new_dyn( + let banking_tracer_channels = match ( + &config.block_verification_method, + &config.block_production_method, + ) { + methods @ (BlockVerificationMethod::UnifiedScheduler, _) + | methods @ (_, BlockProductionMethod::UnifiedScheduler) => { + let scheduler_pool = DefaultSchedulerPool::new( + supported_scheduling_mode(methods), config.unified_scheduler_handler_threads, config.runtime_config.log_messages_bytes_limit, transaction_status_sender.clone(), Some(replay_vote_sender.clone()), prioritization_fee_cache.clone(), ); + + let channels = banking_tracer.create_channels_for_scheduler_pool(&scheduler_pool); + ensure_banking_stage_setup( + &scheduler_pool, + &bank_forks, + &channels, + &cluster_info, + &poh_recorder, + ); bank_forks .write() .unwrap() .install_scheduler_pool(scheduler_pool); + channels } - } + _ => { + info!("no scheduler pool is installed for block verification/production..."); + if let Some(count) = config.unified_scheduler_handler_threads { + warn!( + "--unified-scheduler-handler-threads={count} is ignored because unified \ + scheduler isn't enabled" + ); + } + banking_tracer.create_channels(false) + } + }; let entry_notification_sender = entry_notifier_service .as_ref() diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs index 5565110f9a3b7c..fde7f24bd41381 100644 --- a/core/tests/unified_scheduler.rs +++ b/core/tests/unified_scheduler.rs @@ -41,7 +41,7 @@ use { 
solana_unified_scheduler_logic::{SchedulingMode, Task}, solana_unified_scheduler_pool::{ DefaultSchedulerPool, DefaultTaskHandler, HandlerContext, PooledScheduler, SchedulerPool, - TaskHandler, + SupportedSchedulingMode, TaskHandler, }, std::{ collections::HashMap, @@ -87,7 +87,7 @@ fn test_scheduler_waited_by_drop_bank_service() { let genesis_bank = Bank::new_for_tests(&genesis_config); let bank_forks = BankForks::new_rw_arc(genesis_bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = SchedulerPool::, _>::new( + let pool_raw = SchedulerPool::, _>::new_for_verification( None, None, None, @@ -229,10 +229,17 @@ fn test_scheduler_producing_blocks() { None, Some(leader_schedule_cache), ); - let pool = DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new( + SupportedSchedulingMode::Either(SchedulingMode::BlockProduction), + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let channels = { let banking_tracer = BankingTracer::new_disabled(); - banking_tracer.create_channels(true) + banking_tracer.create_channels_for_scheduler_pool(&pool) }; let cluster_info = { let keypair = Arc::new(Keypair::new()); @@ -271,6 +278,7 @@ fn test_scheduler_producing_blocks() { .write() .unwrap() .set_bank(tpu_bank.clone_with_scheduler(), false); + tpu_bank.unblock_block_production(); let tpu_bank = bank_forks.read().unwrap().working_bank_with_scheduler(); assert_eq!(tpu_bank.transaction_count(), 0); diff --git a/ledger-tool/Cargo.toml b/ledger-tool/Cargo.toml index 41b356c5c31ba1..6007b107261149 100644 --- a/ledger-tool/Cargo.toml +++ b/ledger-tool/Cargo.toml @@ -43,6 +43,7 @@ solana-ledger = { workspace = true, features = ["dev-context-only-utils"] } solana-log-collector = { workspace = true } solana-logger = { workspace = true } solana-measure = { workspace = true } +solana-poh = { workspace = true } solana-program-runtime = { workspace = true, features = ["metrics"] } solana-rpc = { workspace = true, features = ["dev-context-only-utils"] } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 37dea623197bdc..c7a3e448958328 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -9,7 +9,8 @@ use { }, solana_core::{ accounts_hash_verifier::AccountsHashVerifier, - snapshot_packager_service::PendingSnapshotPackages, validator::BlockVerificationMethod, + snapshot_packager_service::PendingSnapshotPackages, + validator::{supported_scheduling_mode, BlockProductionMethod, BlockVerificationMethod}, }, solana_geyser_plugin_manager::geyser_plugin_service::{ GeyserPluginService, GeyserPluginServiceError, @@ -65,6 +66,7 @@ pub struct LoadAndProcessLedgerOutput { // not. 
It is safe to let ABS continue in the background, and ABS will stop // if/when it finally checks the exit flag pub accounts_background_service: AccountsBackgroundService, + pub unified_scheduler_pool: Option<Arc<DefaultSchedulerPool>>, } const PROCESS_SLOTS_HELP_STRING: &str = @@ -354,43 +356,66 @@ pub fn load_and_process_ledger( exit.clone(), ) .map_err(LoadAndProcessLedgerError::LoadBankForks)?; + let leader_schedule_cache = Arc::new(leader_schedule_cache); let block_verification_method = value_t!( arg_matches, "block_verification_method", BlockVerificationMethod ) .unwrap_or_default(); + let block_production_method = value_t!( + arg_matches, + "block_production_method", + BlockProductionMethod + ) + .inspect(|method| { + if matches!(method, BlockProductionMethod::UnifiedScheduler) + && !arg_matches.is_present("enable_experimental_block_production_method") + { + error!( + "Currently, the unified-scheduler method is experimental for block-production. \ Explicitly pass --enable-experimental-block-production-method to suppress this error" + ); + } + }) + .unwrap_or_default(); info!( - "Using: block-verification-method: {}", - block_verification_method, + "Using: block-verification-method: {}, block-production-method: {}", + block_verification_method, block_production_method ); let unified_scheduler_handler_threads = value_t!(arg_matches, "unified_scheduler_handler_threads", usize).ok(); - match block_verification_method { - BlockVerificationMethod::BlockstoreProcessor => { - info!("no scheduler pool is installed for block verification..."); + let unified_scheduler_pool = match (&block_verification_method, &block_production_method) { + methods @ (BlockVerificationMethod::UnifiedScheduler, _) + | methods @ (_, BlockProductionMethod::UnifiedScheduler) => { + let no_replay_vote_sender = None; + let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + + let pool = DefaultSchedulerPool::new( + supported_scheduling_mode(methods), + unified_scheduler_handler_threads, + process_options.runtime_config.log_messages_bytes_limit, + transaction_status_sender.clone(), + no_replay_vote_sender, + ignored_prioritization_fee_cache, + ); + bank_forks + .write() + .unwrap() + .install_scheduler_pool(pool.clone()); + Some(pool) + } + _ => { + info!("no scheduler pool is installed for block verification/production..."); if let Some(count) = unified_scheduler_handler_threads { warn!( "--unified-scheduler-handler-threads={count} is ignored because unified \ scheduler isn't enabled" ); } + None } - BlockVerificationMethod::UnifiedScheduler => { - let no_replay_vote_sender = None; - let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - bank_forks - .write() - .unwrap() - .install_scheduler_pool(DefaultSchedulerPool::new_dyn( - unified_scheduler_handler_threads, - process_options.runtime_config.log_messages_bytes_limit, - transaction_status_sender.clone(), - no_replay_vote_sender, - ignored_prioritization_fee_cache, - )); - } - } + }; let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default())); let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); @@ -439,6 +464,7 @@ pub fn load_and_process_ledger( bank_forks, starting_snapshot_hashes, accounts_background_service, + unified_scheduler_pool, }) .map_err(LoadAndProcessLedgerError::ProcessBlockstoreFromRoot); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index e6fc98013dd3e2..4f8b107b00e1fb 100644 --- a/ledger-tool/src/main.rs +++
b/ledger-tool/src/main.rs @@ -995,6 +995,15 @@ fn main() { .global(true) .help(DefaultSchedulerPool::cli_message()), ) + .arg( + Arg::with_name("enable_experimental_block_production_method") + .long("enable-experimental-block-production-method") + .takes_value(false) + .help( + "Accept unified-scheduler to be used as an experimental block \ + production method", + ), + ) .arg( Arg::with_name("output_format") .long("output") @@ -2072,6 +2081,7 @@ fn main() { bank_forks, starting_snapshot_hashes, accounts_background_service, + .. } = load_and_process_ledger_or_exit( arg_matches, &genesis_config, @@ -2514,14 +2524,17 @@ fn main() { AccessType::Primary, // needed for purging already existing simulated block shreds... )); let genesis_config = open_genesis_config_by(&ledger_path, arg_matches); - let LoadAndProcessLedgerOutput { bank_forks, .. } = - load_and_process_ledger_or_exit( - arg_matches, - &genesis_config, - blockstore.clone(), - process_options, - None, // transaction status sender - ); + let LoadAndProcessLedgerOutput { + bank_forks, + unified_scheduler_pool, + .. + } = load_and_process_ledger_or_exit( + arg_matches, + &genesis_config, + blockstore.clone(), + process_options, + None, // transaction status sender + ); let block_production_method = value_t!( arg_matches, @@ -2541,6 +2554,7 @@ fn main() { blockstore, block_production_method, transaction_struct, + unified_scheduler_pool, ) { Ok(()) => println!("Ok"), Err(error) => { diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index d7720dd533e5d6..43e1a45929bba4 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -5039,9 +5039,10 @@ pub mod tests { let mut mocked_scheduler = MockInstalledScheduler::new(); let seq = Arc::new(Mutex::new(mockall::Sequence::new())); let seq_cloned = seq.clone(); + // Used for assertions in BankWithScheduler::{new, schedule_transaction_executions} mocked_scheduler .expect_context() - .times(1) + .times(2) .in_sequence(&mut seq.lock().unwrap()) .return_const(context); if should_succeed { diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 9a9f80ccbe839a..9d8d35e932a3eb 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -16,7 +16,7 @@ use { }, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, - validator::{BlockVerificationMethod, ValidatorConfig}, + validator::{BlockProductionMethod, BlockVerificationMethod, ValidatorConfig}, }, solana_download_utils::download_snapshot_archive, solana_entry::entry::create_ticks, @@ -5835,6 +5835,40 @@ fn test_randomly_mixed_block_verification_methods_between_bootstrap_and_not() { ); } +#[test] +#[serial] +fn test_randomly_mixed_block_production_methods_between_bootstrap_and_not() { + // tailored logging just to see two block production methods are working correctly + solana_logger::setup_with_default( + "solana_metrics::metrics=warn,\ + solana_core=warn,\ + solana_runtime::installed_scheduler_pool=trace,\ + solana_ledger::blockstore_processor=debug,\ + info", + ); + + let num_nodes = BlockProductionMethod::COUNT; + let mut config = + ClusterConfig::new_with_equal_stakes(num_nodes, DEFAULT_MINT_LAMPORTS, DEFAULT_NODE_STAKE); + + // Overwrite block_production_method with shuffled variants + let mut methods = BlockProductionMethod::iter().collect::>(); + methods.shuffle(&mut rand::thread_rng()); + for (validator_config, method) in 
config.validator_configs.iter_mut().zip_eq(methods) { + validator_config.block_production_method = method; + } + + let local = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + cluster_tests::spend_and_verify_all_nodes( + &local.entry_point_info, + &local.funding_keypair, + num_nodes, + HashSet::new(), + SocketAddrSpace::Unspecified, + &local.connection_cache, + ); +} + /// Forks previous marked invalid should be marked as such in fork choice on restart #[test] #[ignore] diff --git a/perf/Cargo.toml b/perf/Cargo.toml index 45c508ee762409..b3c14dfdc3e972 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } ahash = { workspace = true } bincode = { workspace = true } bv = { workspace = true, features = ["serde"] } +crossbeam-channel = { workspace = true } curve25519-dalek = { workspace = true } dlopen2 = { workspace = true } fnv = { workspace = true } diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index d633018958075c..c8b0a6b6084c48 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -140,6 +140,13 @@ impl TransactionRecorder { } } + pub fn new_dummy() -> Self { + Self { + record_sender: crossbeam_channel::unbounded().0, + is_exited: Arc::new(AtomicBool::default()), + } + } + /// Hashes `transactions` and sends to PoH service for recording. Waits for response up to 1s. /// Panics on unexpected (non-`MaxHeightReached`) errors. pub fn record_transactions( @@ -419,7 +426,12 @@ impl PohRecorder { } // synchronize PoH with a bank - pub fn reset(&mut self, reset_bank: Arc, next_leader_slot: Option<(Slot, Slot)>) { + pub fn reset( + &mut self, + reset_bank: Arc, + next_leader_slot: Option<(Slot, Slot)>, + ) -> Option { + let cleared_bank = self.clear_bank(); self.clear_bank(); self.reset_poh(reset_bank, true); @@ -440,6 +452,7 @@ impl PohRecorder { self.grace_ticks = grace_ticks; self.leader_first_tick_height = leader_first_tick_height; self.leader_last_tick_height = leader_last_tick_height; + cleared_bank } // Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank @@ -600,7 +613,8 @@ impl PohRecorder { let _ = self.flush_cache(false); } - fn clear_bank(&mut self) { + fn clear_bank(&mut self) -> Option { + let mut cleared_bank = None; if let Some(WorkingBank { bank, start, .. 
}) = self.working_bank.take() { self.leader_bank_notifier.set_completed(bank.slot()); let next_leader_slot = self.leader_schedule_cache.next_leader_slot( @@ -622,6 +636,7 @@ impl PohRecorder { ("slot", bank.slot(), i64), ("elapsed", start.elapsed().as_millis(), i64), ); + cleared_bank = Some(bank); } if let Some(ref signal) = self.clear_bank_signal { @@ -635,6 +650,7 @@ impl PohRecorder { } } } + cleared_bank } fn reset_poh(&mut self, reset_bank: Arc, reset_start_bank: bool) { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f95d2746870204..6a57c9611d5bf8 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5696,6 +5696,7 @@ dependencies = [ "chrono", "crossbeam-channel", "dashmap", + "derive_more 1.0.0", "etcd-client", "futures 0.3.31", "histogram", @@ -5756,6 +5757,7 @@ dependencies = [ "solana-tpu-client", "solana-transaction-status", "solana-turbine", + "solana-unified-scheduler-logic", "solana-unified-scheduler-pool", "solana-version", "solana-vote", @@ -6663,6 +6665,7 @@ dependencies = [ "bincode", "bv", "caps", + "crossbeam-channel", "curve25519-dalek 4.1.3", "dlopen2", "fnv", @@ -7301,6 +7304,7 @@ dependencies = [ "ahash 0.8.11", "aquamarine", "arrayref", + "assert_matches", "base64 0.22.1", "bincode", "blake3", @@ -8901,7 +8905,6 @@ dependencies = [ "derive_more 1.0.0", "dyn-clone", "log", - "qualifier_attr", "scopeguard", "solana-ledger", "solana-poh", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 639f8d5eba1d53..92963e94b93f8a 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } ahash = { workspace = true } aquamarine = { workspace = true } arrayref = { workspace = true } +assert_matches = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } blake3 = { workspace = true } @@ -101,7 +102,6 @@ name = "solana_runtime" [dev-dependencies] agave-transaction-view = { workspace = true } -assert_matches = { workspace = true } ed25519-dalek = { workspace = true } libsecp256k1 = { workspace = true } memoffset = { workspace = true } diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index 4270fab7ec514c..35023a830fb9b1 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -243,18 +243,7 @@ impl BankForks { let bank = Arc::new(bank); let bank = if let Some(scheduler_pool) = &self.scheduler_pool { - let context = SchedulingContext::new_with_mode(mode, bank.clone()); - let scheduler = scheduler_pool.take_scheduler(context); - let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler)); - // Skip registering for block production. Both the tvu main loop in the replay stage - // and PohRecorder don't support _concurrent block production_ at all. It's strongly - // assumed that block is produced in singleton way and it's actually desired, while - // ignoring the opportunity cost of (hopefully rare!) fork switching... 
- if matches!(mode, SchedulingMode::BlockVerification) { - scheduler_pool - .register_timeout_listener(bank_with_scheduler.create_timeout_listener()); - } - bank_with_scheduler + Self::install_scheduler_into_bank(scheduler_pool, mode, bank, false) } else { BankWithScheduler::new_without_scheduler(bank) }; @@ -268,6 +257,48 @@ impl BankForks { bank } + fn install_scheduler_into_bank( + scheduler_pool: &InstalledSchedulerPoolArc, + mode: SchedulingMode, + bank: Arc, + is_reinstall: bool, + ) -> BankWithScheduler { + trace!( + "Inserting bank (slot: {}) with scheduler (mode: {:?}, reinstall: {:?})", + bank.slot(), + mode, + is_reinstall, + ); + let context = SchedulingContext::new_with_mode(mode, bank.clone()); + let Some(scheduler) = scheduler_pool.take_scheduler(context) else { + return BankWithScheduler::new_without_scheduler(bank); + }; + let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler)); + // Skip registering for block production. Both the tvu main loop in the replay stage + // and PohRecorder don't support _concurrent block production_ at all. It's strongly + // assumed that block is produced in singleton way and it's actually desired, while + // ignoring the opportunity cost of (hopefully rare!) fork switching... + if matches!(mode, SchedulingMode::BlockVerification) { + scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener()); + } + bank_with_scheduler + } + + #[cfg(feature = "dev-context-only-utils")] + pub fn reinstall_block_production_scheduler_into_working_genesis_bank( + &mut self, + ) -> BankWithScheduler { + let bank = self.working_bank(); + assert!(self.banks.len() == 1 && bank.slot() == 0 && !bank.is_frozen()); + let pool = self.scheduler_pool.as_ref().unwrap(); + let mode = SchedulingMode::BlockProduction; + let bank = Self::install_scheduler_into_bank(pool, mode, bank, true); + self.banks + .insert(bank.slot(), bank.clone_with_scheduler()) + .expect("some removed bank"); + bank + } + pub fn insert_from_ledger(&mut self, bank: Bank) -> BankWithScheduler { self.highest_slot_at_startup = std::cmp::max(self.highest_slot_at_startup, bank.slot()); self.insert(bank) diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index 4dec084c057e89..1c0b4c74beeb1e 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -22,6 +22,7 @@ use { crate::bank::Bank, + assert_matches::assert_matches, log::*, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_sdk::{ @@ -49,7 +50,7 @@ pub fn initialized_result_with_timings() -> ResultWithTimings { pub trait InstalledSchedulerPool: Send + Sync + Debug { /// A very thin wrapper of [`Self::take_resumed_scheduler`] to take a scheduler from this pool /// for a brand-new bank. - fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox { + fn take_scheduler(&self, context: SchedulingContext) -> Option { self.take_resumed_scheduler(context, initialized_result_with_timings()) } @@ -57,7 +58,7 @@ pub trait InstalledSchedulerPool: Send + Sync + Debug { &self, context: SchedulingContext, result_with_timings: ResultWithTimings, - ) -> InstalledSchedulerBox; + ) -> Option; /// Registers an opaque timeout listener. /// @@ -178,6 +179,8 @@ pub trait InstalledScheduler: Send + Sync + Debug + 'static { index: usize, ) -> ScheduleResult; + fn unblock_scheduling(&self); + /// Return the error which caused the scheduler to abort. 
/// /// Note that this must not be called until it's observed that `schedule_execution()` has @@ -348,7 +351,7 @@ pub enum SchedulerStatus { /// transaction to be executed (= [`BankWithScheduler::schedule_transaction_executions`] is /// called, which internally calls [`BankWithSchedulerInner::with_active_scheduler`] to make /// the transition happen). - Stale(InstalledSchedulerPoolArc, ResultWithTimings), + Stale(InstalledSchedulerPoolArc, SchedulingMode, ResultWithTimings), } impl SchedulerStatus { @@ -359,13 +362,23 @@ impl SchedulerStatus { } } + fn scheduling_mode(&self) -> Option { + match self { + SchedulerStatus::Unavailable => None, + SchedulerStatus::Active(scheduler) => Some(scheduler.context().mode()), + SchedulerStatus::Stale(_, mode, _) => Some(*mode), + } + } + fn transition_from_stale_to_active( &mut self, f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox, ) { - let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else { + let Self::Stale(pool, mode, result_with_timings) = mem::replace(self, Self::Unavailable) + else { panic!("transition to Active failed: {self:?}"); }; + assert_matches!(mode, SchedulingMode::BlockVerification); *self = Self::Active(f(pool, result_with_timings)); } @@ -379,8 +392,9 @@ impl SchedulerStatus { let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else { unreachable!("not active: {self:?}"); }; + let mode = scheduler.context().mode; let (pool, result_with_timings) = f(scheduler); - *self = Self::Stale(pool, result_with_timings); + *self = Self::Stale(pool, mode, result_with_timings); } fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox { @@ -391,7 +405,8 @@ impl SchedulerStatus { } fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings { - let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else { + let Self::Stale(_pool, _mode, result_with_timings) = mem::replace(self, Self::Unavailable) + else { panic!("transition to Unavailable failed: {self:?}"); }; result_with_timings @@ -509,6 +524,10 @@ impl BankWithScheduler { ); let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| { + assert_matches!( + scheduler.context().mode(), + SchedulingMode::BlockVerification + ); for (sanitized_transaction, index) in transactions_with_indexes { scheduler.schedule_execution(sanitized_transaction, index)?; } @@ -529,6 +548,13 @@ impl BankWithScheduler { Ok(()) } + pub fn unblock_block_production(&self) { + if let SchedulerStatus::Active(scheduler) = &*self.inner.scheduler.read().unwrap() { + assert_matches!(scheduler.context().mode(), SchedulingMode::BlockProduction); + scheduler.unblock_scheduling(); + } + } + #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] pub(crate) fn create_timeout_listener(&self) -> TimeoutListener { self.inner.do_create_timeout_listener() @@ -562,6 +588,10 @@ impl BankWithScheduler { ) } + pub fn scheduling_mode(&self) -> Option { + self.inner.scheduler.read().unwrap().scheduling_mode() + } + pub const fn no_scheduler_available() -> InstalledSchedulerRwLock { RwLock::new(SchedulerStatus::Unavailable) } @@ -578,14 +608,16 @@ impl BankWithSchedulerInner { // This is the fast path, needing single read-lock most of time. 
f(scheduler) } - SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => { + SchedulerStatus::Stale(_pool, mode, (result, _timings)) if result.is_err() => { + assert_matches!(mode, SchedulingMode::BlockVerification); trace!( "with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...", self.bank.slot(), ); Err(SchedulerAborted) } - SchedulerStatus::Stale(pool, _result_with_timings) => { + SchedulerStatus::Stale(pool, mode, _result_with_timings) => { + assert_matches!(mode, SchedulingMode::BlockVerification); let pool = pool.clone(); drop(scheduler); @@ -595,7 +627,9 @@ impl BankWithSchedulerInner { let mut scheduler = self.scheduler.write().unwrap(); trace!("with_active_scheduler: {:?}", scheduler); scheduler.transition_from_stale_to_active(|pool, result_with_timings| { - let scheduler = pool.take_resumed_scheduler(context, result_with_timings); + let scheduler = pool + .take_resumed_scheduler(context, result_with_timings) + .expect("successful retaking"); info!( "with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})", self.bank.slot(), @@ -636,24 +670,25 @@ impl BankWithSchedulerInner { // the pool and yet it has yet to be `wait_for_termination()`-ed. To avoid unbounded // thread creation under forky condition, return the scheduler for now, even if the // bank could process more transactions later. + let mut id = None; scheduler.maybe_transition_from_active_to_stale(|scheduler| { // Return the installed scheduler back to the scheduler pool as soon as the // scheduler indicates the completion of all currently-scheduled transaction // executions by `solana_unified_scheduler_pool::ThreadManager::end_session()` // internally. - let id = scheduler.id(); + id = Some(scheduler.id()); let (result_with_timings, uninstalled_scheduler) = scheduler.wait_for_termination(false); uninstalled_scheduler.return_to_pool(); info!( - "timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})", + "timeout_listener: bank (slot: {}) got stale, returned scheduler (id: {:?})", bank.bank.slot(), id, ); (pool, result_with_timings) }); - trace!("timeout_listener: {:?}", scheduler); + trace!("timeout_listener: {:?}", id); }) } @@ -663,7 +698,8 @@ impl BankWithSchedulerInner { let mut scheduler = self.scheduler.write().unwrap(); match &mut *scheduler { SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(), - SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => { + SchedulerStatus::Stale(_pool, mode, (result, _timings)) if result.is_err() => { + assert_matches!(mode, SchedulingMode::BlockVerification); result.clone().unwrap_err() } _ => unreachable!("no error in {:?}", self.scheduler), @@ -705,12 +741,12 @@ impl BankWithSchedulerInner { uninstalled_scheduler.return_to_pool(); (false, Some(result_with_timings)) } - SchedulerStatus::Stale(_pool, _result_with_timings) if reason.is_paused() => { + SchedulerStatus::Stale(_pool, _mode, _result_with_timings) if reason.is_paused() => { // Do nothing for pauses because the scheduler termination is guaranteed to be // called later. 
(true, None) } - SchedulerStatus::Stale(_pool, _result_with_timings) => { + SchedulerStatus::Stale(_pool, _mode, _result_with_timings) => { let result_with_timings = scheduler.transition_from_stale_to_unavailable(); (true, Some(result_with_timings)) } @@ -777,7 +813,6 @@ mod tests { bank::test_utils::goto_end_of_slot_with_scheduler, genesis_utils::{create_genesis_config, GenesisConfigInfo}, }, - assert_matches::assert_matches, mockall::Sequence, solana_sdk::system_transaction, std::sync::Mutex, @@ -787,12 +822,14 @@ mod tests { bank: Arc, is_dropped_flags: impl Iterator, f: Option, + extra_context_use: usize, ) -> InstalledSchedulerBox { let mut mock = MockInstalledScheduler::new(); let seq = Arc::new(Mutex::new(Sequence::new())); + // Could be used for assertions in BankWithScheduler::{new, schedule_transaction_executions} mock.expect_context() - .times(1) + .times(1 + extra_context_use) .in_sequence(&mut seq.lock().unwrap()) .return_const(SchedulingContext::for_verification(bank)); @@ -831,6 +868,7 @@ mod tests { bank, is_dropped_flags, None:: ()>, + 0, ) } @@ -892,6 +930,7 @@ mod tests { .times(1) .returning(|| ()); }), + 0, )), ); goto_end_of_slot_with_scheduler(&bank); @@ -933,6 +972,7 @@ mod tests { .returning(|| TransactionError::InsufficientFundsForFee); } }), + 1, ); let bank = BankWithScheduler::new(bank, Some(mocked_scheduler)); diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index d1a01238cae7d7..f708902463dc75 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5549,6 +5549,7 @@ dependencies = [ "chrono", "crossbeam-channel", "dashmap", + "derive_more 1.0.0", "etcd-client", "futures 0.3.31", "histogram", @@ -5609,6 +5610,7 @@ dependencies = [ "solana-tpu-client", "solana-transaction-status", "solana-turbine", + "solana-unified-scheduler-logic", "solana-unified-scheduler-pool", "solana-version", "solana-vote", @@ -6482,6 +6484,7 @@ dependencies = [ "bincode", "bv", "caps", + "crossbeam-channel", "curve25519-dalek 4.1.3", "dlopen2", "fnv", @@ -7120,6 +7123,7 @@ dependencies = [ "ahash 0.8.11", "aquamarine", "arrayref", + "assert_matches", "base64 0.22.1", "bincode", "blake3", @@ -8237,7 +8241,6 @@ dependencies = [ "derive_more 1.0.0", "dyn-clone", "log", - "qualifier_attr", "scopeguard", "solana-ledger", "solana-poh", diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index ab986007ad258e..79e764fe2976a5 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -458,6 +458,10 @@ impl TaskInner { .with_borrow_mut(token, |usage_count| usage_count.decrement_self().is_zero()); did_unblock.then_some(self) } + + pub fn into_transaction(self: Task) -> RuntimeTransaction { + Task::into_inner(self).unwrap().transaction + } } /// [`Task`]'s per-address context to lock a [usage_queue](UsageQueue) with [certain kind of @@ -626,23 +630,37 @@ const_assert_eq!(mem::size_of::(), 8); pub struct SchedulingStateMachine { unblocked_task_queue: VecDeque, active_task_count: ShortCounter, + executing_task_count: ShortCounter, + max_executing_task_count: u32, handled_task_count: ShortCounter, unblocked_task_count: ShortCounter, total_task_count: ShortCounter, count_token: BlockedUsageCountToken, usage_queue_token: UsageQueueToken, } -const_assert_eq!(mem::size_of::(), 48); +const_assert_eq!(mem::size_of::(), 56); impl SchedulingStateMachine { pub fn has_no_active_task(&self) -> bool { self.active_task_count.is_zero() } + pub fn has_no_executing_task(&self) -> bool { + 
self.executing_task_count.current() == 0 + } + pub fn has_unblocked_task(&self) -> bool { !self.unblocked_task_queue.is_empty() } + pub fn has_runnable_task(&mut self) -> bool { + self.is_task_runnable() && self.has_unblocked_task() + } + + pub fn is_task_runnable(&self) -> bool { + self.executing_task_count.current() < self.max_executing_task_count + } + pub fn unblocked_task_queue_count(&self) -> usize { self.unblocked_task_queue.len() } @@ -702,13 +720,14 @@ impl SchedulingStateMachine { self.active_task_count.increment_self(); self.try_lock_usage_queues(task).and_then(|task| { // locking succeeded, and then ... - if force_buffering { + if !self.is_task_runnable() || force_buffering { // ... push to unblocked_task_queue, if buffering is forced. self.unblocked_task_count.increment_self(); self.unblocked_task_queue.push_back(task); None } else { // ... return the task back as schedulable to the caller as-is otherwise. + self.executing_task_count.increment_self(); Some(task) } }) @@ -717,6 +736,7 @@ impl SchedulingStateMachine { #[must_use] pub fn schedule_next_unblocked_task(&mut self) -> Option { self.unblocked_task_queue.pop_front().inspect(|_| { + self.executing_task_count.increment_self(); self.unblocked_task_count.increment_self(); }) } @@ -732,6 +752,7 @@ impl SchedulingStateMachine { /// tasks inside `SchedulingStateMachine` to provide an offloading-based optimization /// opportunity for callers. pub fn deschedule_task(&mut self, task: &Task) { + self.executing_task_count.decrement_self(); self.active_task_count.decrement_self(); self.handled_task_count.increment_self(); self.unlock_usage_queues(task); @@ -887,11 +908,14 @@ impl SchedulingStateMachine { /// other slots. pub fn reinitialize(&mut self) { assert!(self.has_no_active_task()); + assert_eq!(self.executing_task_count.current(), 0); assert_eq!(self.unblocked_task_queue.len(), 0); // nice trick to ensure all fields are handled here if new one is added. let Self { unblocked_task_queue: _, active_task_count, + executing_task_count: _, + max_executing_task_count: _, handled_task_count, unblocked_task_count, total_task_count, @@ -911,12 +935,16 @@ impl SchedulingStateMachine { /// # Safety /// Call this exactly once for each thread. See [`TokenCell`] for details. #[must_use] - pub unsafe fn exclusively_initialize_current_thread_for_scheduling() -> Self { + pub unsafe fn exclusively_initialize_current_thread_for_scheduling( + max_executing_task_count: u32, + ) -> Self { Self { // It's very unlikely this is desired to be configurable, like // `UsageQueueInner::blocked_usages_from_tasks`'s cap. 
unblocked_task_queue: VecDeque::with_capacity(1024), active_task_count: ShortCounter::zero(), + executing_task_count: ShortCounter::zero(), + max_executing_task_count, handled_task_count: ShortCounter::zero(), unblocked_task_count: ShortCounter::zero(), total_task_count: ShortCounter::zero(), @@ -924,6 +952,11 @@ impl SchedulingStateMachine { usage_queue_token: unsafe { UsageQueueToken::assume_exclusive_mutating_thread() }, } } + + #[cfg(test)] + unsafe fn exclusively_initialize_current_thread_for_scheduling_for_test() -> Self { + Self::exclusively_initialize_current_thread_for_scheduling(200) + } } #[cfg(test)] @@ -985,7 +1018,7 @@ mod tests { #[test] fn test_scheduling_state_machine_creation() { let state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_eq!(state_machine.active_task_count(), 0); assert_eq!(state_machine.total_task_count(), 0); @@ -995,7 +1028,7 @@ mod tests { #[test] fn test_scheduling_state_machine_good_reinitialization() { let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; state_machine.total_task_count.increment_self(); assert_eq!(state_machine.total_task_count(), 1); @@ -1007,7 +1040,7 @@ mod tests { #[should_panic(expected = "assertion failed: self.has_no_active_task()")] fn test_scheduling_state_machine_bad_reinitialization() { let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; let address_loader = &mut create_address_loader(None); let task = SchedulingStateMachine::create_task(simplest_transaction(), 3, address_loader); @@ -1032,7 +1065,7 @@ mod tests { let task = SchedulingStateMachine::create_task(sanitized, 3, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; let task = state_machine.schedule_task(task).unwrap(); assert_eq!(state_machine.active_task_count(), 1); @@ -1052,7 +1085,7 @@ mod tests { let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_matches!( state_machine @@ -1104,7 +1137,7 @@ mod tests { let task3 = SchedulingStateMachine::create_task(sanitized.clone(), 103, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_matches!( state_machine @@ -1154,7 +1187,7 @@ mod tests { let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; // both of read-only tasks should be immediately runnable assert_matches!( @@ -1195,7 +1228,7 @@ mod tests { let task3 = 
SchedulingStateMachine::create_task(sanitized3, 103, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_matches!( state_machine @@ -1246,7 +1279,7 @@ mod tests { let task3 = SchedulingStateMachine::create_task(sanitized3, 103, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_matches!( state_machine @@ -1288,7 +1321,7 @@ mod tests { let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_matches!( state_machine @@ -1324,7 +1357,7 @@ mod tests { let task4 = SchedulingStateMachine::create_task(sanitized4, 104, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_matches!( state_machine @@ -1380,7 +1413,7 @@ mod tests { let task2 = SchedulingStateMachine::create_task(sanitized2, 102, address_loader); let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; assert_matches!( state_machine @@ -1420,7 +1453,7 @@ mod tests { #[should_panic(expected = "internal error: entered unreachable code")] fn test_unreachable_unlock_conditions1() { let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; let usage_queue = UsageQueue::default(); usage_queue @@ -1434,7 +1467,7 @@ mod tests { #[should_panic(expected = "internal error: entered unreachable code")] fn test_unreachable_unlock_conditions2() { let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; let usage_queue = UsageQueue::default(); usage_queue @@ -1449,7 +1482,7 @@ mod tests { #[should_panic(expected = "internal error: entered unreachable code")] fn test_unreachable_unlock_conditions3() { let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling_for_test() }; let usage_queue = UsageQueue::default(); usage_queue diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 091191229e2333..cfa4434cf23c4d 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -19,7 +19,6 @@ derive-where = { workspace = true } derive_more = { workspace = true } dyn-clone = { workspace = true } log = { workspace = true } -qualifier_attr = { workspace = true } scopeguard = { workspace = true } solana-ledger = { workspace = true } solana-poh = { workspace = true } @@ -44,6 +43,8 @@ solana-keypair = { workspace = true } solana-logger = { workspace = true } 
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-system-transaction = { workspace = true } +# See order-crates-for-publishing.py for using this unusual `path = "."` +solana-unified-scheduler-pool = { path = ".", features = ["dev-context-only-utils"] } test-case = { workspace = true } [features] diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 69e9c244951fd5..5fd9987c8efb29 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -12,8 +12,6 @@ //! Refer to [`PooledScheduler`] doc comment for general overview of scheduler state transitions //! regarding to pooling and the actual use. -#[cfg(feature = "dev-context-only-utils")] -use qualifier_attr::qualifiers; use { agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver}, assert_matches::assert_matches, @@ -31,9 +29,9 @@ use { solana_runtime::{ installed_scheduler_pool::{ initialized_result_with_timings, InstalledScheduler, InstalledSchedulerBox, - InstalledSchedulerPool, InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult, - SchedulerAborted, SchedulerId, SchedulingContext, TimeoutListener, - UninstalledScheduler, UninstalledSchedulerBox, + InstalledSchedulerPool, ResultWithTimings, ScheduleResult, SchedulerAborted, + SchedulerId, SchedulingContext, TimeoutListener, UninstalledScheduler, + UninstalledSchedulerBox, }, prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, @@ -80,6 +78,33 @@ enum CheckPoint { type AtomicSchedulerId = AtomicU64; +#[derive(Debug)] +pub enum SupportedSchedulingMode { + Either(SchedulingMode), + Both, +} + +impl SupportedSchedulingMode { + fn is_supported(&self, requested_mode: SchedulingMode) -> bool { + match (self, requested_mode) { + (Self::Both, _) => true, + (Self::Either(ref supported), ref requested) if supported == requested => true, + _ => false, + } + } + + #[cfg(feature = "dev-context-only-utils")] + fn block_verification_only() -> Self { + Self::Either(BlockVerification) + } + + #[cfg(feature = "dev-context-only-utils")] + fn block_production_only() -> Self { + // todo... + Self::Both + } +} + /// A pool of idling schedulers (usually [`PooledScheduler`]), ready to be taken by bank. /// /// Also, the pool runs a _cleaner_ thread named as `solScCleaner`. its jobs include: @@ -96,11 +121,12 @@ type AtomicSchedulerId = AtomicU64; /// explanation of this rather complex dyn trait/type hierarchy. 
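// ---------------------------------------------------------------------------
// Illustrative, self-contained sketch (not part of the patch): how the
// SupportedSchedulingMode gate introduced above behaves. Demo-prefixed types
// stand in for SchedulingMode (which really lives in
// solana-unified-scheduler-logic) so the snippet compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DemoSchedulingMode {
    BlockVerification,
    BlockProduction,
}

#[derive(Debug)]
enum DemoSupportedSchedulingMode {
    Either(DemoSchedulingMode),
    Both,
}

impl DemoSupportedSchedulingMode {
    fn is_supported(&self, requested: DemoSchedulingMode) -> bool {
        match self {
            Self::Both => true,
            Self::Either(supported) => *supported == requested,
        }
    }
}

fn main() {
    use DemoSchedulingMode::*;
    // A pool restricted to one mode refuses the other mode's context; in the
    // patch, take_resumed_scheduler() then returns None so callers can fall back.
    let verification_only = DemoSupportedSchedulingMode::Either(BlockVerification);
    assert!(verification_only.is_supported(BlockVerification));
    assert!(!verification_only.is_supported(BlockProduction));
    assert!(DemoSupportedSchedulingMode::Both.is_supported(BlockProduction));
}
// ---------------------------------------------------------------------------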
#[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { + supported_scheduling_mode: SupportedSchedulingMode, scheduler_inners: Mutex>, block_production_scheduler_inner: Mutex>, trashed_scheduler_inners: Mutex>, timeout_listeners: Mutex>, - handler_count: usize, + block_verification_handler_count: usize, common_handler_context: CommonHandlerContext, banking_stage_handler_context: Mutex>, // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to @@ -161,6 +187,20 @@ impl, TH: TaskHandler> BlockProductionSchedulerInner bool { + match self { + Self::NotSpawned | Self::Taken(_) => false, + Self::Pooled(_) => true, + } + } + + fn peek_pooled(&self) -> Option<&S::Inner> { + match self { + Self::NotSpawned | Self::Taken(_) => None, + Self::Pooled(inner) => Some(inner), + } + } + fn take_pooled(&mut self) -> S::Inner { let id = { let Self::Pooled(inner) = &self else { @@ -177,6 +217,7 @@ impl, TH: TaskHandler> BlockProductionSchedulerInner, transaction_status_sender: Option, replay_vote_sender: Option, @@ -188,6 +229,23 @@ pub struct HandlerContext { transaction_recorder: Option, } +impl HandlerContext { + fn new_task_creator(&self) -> TaskCreator { + match &self.banking_stage_helper { + None => TaskCreator::ForBlockVerification { + usage_queue_loader: UsageQueueLoader::default(), + }, + Some(helper) => TaskCreator::ForBlockProduction { + banking_stage_helper: helper.clone(), + }, + } + } + + fn banking_stage_helper(&self) -> &BankingStageHelper { + self.banking_stage_helper.as_ref().unwrap() + } +} + #[derive(Debug, Clone)] struct CommonHandlerContext { log_messages_bytes_limit: Option, @@ -199,6 +257,7 @@ struct CommonHandlerContext { impl CommonHandlerContext { fn into_handler_context( self, + parallelism: usize, banking_packet_receiver: BankingPacketReceiver, banking_packet_handler: Box, banking_stage_helper: Option>, @@ -212,6 +271,7 @@ impl CommonHandlerContext { } = self; HandlerContext { + parallelism, log_messages_bytes_limit, transaction_status_sender, replay_vote_sender, @@ -230,11 +290,13 @@ struct BankingStageHandlerContext { #[debug("{banking_packet_handler:p}")] banking_packet_handler: Box, transaction_recorder: TransactionRecorder, + block_production_handler_count: usize, + banking_stage_monitor: Box, } trait_set! { pub trait BankingPacketHandler = - DynClone + FnMut(&BankingStageHelper, BankingPacketBatch) + Send + 'static; + DynClone + Send + 'static + FnMut(&BankingStageHelper, BankingPacketBatch) -> ScheduleResult; } // Make this `Clone`-able so that it can easily propagated to all the handler threads. 
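// ---------------------------------------------------------------------------
// Illustrative, self-contained sketch (not part of the patch): the
// trait_set! + dyn-clone pattern used for BankingPacketHandler above, which
// lets one boxed FnMut handler be cloned for every handler thread. The
// PacketHandler name and the u64 "packet batch" are stand-ins for this
// example only; trait_set and dyn-clone are already dependencies of this crate.
use dyn_clone::{clone_trait_object, DynClone};
use trait_set::trait_set;

trait_set! {
    // Cloneable + sendable FnMut returning a schedule result, like the alias above.
    pub trait PacketHandler = DynClone + Send + 'static + FnMut(u64) -> Result<(), ()>;
}
clone_trait_object!(PacketHandler);

fn main() {
    let mut seen = 0u64;
    let handler: Box<dyn PacketHandler> = Box::new(move |batch: u64| -> Result<(), ()> {
        seen += 1;
        println!("handling batch {batch} ({seen} seen by this clone)");
        Ok(())
    });

    // Each clone owns its own captured state; hand one clone to each thread.
    let mut for_thread_0 = handler.clone();
    let mut for_thread_1 = handler.clone();
    for_thread_0(10).unwrap();
    for_thread_1(20).unwrap();
}
// ---------------------------------------------------------------------------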
clone_trait_object!(BankingPacketHandler); @@ -243,15 +305,15 @@ clone_trait_object!(BankingPacketHandler); pub struct BankingStageHelper { usage_queue_loader: UsageQueueLoader, next_task_id: AtomicUsize, - new_task_sender: Sender, + new_task_sender: Weak>, } impl BankingStageHelper { - fn new(new_task_sender: Sender) -> Self { + fn new(new_task_sender: &Arc>) -> Self { Self { usage_queue_loader: UsageQueueLoader::default(), next_task_id: AtomicUsize::default(), - new_task_sender, + new_task_sender: Arc::downgrade(new_task_sender), } } @@ -259,7 +321,7 @@ impl BankingStageHelper { self.next_task_id.fetch_add(count, Relaxed) } - pub fn create_new_task( + fn do_create_task( &self, transaction: RuntimeTransaction, index: usize, @@ -269,19 +331,47 @@ impl BankingStageHelper { }) } - pub fn send_new_task(&self, task: Task) { - self.new_task_sender - .send(NewTaskPayload::Payload(task)) - .unwrap(); + pub fn create_new_task( + &self, + transaction: RuntimeTransaction, + index: usize, + ) -> Task { + self.do_create_task(transaction, index) + } + + fn recreate_task(&self, executed_task: Box) -> Task { + let new_index = self.generate_task_ids(1); + let transaction = executed_task.into_task().into_transaction(); + self.do_create_task(transaction, new_index) + } + + pub fn send_new_task(&self, task: Task) -> ScheduleResult { + let Some(sender) = self.new_task_sender.upgrade() else { + return Err(SchedulerAborted); + }; + sender.send(NewTaskPayload::Payload(task)).unwrap(); + Ok(()) + } + + fn signal_disconnection(&self) { + let Some(sender) = self.new_task_sender.upgrade() else { + return; + }; + if sender.send(NewTaskPayload::Disconnect).is_ok() { + info!("notified a disconnect from {:?}", thread::current()); + } else { + // It seems that the scheduler thread has been aborted already... + warn!("failed to notify a disconnect from {:?}", thread::current()); + } } } pub type DefaultSchedulerPool = SchedulerPool, DefaultTaskHandler>; -const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10); +const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(5); const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180); -const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(12); +const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(5); // Rough estimate of max UsageQueueLoader size in bytes: // UsageFromTask * UsageQueue's capacity * DEFAULT_MAX_USAGE_QUEUE_COUNT // 16 bytes * 128 items * 262_144 entries == 512 MiB @@ -303,10 +393,8 @@ where S: SpawnableScheduler, TH: TaskHandler, { - // Some internal impl and test code want an actual concrete type, NOT the - // `dyn InstalledSchedulerPool`. So don't merge this into `Self::new_dyn()`. 
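// ---------------------------------------------------------------------------
// Illustrative, self-contained sketch (not part of the patch): the weak-sender
// handshake that BankingStageHelper::send_new_task() uses above. The helper
// only holds a downgraded copy of the scheduler's Arc<Sender>, so once the
// scheduler drops (or deliberately swaps) its sender, later sends fail fast
// instead of keeping the channel alive. std::sync::mpsc and u64 "tasks" stand
// in for crossbeam_channel and NewTaskPayload here.
use std::sync::{mpsc, Arc, Weak};

struct DemoHelper {
    new_task_sender: Weak<mpsc::Sender<u64>>,
}

impl DemoHelper {
    fn new(new_task_sender: &Arc<mpsc::Sender<u64>>) -> Self {
        Self {
            new_task_sender: Arc::downgrade(new_task_sender),
        }
    }

    // Err(()) plays the role of SchedulerAborted in the real send_new_task().
    fn send_new_task(&self, task: u64) -> Result<(), ()> {
        let Some(sender) = self.new_task_sender.upgrade() else {
            return Err(());
        };
        sender.send(task).map_err(|_| ())
    }
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    let sender = Arc::new(sender);
    let helper = DemoHelper::new(&sender);

    assert_eq!(helper.send_new_task(42), Ok(()));
    assert_eq!(receiver.recv(), Ok(42));

    // The scheduler side goes away (cf. disconnect_new_task_sender()); the
    // helper now observes the abort instead of extending the channel's life.
    drop(sender);
    assert_eq!(helper.send_new_task(43), Err(()));
}
// ---------------------------------------------------------------------------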
- #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] - fn new( + pub fn new( + supported_scheduling_mode: SupportedSchedulingMode, handler_count: Option, log_messages_bytes_limit: Option, transaction_status_sender: Option, @@ -314,6 +402,7 @@ where prioritization_fee_cache: Arc, ) -> Arc { Self::do_new( + supported_scheduling_mode, handler_count, log_messages_bytes_limit, transaction_status_sender, @@ -326,7 +415,45 @@ where ) } + #[cfg(feature = "dev-context-only-utils")] + pub fn new_for_verification( + handler_count: Option, + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, + ) -> Arc { + Self::new( + SupportedSchedulingMode::block_verification_only(), + handler_count, + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + ) + } + + #[cfg(feature = "dev-context-only-utils")] + pub fn new_for_production( + handler_count: Option, + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, + ) -> Arc { + Self::new( + SupportedSchedulingMode::block_production_only(), + handler_count, + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + ) + } + + #[allow(clippy::too_many_arguments)] fn do_new( + supported_scheduling_mode: SupportedSchedulingMode, handler_count: Option, log_messages_bytes_limit: Option, transaction_status_sender: Option, @@ -338,14 +465,14 @@ where timeout_duration: Duration, ) -> Arc { let handler_count = handler_count.unwrap_or(Self::default_handler_count()); - assert!(handler_count >= 1); let scheduler_pool = Arc::new_cyclic(|weak_self| Self { + supported_scheduling_mode, scheduler_inners: Mutex::default(), block_production_scheduler_inner: Mutex::default(), trashed_scheduler_inners: Mutex::default(), timeout_listeners: Mutex::default(), - handler_count, + block_verification_handler_count: handler_count, common_handler_context: CommonHandlerContext { log_messages_bytes_limit, transaction_status_sender, @@ -362,8 +489,10 @@ where let cleaner_main_loop = { let weak_scheduler_pool = Arc::downgrade(&scheduler_pool); + let mut exiting = false; move || loop { sleep(pool_cleaner_interval); + trace!("Scheduler pool cleaner: start!!!",); let Some(scheduler_pool) = weak_scheduler_pool.upgrade() else { break; @@ -396,6 +525,12 @@ where idle_inner_count }; + let banking_stage_status = scheduler_pool.banking_stage_status(); + if !exiting && matches!(banking_stage_status, Some(BankingStageStatus::Exited)) { + exiting = true; + scheduler_pool.unregister_banking_stage(); + } + let trashed_inner_count = { let Ok(mut trashed_scheduler_inners) = scheduler_pool.trashed_scheduler_inners.lock() @@ -410,7 +545,7 @@ where trashed_inner_count }; - let triggered_timeout_listener_count = { + let (triggered_timeout_listener_count, active_timeout_listener_count) = { // Pre-allocate rather large capacity to avoid reallocation inside the lock. let mut expired_listeners = Vec::with_capacity(128); let Ok(mut timeout_listeners) = scheduler_pool.timeout_listeners.lock() else { @@ -422,27 +557,62 @@ where now.duration_since(*registered_at) > timeout_duration }, )); + let not_expired_count = timeout_listeners.len(); drop(timeout_listeners); - let count = expired_listeners.len(); + let expired_count = expired_listeners.len(); // Now triggers all expired listeners. 
Usually, triggering timeouts does // nothing because the callbacks will be no-op if already successfully // `wait_for_termination()`-ed. for (timeout_listener, _registered_at) in expired_listeners { timeout_listener.trigger(scheduler_pool.clone()); } - count + (expired_count, not_expired_count) }; + if matches!(banking_stage_status, Some(BankingStageStatus::Inactive)) { + let mut id_and_inner = scheduler_pool + .block_production_scheduler_inner + .lock() + .unwrap(); + if let Some(pooled) = id_and_inner.peek_pooled() { + if pooled.is_overgrown() { + let pooled = id_and_inner.take_pooled(); + //assert_eq!(Some(pooled.id()), id_and_inner.0.take()); + scheduler_pool.spawn_block_production_scheduler(&mut id_and_inner); + drop(id_and_inner); + drop(pooled); + } else { + pooled.reset(); + } + } + } + info!( - "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners, triggered {} timeout listeners", - idle_inner_count, trashed_inner_count, triggered_timeout_listener_count, + "Scheduler pool cleaner: dropped {} idle inners, {} trashed inners, triggered {} timeout listeners, (exit: {:?})", + idle_inner_count, trashed_inner_count, triggered_timeout_listener_count, exiting, ); sleepless_testing::at(CheckPoint::IdleSchedulerCleaned(idle_inner_count)); sleepless_testing::at(CheckPoint::TrashedSchedulerCleaned(trashed_inner_count)); sleepless_testing::at(CheckPoint::TimeoutListenerTriggered( triggered_timeout_listener_count, )); + + if exiting && active_timeout_listener_count == 0 { + // Wait a bit to ensure the replay stage has gone. + sleep(Duration::from_secs(1)); + + let mut id_and_inner = scheduler_pool + .block_production_scheduler_inner + .lock() + .unwrap(); + if id_and_inner.can_take() { + // assert that block production scheduler isn't taken anymore and has been + // returned to the pool + id_and_inner.take_pooled(); + } + break; + } } }; @@ -455,24 +625,6 @@ where scheduler_pool } - // This apparently-meaningless wrapper is handy, because some callers explicitly want - // `dyn InstalledSchedulerPool` to be returned for type inference convenience. - pub fn new_dyn( - handler_count: Option, - log_messages_bytes_limit: Option, - transaction_status_sender: Option, - replay_vote_sender: Option, - prioritization_fee_cache: Arc, - ) -> InstalledSchedulerPoolArc { - Self::new( - handler_count, - log_messages_bytes_limit, - transaction_status_sender, - replay_vote_sender, - prioritization_fee_cache, - ) - } - // See a comment at the weak_self field for justification of this method's existence. fn self_arc(&self) -> Arc { self.weak_self @@ -486,10 +638,15 @@ where // This fn needs to return immediately due to being part of the blocking // `::wait_for_termination()` call. - fn return_scheduler(&self, scheduler: S::Inner) { + fn return_scheduler(&self, mut scheduler: S::Inner) { // Refer to the comment in is_aborted() as to the exact definition of the concept of // _trashed_ and the interaction among different parts of unified scheduler. let should_trash = scheduler.is_trashed(); + let id = scheduler.id(); + if should_trash { + info!("trashing scheduler (id: {})...", scheduler.id()); + } + debug!("return_scheduler(): id: {id} should_trash: {should_trash}"); let mut block_production_scheduler_inner = self.block_production_scheduler_inner.lock().unwrap(); @@ -497,19 +654,26 @@ where // Maintain the runtime invariant established in register_banking_stage() about // the availability of pooled block production scheduler by re-spawning one. 
if block_production_scheduler_inner.can_put(&scheduler) { + // Abort this trashed scheduler to stop receiving BankingPacketBatch anymore... + scheduler.ensure_abort(); + block_production_scheduler_inner.trash_taken(); // To prevent block-production scheduler from being taken in // do_take_resumed_scheduler() by different thread at this very moment, the // preceding `.trash_taken()` and following `.put_spawned()` must be done // atomically. That's why we pass around MutexGuard into // spawn_block_production_scheduler(). - self.spawn_block_production_scheduler(&mut block_production_scheduler_inner); + if self.should_respawn() { + info!("respawning scheduler after being trashed..."); + self.spawn_block_production_scheduler(&mut block_production_scheduler_inner); + info!("respawned scheduler after being trashed."); + } } // Delay drop()-ing this trashed returned scheduler inner by stashing it in // self.trashed_scheduler_inners, which is periodically drained by the `solScCleaner` // thread. Dropping it could take long time (in fact, - // PooledSchedulerInner::usage_queue_loader can contain many entries to drop). + // TaskCreator::usage_queue_loader() can contain many entries to drop). self.trashed_scheduler_inners .lock() .expect("not poisoned") @@ -574,13 +738,21 @@ where self.scheduler_inners.lock().expect("not poisoned").len() } + pub fn block_production_supported(&self) -> bool { + self.supported_scheduling_mode.is_supported(BlockProduction) + } + pub fn register_banking_stage( &self, banking_packet_receiver: BankingPacketReceiver, + block_production_handler_count: usize, + banking_stage_monitor: Box, banking_packet_handler: Box, transaction_recorder: TransactionRecorder, ) { *self.banking_stage_handler_context.lock().unwrap() = Some(BankingStageHandlerContext { + block_production_handler_count, + banking_stage_monitor, banking_packet_receiver, banking_packet_handler, transaction_recorder, @@ -592,39 +764,74 @@ where ); } + fn unregister_banking_stage(&self) { + assert!(self + .banking_stage_handler_context + .lock() + .unwrap() + .take() + .is_some()); + } + + fn banking_stage_status(&self) -> Option { + self.banking_stage_handler_context + .lock() + .unwrap() + .as_mut() + .map(|respawner| respawner.banking_stage_monitor.status()) + } + + fn should_respawn(&self) -> bool { + !matches!( + self.banking_stage_status(), + None | Some(BankingStageStatus::Exited) + ) + } + fn create_handler_context( &self, mode: SchedulingMode, - new_task_sender: &Sender, + new_task_sender: &Arc>, ) -> HandlerContext { let ( + parallelism, banking_packet_receiver, banking_packet_handler, banking_stage_helper, transaction_recorder, ): ( + _, _, Box, /* to aid type inference */ _, _, ) = match mode { BlockVerification => { - // Return various type-specific no-op values. - (never(), Box::new(|_, _| {}), None, None) + ( + self.block_verification_handler_count, + // Return various type-specific no-op values. 
+ never(), + Box::new(|_, _| Ok(())), + None, + None, + ) } BlockProduction => { let handler_context = self.banking_stage_handler_context.lock().unwrap(); let handler_context = handler_context.as_ref().unwrap(); ( + handler_context.block_production_handler_count, handler_context.banking_packet_receiver.clone(), handler_context.banking_packet_handler.clone(), - Some(Arc::new(BankingStageHelper::new(new_task_sender.clone()))), + Some(Arc::new(BankingStageHelper::new(new_task_sender))), Some(handler_context.transaction_recorder.clone()), ) } }; + assert!(parallelism >= 1); self.common_handler_context.clone().into_handler_context( + parallelism, banking_packet_receiver, banking_packet_handler, banking_stage_helper, @@ -636,6 +843,7 @@ where &self, block_production_scheduler_inner: &mut MutexGuard<'_, BlockProductionSchedulerInner>, ) { + trace!("spawn block production scheduler: start!"); let scheduler = S::spawn( self.self_arc(), SchedulingContext::for_preallocation(), @@ -644,6 +852,7 @@ where let ((result, _timings), inner) = scheduler.into_inner(); assert_matches!(result, Ok(_)); block_production_scheduler_inner.put_spawned(inner); + trace!("spawn block production scheduler: end!"); } pub fn default_handler_count() -> usize { @@ -687,8 +896,14 @@ where &self, context: SchedulingContext, result_with_timings: ResultWithTimings, - ) -> InstalledSchedulerBox { - Box::new(self.do_take_resumed_scheduler(context, result_with_timings)) + ) -> Option { + if !self.supported_scheduling_mode.is_supported(context.mode()) { + return None; + } + + Some(Box::new( + self.do_take_resumed_scheduler(context, result_with_timings), + )) } fn register_timeout_listener(&self, timeout_listener: TimeoutListener) { @@ -763,6 +978,7 @@ impl TaskHandler for DefaultTaskHandler { .as_ref() .unwrap() .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); + trace!("pre_commit_callback: poh: {result:?}"); match result { Ok(()) => Ok(starting_transaction_index), Err(_) => Err(TransactionError::CommitCancelled), @@ -796,6 +1012,10 @@ impl ExecutedTask { result_with_timings: initialized_result_with_timings(), }) } + + fn into_task(self) -> Task { + self.task + } } // A very tiny generic message type to signal about opening and closing of subchannels, which are @@ -804,10 +1024,14 @@ impl ExecutedTask { // Note that the above properties can be upheld only when this is used inside MPSC or SPSC channels // (i.e. the consumer side needs to be single threaded). For the multiple consumer cases, // ChainedChannel can be used instead. 
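// ---------------------------------------------------------------------------
// Illustrative, self-contained sketch (not part of the patch): how a single
// SPSC queue can multiplex task payloads with the session-control messages
// described below (this patch adds Unblock, Disconnect and Reset to them).
// The Demo variant names mirror SubchanneledPayload; std::sync::mpsc and u64
// tasks are stand-ins.
use std::sync::mpsc;

enum DemoPayload {
    Payload(u64),
    OpenSubchannel(&'static str),
    CloseSubchannel,
    Disconnect,
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    for message in [
        DemoPayload::OpenSubchannel("session for slot 1"),
        DemoPayload::Payload(0),
        DemoPayload::Payload(1),
        DemoPayload::CloseSubchannel,
        DemoPayload::Disconnect,
    ] {
        sender.send(message).unwrap();
    }

    // Ordering is only meaningful because there is exactly one consumer thread.
    while let Ok(message) = receiver.recv() {
        match message {
            DemoPayload::OpenSubchannel(context) => println!("start session: {context}"),
            DemoPayload::Payload(index) => println!("buffer/schedule task {index}"),
            DemoPayload::CloseSubchannel => println!("end session"),
            DemoPayload::Disconnect => break,
        }
    }
}
// ---------------------------------------------------------------------------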
+#[derive(Debug)] enum SubchanneledPayload { Payload(P1), OpenSubchannel(P2), CloseSubchannel, + Unblock, + Disconnect, + Reset, } type NewTaskPayload = SubchanneledPayload>; @@ -1066,10 +1290,37 @@ pub struct PooledScheduler { context: SchedulingContext, } +#[derive(Debug)] +enum TaskCreator { + ForBlockVerification { + usage_queue_loader: UsageQueueLoader, + }, + ForBlockProduction { + banking_stage_helper: Arc, + }, +} + +impl TaskCreator { + fn usage_queue_loader(&self) -> &UsageQueueLoader { + use TaskCreator::*; + + match self { + ForBlockVerification { usage_queue_loader } => usage_queue_loader, + ForBlockProduction { + banking_stage_helper, + } => &banking_stage_helper.usage_queue_loader, + } + } + + fn is_overgrown(&self, max_usage_queue_count: usize) -> bool { + self.usage_queue_loader().count() > max_usage_queue_count + } +} + #[derive(Debug)] pub struct PooledSchedulerInner, TH: TaskHandler> { thread_manager: ThreadManager, - usage_queue_loader: UsageQueueLoader, + task_creator: TaskCreator, } impl Drop for ThreadManager @@ -1100,7 +1351,7 @@ where // Ensure to initiate thread shutdown via disconnected new_task_receiver by replacing the // current new_task_sender with a random one... - self.new_task_sender = crossbeam_channel::unbounded().0; + self.disconnect_new_task_sender(); self.ensure_join_threads(true); assert_matches!(self.session_result_with_timings, Some((Ok(_), _))); @@ -1132,10 +1383,6 @@ where // before that. self.thread_manager.are_threads_joined() } - - fn is_overgrown(&self) -> bool { - self.usage_queue_loader.count() > self.thread_manager.pool.max_usage_queue_count - } } // This type manages the OS threads for scheduling and executing transactions. The term @@ -1147,7 +1394,7 @@ where struct ThreadManager, TH: TaskHandler> { scheduler_id: SchedulerId, pool: Arc>, - new_task_sender: Sender, + new_task_sender: Arc>, new_task_receiver: Option>, session_result_sender: Sender, session_result_receiver: Receiver, @@ -1167,7 +1414,7 @@ impl, TH: TaskHandler> ThreadManager { Self { scheduler_id: pool.new_scheduler_id(), pool, - new_task_sender, + new_task_sender: Arc::new(new_task_sender), new_task_receiver: Some(new_task_receiver), session_result_sender, session_result_receiver, @@ -1194,20 +1441,44 @@ impl, TH: TaskHandler> ThreadManager { #[must_use] fn accumulate_result_with_timings( + mode: SchedulingMode, (result, timings): &mut ResultWithTimings, executed_task: HandlerResult, - ) -> Option> { + ) -> Option<(Box, bool)> { let Ok(executed_task) = executed_task else { return None; }; timings.accumulate(&executed_task.result_with_timings.1); - match executed_task.result_with_timings.0 { - Ok(()) => Some(executed_task), - Err(error) => { - error!("error is detected while accumulating....: {error:?}"); - *result = Err(error); - None - } + match mode { + BlockVerification => match executed_task.result_with_timings.0 { + Ok(()) => Some((executed_task, false)), + Err(error) => { + error!("error is detected while accumulating....: {error:?}"); + *result = Err(error); + None + } + }, + BlockProduction => match executed_task.result_with_timings.0 { + Ok(()) => Some((executed_task, false)), + Err(TransactionError::CommitCancelled) + | Err(TransactionError::WouldExceedMaxBlockCostLimit) + | Err(TransactionError::WouldExceedMaxVoteCostLimit) + | Err(TransactionError::WouldExceedMaxAccountCostLimit) + | Err(TransactionError::WouldExceedAccountDataBlockLimit) => { + Some((executed_task, true)) + } + Err(ref error) => { + debug!("error is detected while accumulating....: 
{error:?}"); + Some((executed_task, false)) + } + }, + } + } + + fn can_finish_session(mode: SchedulingMode, state_machine: &SchedulingStateMachine) -> bool { + match mode { + BlockVerification => state_machine.has_no_active_task(), + BlockProduction => state_machine.has_no_executing_task(), } } @@ -1339,7 +1610,6 @@ impl, TH: TaskHandler> ThreadManager { // 5. the handler thread reply back to the scheduler thread as an executed task. // 6. the scheduler thread post-processes the executed task. let scheduler_main_loop = { - let handler_count = self.pool.handler_count; let session_result_sender = self.session_result_sender.clone(); // Taking new_task_receiver here is important to ensure there's a single receiver. In // this way, the replay stage will get .send() failures reliably, after this scheduler @@ -1349,6 +1619,9 @@ impl, TH: TaskHandler> ThreadManager { .take() .expect("no 2nd start_threads()"); + let handler_context = handler_context.clone(); + let mut session_resetting = false; + // Now, this is the main loop for the scheduler thread, which is a special beast. // // That's because it could be the most notable bottleneck of throughput in the future @@ -1398,7 +1671,14 @@ impl, TH: TaskHandler> ThreadManager { }; let mut state_machine = unsafe { - SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling() + SchedulingStateMachine::exclusively_initialize_current_thread_for_scheduling( + handler_context + .parallelism + .checked_mul(2) + .unwrap() + .try_into() + .unwrap(), + ) }; // The following loop maintains and updates ResultWithTimings as its @@ -1413,8 +1693,10 @@ impl, TH: TaskHandler> ThreadManager { // which isn't great and is inconsistent with `if`s in the Rust's match // arm. So, eagerly binding the result to a variable unconditionally here // makes no perf. difference... - let dummy_unblocked_task_receiver = - dummy_receiver(state_machine.has_unblocked_task()); + let dummy_unblocked_task_receiver = dummy_receiver( + state_machine.has_runnable_task() + && !(scheduling_mode == BlockProduction && session_ending), + ); // There's something special called dummy_unblocked_task_receiver here. // This odd pattern was needed to react to newly unblocked tasks from @@ -1428,13 +1710,25 @@ impl, TH: TaskHandler> ThreadManager { // to measure _actual_ cpu usage easily with the select approach. select_biased! 
{ recv(finished_blocked_task_receiver) -> executed_task => { - let Some(executed_task) = Self::accumulate_result_with_timings( + let Ok(executed_task) = executed_task else { + assert_matches!(scheduling_mode, BlockProduction); + break 'nonaborted_main_loop; + }; + + let Some((executed_task, trigger_ending)) = Self::accumulate_result_with_timings( + scheduling_mode, &mut result_with_timings, - executed_task.expect("alive handler"), + executed_task, ) else { break 'nonaborted_main_loop; }; state_machine.deschedule_task(&executed_task.task); + if trigger_ending { + assert_matches!(scheduling_mode, BlockProduction); + session_ending = true; + let task = handler_context.banking_stage_helper().recreate_task(executed_task); + state_machine.buffer_task(task); + } }, recv(dummy_unblocked_task_receiver) -> dummy => { assert_matches!(dummy, Err(RecvError)); @@ -1457,28 +1751,43 @@ impl, TH: TaskHandler> ThreadManager { Ok(NewTaskPayload::CloseSubchannel) => { session_ending = true; } - Ok(NewTaskPayload::OpenSubchannel(_context_and_result_with_timings)) => + Ok(NewTaskPayload::OpenSubchannel(_) | NewTaskPayload::Unblock) => unreachable!(), - Err(RecvError) => { + Ok(NewTaskPayload::Disconnect) | Err(RecvError) => { // Mostly likely is that this scheduler is dropped for pruned blocks of // abandoned forks... // This short-circuiting is tested with test_scheduler_drop_short_circuiting. break 'nonaborted_main_loop; } + Ok(NewTaskPayload::Reset) => { + assert_matches!(scheduling_mode, BlockProduction); + session_ending = true; + session_resetting = true; + } } }, recv(finished_idle_task_receiver) -> executed_task => { - let Some(executed_task) = Self::accumulate_result_with_timings( + let Some((executed_task, trigger_ending)) = Self::accumulate_result_with_timings( + scheduling_mode, &mut result_with_timings, + // finished_idle_task_sender won't be disconnected + // unlike finished_blocked_task_sender??? executed_task.expect("alive handler"), ) else { break 'nonaborted_main_loop; }; state_machine.deschedule_task(&executed_task.task); + if trigger_ending { + assert_matches!(scheduling_mode, BlockProduction); + session_ending = true; + let task = handler_context.banking_stage_helper().recreate_task(executed_task); + state_machine.buffer_task(task); + } }, }; - is_finished = session_ending && state_machine.has_no_active_task(); + is_finished = session_ending + && Self::can_finish_session(scheduling_mode, &state_machine); } assert!(mem::replace(&mut is_finished, false)); @@ -1487,42 +1796,70 @@ impl, TH: TaskHandler> ThreadManager { session_result_sender .send(result_with_timings) .expect("always outlived receiver"); - state_machine.reinitialize(); assert!(mem::replace(&mut session_ending, false)); + let mut new_result_with_timings = initialized_result_with_timings(); loop { + if session_resetting { + while let Some(task) = state_machine.schedule_next_unblocked_task() { + state_machine.deschedule_task(&task); + drop(task); + } + // should call state_machine.reinitialize()??? + session_resetting = false; + } // Prepare for the new session. 
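// ---------------------------------------------------------------------------
// Illustrative, self-contained sketch (not part of the patch): the
// dummy_unblocked_task_receiver trick used in the select above. An arm is
// switched on by handing select a freshly disconnected receiver (it fires
// immediately with Err(RecvError)) and switched off with never(), instead of
// duplicating the whole select. The real loop uses select_biased! and gates
// the arm on has_runnable_task() && !session_ending; plain crossbeam select!
// is enough to show the idea.
use crossbeam_channel::{never, select, unbounded, Receiver, RecvError};

fn dummy_receiver(trigger: bool) -> Receiver<()> {
    if trigger {
        // Disconnected on the spot: its arm is always ready, as an Err.
        unbounded().1
    } else {
        // Never fires: effectively disables the arm.
        never()
    }
}

fn main() {
    let (finished_sender, finished_receiver) = unbounded();

    // Round 1: a task is runnable and nothing has finished yet, so the dummy
    // arm fires and the loop can schedule without blocking.
    let dummy = dummy_receiver(true);
    select! {
        recv(dummy) -> message => {
            assert_eq!(message, Err(RecvError));
            println!("round 1: schedule_next_unblocked_task()");
        }
        recv(finished_receiver) -> task => println!("round 1: finished {task:?}"),
    }

    // Round 2: nothing is runnable, so the dummy arm is never() and the loop
    // blocks until a finished task arrives.
    finished_sender.send("task A").unwrap();
    let dummy = dummy_receiver(false);
    select! {
        recv(dummy) -> _ => unreachable!("never() cannot fire"),
        recv(finished_receiver) -> task => println!("round 2: finished {}", task.unwrap()),
    }
}
// ---------------------------------------------------------------------------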
match new_task_receiver.recv() { Ok(NewTaskPayload::Payload(task)) => { sleepless_testing::at(CheckPoint::NewBufferedTask( task.task_index(), )); - assert_eq!(scheduling_mode, BlockProduction); + assert_matches!(scheduling_mode, BlockProduction); state_machine.buffer_task(task); } Ok(NewTaskPayload::OpenSubchannel(context_and_result_with_timings)) => { - let (new_context, new_result_with_timings) = + let new_context; + (new_context, new_result_with_timings) = *context_and_result_with_timings; // We just received subsequent (= not initial) session and about to // enter into the preceding `while(!is_finished) {...}` loop again. // Before that, propagate new SchedulingContext to handler threads assert_eq!(scheduling_mode, new_context.mode()); assert!(!new_context.is_preallocated()); + if matches!(scheduling_mode, BlockVerification) { + state_machine.reinitialize(); + } + runnable_task_sender - .send_chained_channel(&new_context, handler_count) + .send_chained_channel(&new_context, handler_context.parallelism) .unwrap(); - result_with_timings = new_result_with_timings; - break; + if matches!(scheduling_mode, BlockVerification) { + result_with_timings = new_result_with_timings; + break; + } } Ok(NewTaskPayload::CloseSubchannel) => { + assert_matches!(scheduling_mode, BlockProduction); // This match arm can be hit if context.is_preallocated() + info!("ignoring duplicate CloseSubchannel..."); } - Err(_) => { + Ok(NewTaskPayload::Disconnect) | Err(_) => { // This unusual condition must be triggered by ThreadManager::drop(). // Initialize result_with_timings with a harmless value... result_with_timings = initialized_result_with_timings(); break 'nonaborted_main_loop; } + Ok(NewTaskPayload::Unblock) => { + assert_matches!(scheduling_mode, BlockProduction); + // borrow checker won't allow us to do this inside OpenSubchannel + // to remove duplicate assignments... + result_with_timings = new_result_with_timings; + break; + } + Ok(NewTaskPayload::Reset) => { + assert_matches!(scheduling_mode, BlockProduction); + session_resetting = true; + } } } } @@ -1593,9 +1930,13 @@ impl, TH: TaskHandler> ThreadManager { // justification of this additional work in the handler thread. 
let Ok(banking_packet) = banking_packet else { info!("disconnected banking_packet_receiver"); + banking_stage_helper.signal_disconnection(); break; }; - banking_packet_handler(banking_stage_helper, banking_packet); + if let Err(SchedulerAborted) = banking_packet_handler(banking_stage_helper, banking_packet) { + info!("dead new_task_sender"); + break; + } continue; }, }; @@ -1630,18 +1971,23 @@ impl, TH: TaskHandler> ThreadManager { } }; + let mode_char = match scheduling_mode { + BlockVerification => 'V', + BlockProduction => 'P', + }; + self.scheduler_thread = Some( thread::Builder::new() - .name("solScheduler".to_owned()) + .name(format!("solSchedule{mode_char}")) .spawn_tracked(scheduler_main_loop) .unwrap(), ); - self.handler_threads = (0..self.pool.handler_count) + self.handler_threads = (0..handler_context.parallelism) .map({ |thx| { thread::Builder::new() - .name(format!("solScHandler{:02}", thx)) + .name(format!("solScHandle{mode_char}{:02}", thx)) .spawn_tracked(handler_main_loop()) .unwrap() } @@ -1693,17 +2039,8 @@ impl, TH: TaskHandler> ThreadManager { }; } - fn ensure_join_threads_after_abort( - &mut self, - should_receive_aborted_session_result: bool, - ) -> TransactionError { + fn ensure_join_threads_after_abort(&mut self, should_receive_aborted_session_result: bool) { self.ensure_join_threads(should_receive_aborted_session_result); - self.session_result_with_timings - .as_mut() - .unwrap() - .0 - .clone() - .unwrap_err() } fn are_threads_joined(&self) -> bool { @@ -1717,7 +2054,7 @@ impl, TH: TaskHandler> ThreadManager { } } - fn end_session(&mut self) { + fn do_end_session(&mut self, nonblocking: bool) { if self.are_threads_joined() { assert!(self.session_result_with_timings.is_some()); debug!("end_session(): skipping; already joined the aborted threads.."); @@ -1741,6 +2078,10 @@ impl, TH: TaskHandler> ThreadManager { return; } + if nonblocking { + return; + } + // Even if abort is detected, it's guaranteed that the scheduler thread puts the last // message into the session_result_sender before terminating. 
let result_with_timings = self.session_result_receiver.recv().unwrap(); @@ -1752,6 +2093,10 @@ impl, TH: TaskHandler> ThreadManager { debug!("end_session(): ended session at {:?}...", thread::current()); } + fn end_session(&mut self) { + self.do_end_session(false) + } + fn start_session( &mut self, context: SchedulingContext, @@ -1766,11 +2111,24 @@ impl, TH: TaskHandler> ThreadManager { )))) .expect("no new session after aborted"); } + + fn unblock_session(&self) { + self.new_task_sender + .send(NewTaskPayload::Unblock) + .expect("no new session after aborted"); + } + + fn disconnect_new_task_sender(&mut self) { + self.new_task_sender = Arc::new(crossbeam_channel::unbounded().0); + } } pub trait SchedulerInner { fn id(&self) -> SchedulerId; fn is_trashed(&self) -> bool; + fn is_overgrown(&self) -> bool; + fn reset(&self); + fn ensure_abort(&mut self); } pub trait SpawnableScheduler: InstalledScheduler { @@ -1821,19 +2179,31 @@ impl SpawnableScheduler for PooledScheduler { context: SchedulingContext, result_with_timings: ResultWithTimings, ) -> Self { - let mut inner = Self::Inner { - thread_manager: ThreadManager::new(pool.clone()), - usage_queue_loader: UsageQueueLoader::default(), + info!("spawning new scheduler for slot: {:?}", context.slot()); + let mut thread_manager = ThreadManager::new(pool.clone()); + let handler_context = + pool.create_handler_context(context.mode(), &thread_manager.new_task_sender); + let task_creator = handler_context.new_task_creator(); + thread_manager.start_threads(context.clone(), result_with_timings, handler_context); + let inner = Self::Inner { + thread_manager, + task_creator, }; - inner.thread_manager.start_threads( - context.clone(), - result_with_timings, - pool.create_handler_context(context.mode(), &inner.thread_manager.new_task_sender), - ); Self { inner, context } } } +#[derive(Debug)] +pub enum BankingStageStatus { + Active, + Inactive, + Exited, +} + +pub trait BankingStageMonitor: Send + Debug { + fn status(&mut self) -> BankingStageStatus; +} + impl InstalledScheduler for PooledScheduler { fn id(&self) -> SchedulerId { self.inner.id() @@ -1848,16 +2218,29 @@ impl InstalledScheduler for PooledScheduler { transaction: RuntimeTransaction, index: usize, ) -> ScheduleResult { + //assert_matches!(self.context().mode(), BlockVerification); let task = SchedulingStateMachine::create_task(transaction, index, &mut |pubkey| { - self.inner.usage_queue_loader.load(pubkey) + self.inner.task_creator.usage_queue_loader().load(pubkey) }); self.inner.thread_manager.send_task(task) } + fn unblock_scheduling(&self) { + self.inner.thread_manager.unblock_session(); + } + fn recover_error_after_abort(&mut self) -> TransactionError { self.inner .thread_manager - .ensure_join_threads_after_abort(true) + .ensure_join_threads_after_abort(true); + self.inner + .thread_manager + .session_result_with_timings + .as_mut() + .unwrap() + .0 + .clone() + .unwrap_err() } fn wait_for_termination( @@ -1869,7 +2252,10 @@ impl InstalledScheduler for PooledScheduler { } fn pause_for_recent_blockhash(&mut self) { - self.inner.thread_manager.end_session(); + // this fn is called from poh thread, while it's being locked. so, we can't wait scheduler + // termination here to avoid deadlock. 
just async signaling is enough + let nonblocking = matches!(self.context().mode(), BlockProduction); + self.inner.thread_manager.do_end_session(nonblocking); } } @@ -1885,6 +2271,28 @@ where fn is_trashed(&self) -> bool { self.is_aborted() || self.is_overgrown() } + + fn is_overgrown(&self) -> bool { + self.task_creator + .is_overgrown(self.thread_manager.pool.max_usage_queue_count) + } + + fn reset(&self) { + if let Err(a) = self + .thread_manager + .new_task_sender + .send(NewTaskPayload::Reset) + { + warn!("failed to send a reset due to error: {a:?}"); + } + } + + fn ensure_abort(&mut self) { + if self.thread_manager.are_threads_joined() { + return; + } + self.thread_manager.disconnect_new_task_sender() + } } impl UninstalledScheduler for PooledSchedulerInner @@ -1918,7 +2326,9 @@ mod tests { bank::Bank, bank_forks::BankForks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, - installed_scheduler_pool::{BankWithScheduler, SchedulingContext}, + installed_scheduler_pool::{ + BankWithScheduler, InstalledSchedulerPoolArc, SchedulingContext, + }, prioritization_fee_cache::PrioritizationFeeCache, }, solana_system_transaction as system_transaction, @@ -1932,6 +2342,56 @@ mod tests { test_case::test_matrix, }; + impl SchedulerPool + where + S: SpawnableScheduler, + TH: TaskHandler, + { + fn do_new_for_verification( + handler_count: Option, + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, + pool_cleaner_interval: Duration, + max_pooling_duration: Duration, + max_usage_queue_count: usize, + timeout_duration: Duration, + ) -> Arc { + Self::do_new( + SupportedSchedulingMode::block_verification_only(), + handler_count, + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + pool_cleaner_interval, + max_pooling_duration, + max_usage_queue_count, + timeout_duration, + ) + } + + // This apparently-meaningless wrapper is handy, because some callers explicitly want + // `dyn InstalledSchedulerPool` to be returned for type inference convenience. 
+ fn new_dyn_for_verification( + handler_count: Option, + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, + ) -> InstalledSchedulerPoolArc { + Self::new( + SupportedSchedulingMode::block_verification_only(), + handler_count, + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + ) + } + } + #[derive(Debug)] enum TestCheckPoint { BeforeNewTask, @@ -1953,8 +2413,13 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); // this indirectly proves that there should be circular link because there's only one Arc // at this moment now @@ -1969,11 +2434,16 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let bank = Arc::new(Bank::default_for_tests()); let context = SchedulingContext::for_verification(bank); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let debug = format!("{scheduler:#?}"); assert!(!debug.is_empty()); @@ -1994,7 +2464,7 @@ mod tests { ]); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = DefaultSchedulerPool::do_new( + let pool_raw = DefaultSchedulerPool::do_new_for_verification( None, None, None, @@ -2059,7 +2529,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); const REDUCED_MAX_USAGE_QUEUE_COUNT: usize = 1; - let pool_raw = DefaultSchedulerPool::do_new( + let pool_raw = DefaultSchedulerPool::do_new_for_verification( None, None, None, @@ -2080,14 +2550,16 @@ mod tests { for _ in 0..REDUCED_MAX_USAGE_QUEUE_COUNT { small_scheduler .inner - .usage_queue_loader + .task_creator + .usage_queue_loader() .load(Pubkey::new_unique()); } let big_scheduler = pool.do_take_scheduler(context2); for _ in 0..REDUCED_MAX_USAGE_QUEUE_COUNT + 1 { big_scheduler .inner - .usage_queue_loader + .task_creator + .usage_queue_loader() .load(Pubkey::new_unique()); } @@ -2134,7 +2606,7 @@ mod tests { ]); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = DefaultSchedulerPool::do_new( + let pool_raw = DefaultSchedulerPool::do_new_for_verification( None, None, None, @@ -2148,7 +2620,7 @@ mod tests { let pool = pool_raw.clone(); let bank = Arc::new(Bank::default_for_tests()); let context = SchedulingContext::for_verification(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); pool.register_timeout_listener(bank.create_timeout_listener()); assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 0); @@ -2182,17 +2654,18 @@ mod tests { ]); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = SchedulerPool::, _>::do_new( - None, - None, - None, - None, - ignored_prioritization_fee_cache, - 
SHORTENED_POOL_CLEANER_INTERVAL, - DEFAULT_MAX_POOLING_DURATION, - DEFAULT_MAX_USAGE_QUEUE_COUNT, - SHORTENED_TIMEOUT_DURATION, - ); + let pool_raw = + SchedulerPool::, _>::do_new_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + SHORTENED_POOL_CLEANER_INTERVAL, + DEFAULT_MAX_POOLING_DURATION, + DEFAULT_MAX_USAGE_QUEUE_COUNT, + SHORTENED_TIMEOUT_DURATION, + ); #[derive(Debug)] struct ExecuteTimingCounter; @@ -2219,7 +2692,7 @@ mod tests { let context = SchedulingContext::for_verification(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); pool.register_timeout_listener(bank.create_timeout_listener()); @@ -2268,7 +2741,7 @@ mod tests { ]); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = DefaultSchedulerPool::do_new( + let pool_raw = DefaultSchedulerPool::do_new_for_verification( None, None, None, @@ -2287,7 +2760,7 @@ mod tests { let context = SchedulingContext::for_verification(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); pool.register_timeout_listener(bank.create_timeout_listener()); @@ -2315,7 +2788,7 @@ mod tests { ]); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = SchedulerPool::, _>::do_new( + let pool_raw = SchedulerPool::, _>::do_new_for_verification( None, None, None, @@ -2339,7 +2812,7 @@ mod tests { let context = SchedulingContext::for_verification(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); pool.register_timeout_listener(bank.create_timeout_listener()); @@ -2417,7 +2890,7 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new( + let pool = SchedulerPool::, _>::new_for_verification( None, None, None, @@ -2509,7 +2982,7 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new( + let pool = SchedulerPool::, _>::new_for_verification( None, None, None, @@ -2549,8 +3022,13 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::for_verification(bank); @@ -2578,8 +3056,13 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::for_verification(bank); 
let mut scheduler = pool.do_take_scheduler(context.clone()); @@ -2597,8 +3080,13 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let old_bank = &Arc::new(Bank::default_for_tests()); let new_bank = &Arc::new(Bank::default_for_tests()); assert!(!Arc::ptr_eq(old_bank, new_bank)); @@ -2610,7 +3098,7 @@ mod tests { let scheduler_id = scheduler.id(); pool.return_scheduler(scheduler.into_inner().1); - let scheduler = pool.take_scheduler(new_context.clone()); + let scheduler = pool.take_scheduler(new_context.clone()).unwrap(); assert_eq!(scheduler_id, scheduler.id()); assert!(Arc::ptr_eq(scheduler.context().bank().unwrap(), new_bank)); } @@ -2623,8 +3111,13 @@ mod tests { let bank_forks = BankForks::new_rw_arc(bank); let mut bank_forks = bank_forks.write().unwrap(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); bank_forks.install_scheduler_pool(pool); } @@ -2637,8 +3130,13 @@ mod tests { let child_bank = Bank::new_from_parent(bank, &Pubkey::default(), 1); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); @@ -2687,12 +3185,17 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new_dyn(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_dyn_for_verification( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let context = SchedulingContext::for_verification(bank.clone()); assert_eq!(bank.transaction_count(), 0); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); scheduler.schedule_execution(tx0, 0).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); @@ -2722,7 +3225,7 @@ mod tests { let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool_raw = DefaultSchedulerPool::do_new( + let pool_raw = DefaultSchedulerPool::do_new_for_verification( None, None, None, @@ -2735,7 +3238,7 @@ mod tests { ); let pool = pool_raw.clone(); let context = SchedulingContext::for_verification(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let unfunded_keypair = Keypair::new(); let bad_tx = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( @@ -2855,7 +3358,7 @@ mod tests { const TX_COUNT: usize = 2; let 
ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new_dyn( + let pool = SchedulerPool::, _>::new_dyn_for_verification( Some(TX_COUNT), // fix to use exactly 2 handlers None, None, @@ -2864,7 +3367,7 @@ mod tests { ); let context = SchedulingContext::for_verification(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); for index in 0..TX_COUNT { // Use 2 non-conflicting txes to exercise the channel disconnected case as well. @@ -2930,7 +3433,7 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new( + let pool = SchedulerPool::, _>::new_for_verification( None, None, None, @@ -2963,8 +3466,13 @@ mod tests { assert!(*TASK_COUNT.lock().unwrap() < 10); } - #[test] - fn test_scheduler_schedule_execution_blocked() { + #[allow(clippy::arithmetic_side_effects)] + #[test_matrix( + [BlockVerification, BlockProduction] + )] + fn test_scheduler_schedule_execution_blocked_at_session_ending( + scheduling_mode: SchedulingMode, + ) { solana_logger::setup(); const STALLED_TRANSACTION_INDEX: usize = 0; @@ -2995,7 +3503,7 @@ mod tests { genesis_config, mint_keypair, .. - } = create_genesis_config(10_000); + } = solana_ledger::genesis_utils::create_genesis_config(10_000); // tx0 and tx1 is definitely conflicting to write-lock the mint address let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( @@ -3014,17 +3522,53 @@ mod tests { let bank = Bank::new_for_tests(&genesis_config); let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new_dyn( + let pool = SchedulerPool::, _>::new_for_production( None, None, None, None, ignored_prioritization_fee_cache, ); - let context = SchedulingContext::for_verification(bank.clone()); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); + let (exit, poh_recorder, poh_service, _signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); - assert_eq!(bank.transaction_count(), 0); - let scheduler = pool.take_scheduler(context); + if matches!(scheduling_mode, BlockProduction) { + pool.register_banking_stage( + banking_packet_receiver, + DefaultSchedulerPool::default_handler_count(), + Box::new(DummyBankingMinitor), + Box::new(|_, _| unreachable!()), + poh_recorder.read().unwrap().new_recorder(), + ); + } + + // Wait until genesis_bank reaches its tick height... 
+ while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + + let bank = Arc::new(Bank::new_from_parent( + bank.clone(), + &Pubkey::default(), + bank.slot().checked_add(1).unwrap(), + )); + let mut current_transaction_count = 0; + assert_eq!(bank.transaction_count(), current_transaction_count); + + let context = SchedulingContext::new_with_mode(scheduling_mode, bank.clone()); + let scheduler = pool.take_scheduler(context).unwrap(); + if matches!(scheduling_mode, BlockProduction) { + scheduler.unblock_scheduling(); + } // Stall handling tx0 and tx1 let lock_to_stall = LOCK_TO_STALL.lock().unwrap(); @@ -3035,14 +3579,61 @@ mod tests { .schedule_execution(tx1, BLOCKED_TRANSACTION_INDEX) .unwrap(); + let bank = BankWithScheduler::new(bank, Some(scheduler)); + poh_recorder + .write() + .unwrap() + .set_bank(bank.clone_with_scheduler(), true); + // Wait a bit for the scheduler thread to decide to block tx1 - std::thread::sleep(std::time::Duration::from_secs(1)); + std::thread::sleep(std::time::Duration::from_millis(100)); // Resume handling by unlocking LOCK_TO_STALL drop(lock_to_stall); + assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); + + current_transaction_count += 1; + if matches!(scheduling_mode, BlockVerification) { + // Block verification scheduler should fully clear its blocked transactions before + // finishing. + current_transaction_count += 1; + } + assert_eq!(bank.transaction_count(), current_transaction_count); + + // Wait until genesis_bank reaches its tick height... + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + + // Create new bank to observe behavior difference around session ending + let bank = Arc::new(Bank::new_from_parent( + bank.clone_without_scheduler(), + &Pubkey::default(), + bank.slot().checked_add(1).unwrap(), + )); + assert_eq!(bank.transaction_count(), current_transaction_count); + + let context = SchedulingContext::new_with_mode(scheduling_mode, bank.clone()); + let scheduler = pool.take_scheduler(context).unwrap(); + if matches!(scheduling_mode, BlockProduction) { + scheduler.unblock_scheduling(); + } + let bank = BankWithScheduler::new(bank, Some(scheduler)); + poh_recorder + .write() + .unwrap() + .set_bank(bank.clone_with_scheduler(), true); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); - assert_eq!(bank.transaction_count(), 2); + + if matches!(scheduling_mode, BlockProduction) { + // Block production scheduler should carry over transactions from previous bank + current_transaction_count += 1; + } + assert_eq!(bank.transaction_count(), current_transaction_count); + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); } #[test] @@ -3080,7 +3671,7 @@ mod tests { )); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = SchedulerPool::, _>::new( + let pool = SchedulerPool::, _>::new_for_verification( Some(4), // spawn 4 threads None, None, @@ -3105,7 +3696,7 @@ mod tests { .cycle() .take(10000) { - let scheduler = pool.take_scheduler(context.clone()); + let scheduler = pool.take_scheduler(context.clone()).unwrap(); scheduler .schedule_execution(dummy_tx.clone(), index) .unwrap(); @@ -3175,7 +3766,7 @@ mod tests { &task, &pool.create_handler_context( BlockVerification, - &crossbeam_channel::unbounded().0, + &Arc::new(crossbeam_channel::unbounded().0), ), ); (result, timings) @@ -3184,6 +3775,10 @@ mod tests { Ok(()) } + fn unblock_scheduling(&self) { + 
unimplemented!(); + } + fn recover_error_after_abort(&mut self) -> TransactionError { unimplemented!(); } @@ -3218,6 +3813,18 @@ mod tests { fn is_trashed(&self) -> bool { false } + + fn is_overgrown(&self) -> bool { + unimplemented!() + } + + fn reset(&self) { + unimplemented!() + } + + fn ensure_abort(&mut self) { + unimplemented!() + } } impl UninstalledScheduler @@ -3293,14 +3900,14 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let pool = - SchedulerPool::, DefaultTaskHandler>::new_dyn( + SchedulerPool::, DefaultTaskHandler>::new_dyn_for_verification( None, None, None, None, ignored_prioritization_fee_cache, ); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_eq!(bank.transaction_count(), 0); @@ -3377,12 +3984,13 @@ mod tests { let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let scheduling_context = &SchedulingContext::for_verification(bank.clone()); let handler_context = &HandlerContext { + parallelism: 0, log_messages_bytes_limit: None, transaction_status_sender: None, replay_vote_sender: None, prioritization_fee_cache, banking_packet_receiver: never(), - banking_packet_handler: Box::new(|_, _| {}), + banking_packet_handler: Box::new(|_, _| Ok(())), banking_stage_helper: None, transaction_recorder: None, }; @@ -3461,12 +4069,13 @@ mod tests { Some(leader_schedule_cache), ); let handler_context = &HandlerContext { + parallelism: 0, log_messages_bytes_limit: None, transaction_status_sender: Some(TransactionStatusSender { sender }), replay_vote_sender: None, prioritization_fee_cache, banking_packet_receiver: never(), - banking_packet_handler: Box::new(|_, _| {}), + banking_packet_handler: Box::new(|_, _| Ok(())), banking_stage_helper: None, transaction_recorder: Some(poh_recorder.read().unwrap().new_recorder()), }; @@ -3530,6 +4139,15 @@ mod tests { poh_service.join().unwrap(); } + #[derive(Debug)] + struct DummyBankingMinitor; + + impl BankingStageMonitor for DummyBankingMinitor { + fn status(&mut self) -> BankingStageStatus { + BankingStageStatus::Active + } + } + #[test] fn test_block_production_scheduler_schedule_execution_success() { solana_logger::setup(); @@ -3544,8 +4162,13 @@ mod tests { let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_production( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); @@ -3560,6 +4183,8 @@ mod tests { ); pool.register_banking_stage( banking_packet_receiver, + DefaultSchedulerPool::default_handler_count(), + Box::new(DummyBankingMinitor), // we don't use the banking packet channel in this test. so, pass panicking handler. 
Box::new(|_, _| unreachable!()), poh_recorder.read().unwrap().new_recorder(), @@ -3567,7 +4192,8 @@ mod tests { assert_eq!(bank.transaction_count(), 0); let context = SchedulingContext::for_production(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); + scheduler.unblock_scheduling(); let tx0 = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer( &mint_keypair, &solana_pubkey::new_rand(), @@ -3601,8 +4227,13 @@ mod tests { let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_production( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); @@ -3635,6 +4266,8 @@ mod tests { }); pool.register_banking_stage( banking_packet_receiver, + DefaultSchedulerPool::default_handler_count(), + Box::new(DummyBankingMinitor), fixed_banking_packet_handler, poh_recorder.read().unwrap().new_recorder(), ); @@ -3645,7 +4278,8 @@ mod tests { assert_eq!(bank.transaction_count(), 0); let context = SchedulingContext::for_production(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); + scheduler.unblock_scheduling(); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); assert_eq!(bank.transaction_count(), 1); @@ -3672,8 +4306,13 @@ mod tests { let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_production( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); @@ -3701,6 +4340,8 @@ mod tests { let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); pool.register_banking_stage( banking_packet_receiver, + DefaultSchedulerPool::default_handler_count(), + Box::new(DummyBankingMinitor), fixed_banking_packet_handler, poh_recorder.read().unwrap().new_recorder(), ); @@ -3708,7 +4349,8 @@ mod tests { // Quickly take and return the scheduler so that this test can test the behavior while // waiting for new session... 
let context = SchedulingContext::for_production(bank.clone()); - let scheduler = pool.take_scheduler(context.clone()); + let scheduler = pool.take_scheduler(context.clone()).unwrap(); + scheduler.unblock_scheduling(); let bank_tmp = BankWithScheduler::new(bank.clone(), Some(scheduler)); assert_matches!(bank_tmp.wait_for_completed_scheduler(), Some((Ok(()), _))); @@ -3723,7 +4365,8 @@ mod tests { assert_eq!(banking_packet_sender.len(), 0); assert_eq!(bank.transaction_count(), 0); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); + scheduler.unblock_scheduling(); let bank = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank.wait_for_completed_scheduler(), Some((Ok(()), _))); assert_eq!(bank.transaction_count(), 1); @@ -3738,8 +4381,13 @@ mod tests { solana_logger::setup(); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_production( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let bank = Arc::new(Bank::default_for_tests()); let context = &SchedulingContext::for_production(bank); let scheduler = pool.do_take_scheduler(context.clone()); @@ -3757,8 +4405,13 @@ mod tests { let (bank, _bank_forks) = setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_production( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); @@ -3773,6 +4426,8 @@ mod tests { let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); pool.register_banking_stage( banking_packet_receiver, + DefaultSchedulerPool::default_handler_count(), + Box::new(DummyBankingMinitor), Box::new(|_, _| unreachable!()), poh_recorder.read().unwrap().new_recorder(), ); @@ -3800,6 +4455,7 @@ mod tests { let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); const REDUCED_MAX_USAGE_QUEUE_COUNT: usize = 0; let pool = DefaultSchedulerPool::do_new( + SupportedSchedulingMode::block_production_only(), None, None, None, @@ -3825,23 +4481,28 @@ mod tests { let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); pool.register_banking_stage( banking_packet_receiver, + DefaultSchedulerPool::default_handler_count(), + Box::new(DummyBankingMinitor), Box::new(|_, _| unreachable!()), poh_recorder.read().unwrap().new_recorder(), ); let context = SchedulingContext::for_production(bank); let scheduler = pool.do_take_scheduler(context.clone()); + scheduler.unblock_scheduling(); let trashed_old_scheduler_id = scheduler.id(); // Make scheduler overgrown and trash it by returning scheduler .inner - .usage_queue_loader + .task_creator + .usage_queue_loader() .load(Pubkey::new_unique()); Box::new(scheduler.into_inner().1).return_to_pool(); // Re-take a brand-new one let scheduler = pool.do_take_scheduler(context); + scheduler.unblock_scheduling(); let respawned_new_scheduler_id = scheduler.id(); Box::new(scheduler.into_inner().1).return_to_pool(); @@ -3862,8 +4523,13 @@ mod tests { let (bank, _bank_forks) = 
setup_dummy_fork_graph(bank); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let pool = - DefaultSchedulerPool::new(None, None, None, None, ignored_prioritization_fee_cache); + let pool = DefaultSchedulerPool::new_for_production( + None, + None, + None, + None, + ignored_prioritization_fee_cache, + ); let (_banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded(); let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); @@ -3878,6 +4544,8 @@ mod tests { ); pool.register_banking_stage( banking_packet_receiver, + DefaultSchedulerPool::default_handler_count(), + Box::new(DummyBankingMinitor), Box::new(|_, _| unreachable!()), poh_recorder.read().unwrap().new_recorder(), ); @@ -3885,7 +4553,7 @@ mod tests { // Make sure the assertion in BlockProductionSchedulerInner::can_put() doesn't cause false // positives... let context = SchedulingContext::for_verification(bank.clone()); - let scheduler = pool.take_scheduler(context); + let scheduler = pool.take_scheduler(context).unwrap(); let bank_tmp = BankWithScheduler::new(bank, Some(scheduler)); assert_matches!(bank_tmp.wait_for_completed_scheduler(), Some((Ok(()), _))); diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index f4284cfbdd1719..2da81ca0383147 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -1589,6 +1589,15 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, .validator(|s| is_within_range(s, 1..)) .help(DefaultSchedulerPool::cli_message()), ) + .arg( + Arg::with_name("enable_experimental_block_production_method") + .long("enable-experimental-block-production-method") + .takes_value(false) + .help( + "Accept unified-scheduler to be used as an experimental block \ + production method", + ), + ) .arg( Arg::with_name("wen_restart") .long("wen-restart") diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index e35a065ce90859..3dde5ef4004776 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -1040,6 +1040,17 @@ pub fn execute( "block_production_method", BlockProductionMethod ) + .inspect(|method| { + if matches!(method, BlockProductionMethod::UnifiedScheduler) + && !matches.is_present("enable_experimental_block_production_method") + { + eprintln!( + "Currently, the unified-scheduler method is experimental for block-production. \ + Explicitly pass --enable-experimental-block-production-method to use it." + ); + exit(1); + } + }) .unwrap_or_default(); validator_config.transaction_struct = value_t!( matches, // comment to align formatting...
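For context, the unified-scheduler-pool test changes above repeatedly follow the same block-production lifecycle: take a scheduler from the now-fallible pool.take_scheduler(), explicitly unblock it, install it on a bank via BankWithScheduler, and wait for the session to finish. The following is a minimal illustrative sketch of that flow, not part of the patch; the function name is invented, and the crate/module paths are assumptions based on the crates this patch touches.

// Illustrative sketch only -- condensed from the test changes in this patch.
// Module paths and exact signatures are assumptions, not authoritative.
use {
    solana_runtime::{bank::Bank, installed_scheduler_pool::BankWithScheduler},
    solana_unified_scheduler_logic::SchedulingContext,
    solana_unified_scheduler_pool::DefaultSchedulerPool,
    std::sync::Arc,
};

fn run_one_production_session(pool: &Arc<DefaultSchedulerPool>, bank: Arc<Bank>) {
    // Build a block-production context and take a pooled scheduler for it; with
    // this patch, take_scheduler() is fallible and is unwrapped in the tests.
    let context = SchedulingContext::for_production(bank.clone());
    let scheduler = pool.take_scheduler(context).unwrap();

    // Block-production schedulers appear to start out blocked in this patch; the
    // tests call unblock_scheduling() explicitly before any work is consumed.
    scheduler.unblock_scheduling();

    // Install the scheduler on the bank and wait for the session to complete.
    let bank = BankWithScheduler::new(bank, Some(scheduler));
    let (result, _timings) = bank
        .wait_for_completed_scheduler()
        .expect("a scheduler was installed above");
    result.unwrap();
}

Separately, the validator/ hunks at the end of the patch gate this path at the CLI: selecting unified-scheduler as the block-production method without also passing --enable-experimental-block-production-method causes execute() to print an error and exit.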