From 50de43d5a0dfec23cfaaea93af2318a9e988a4e5 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 16 Dec 2024 13:47:05 +0000 Subject: [PATCH 01/12] Support tx poh recording in unified scheduler --- Cargo.lock | 2 + ledger/benches/blockstore_processor.rs | 1 + ledger/src/blockstore_processor.rs | 155 +++++++++++++++--- poh/src/poh_recorder.rs | 38 ++++- programs/sbf/Cargo.lock | 2 + runtime/Cargo.toml | 1 + runtime/src/bank.rs | 40 ++++- runtime/src/installed_scheduler_pool.rs | 21 ++- sdk/transaction-error/src/lib.rs | 5 + storage-proto/proto/transaction_by_addr.proto | 1 + storage-proto/src/convert.rs | 4 + svm/examples/Cargo.lock | 2 + transaction-status/src/token_balances.rs | 1 + unified-scheduler-logic/src/lib.rs | 6 + unified-scheduler-pool/Cargo.toml | 2 + unified-scheduler-pool/src/lib.rs | 144 +++++++++++++++- 16 files changed, 390 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2fcdb5f2ec439f..aac73b4588039a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8769,6 +8769,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -10022,6 +10023,7 @@ dependencies = [ "solana-keypair", "solana-ledger", "solana-logger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index 44f65db1d54fd4..711c5381b63b8c 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -162,6 +162,7 @@ fn bench_execute_batch( &mut timing, None, &prioritization_fee_cache, + None:: Option>>, ); } }); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index ac6b578c193e6e..a3dd10e7a69cae 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -110,6 +110,7 @@ fn first_err(results: &[Result<()>]) -> Result<()> { fn get_first_error( batch: &TransactionBatch, commit_results: &[TransactionCommitResult], + is_block_producing_unified_scheduler: bool, ) -> Option<(Result<()>, Signature)> { let mut first_err = None; for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) { @@ -117,18 +118,22 @@ fn get_first_error( if first_err.is_none() { first_err = Some((Err(err.clone()), *transaction.signature())); } - warn!( - "Unexpected validator error: {:?}, transaction: {:?}", - err, transaction - ); - datapoint_error!( - "validator_process_entry_error", - ( - "error", - format!("error: {err:?}, transaction: {transaction:?}"), - String - ) - ); + // Skip with block producing unified scheduler because it's quite common to observe + // transaction errors... 
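+            // (The flag comes from `pre_commit_callback.is_some()` in `execute_batch()`
+            // below, so only the block-producing unified scheduler path is silenced; the
+            // replay path keeps warning and reporting the datapoint unconditionally.)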
+ if !is_block_producing_unified_scheduler { + warn!( + "Unexpected validator error: {:?}, transaction: {:?}", + err, transaction + ); + datapoint_error!( + "validator_process_entry_error", + ( + "error", + format!("error: {err:?}, transaction: {transaction:?}"), + String + ) + ); + } } } first_err @@ -150,12 +155,14 @@ pub fn execute_batch( timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, prioritization_fee_cache: &PrioritizationFeeCache, + pre_commit_callback: Option Option>>, ) -> Result<()> { let TransactionBatchWithIndexes { batch, transaction_indexes, } = batch; let record_token_balances = transaction_status_sender.is_some(); + let mut transaction_indexes = transaction_indexes.to_vec(); let mut mint_decimals: HashMap = HashMap::new(); @@ -165,14 +172,34 @@ pub fn execute_batch( vec![] }; - let (commit_results, balances) = batch.bank().load_execute_and_commit_transactions( - batch, - MAX_PROCESSING_AGE, - transaction_status_sender.is_some(), - ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), - timings, - log_messages_bytes_limit, - ); + let is_block_producing_unified_scheduler = pre_commit_callback.is_some(); + let pre_commit_callback = pre_commit_callback.map(|wrapped_callback| { + || { + wrapped_callback() + .inspect(|&maybe_index| { + if let Some(index) = maybe_index { + assert!(transaction_indexes.is_empty()); + transaction_indexes.push(index); + } + }) + .is_some() + } + }); + + let Some((commit_results, balances)) = batch + .bank() + .load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + MAX_PROCESSING_AGE, + transaction_status_sender.is_some(), + ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()), + timings, + log_messages_bytes_limit, + pre_commit_callback, + ) + else { + return Err(TransactionError::CommitFailed); + }; bank_utils::find_and_send_votes( batch.sanitized_transactions(), @@ -201,7 +228,7 @@ pub fn execute_batch( .filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx)) .collect_vec(); - let first_err = get_first_error(batch, &commit_results); + let first_err = get_first_error(batch, &commit_results, is_block_producing_unified_scheduler); if let Some(transaction_status_sender) = transaction_status_sender { let transactions: Vec = batch @@ -224,7 +251,7 @@ pub fn execute_batch( commit_results, balances, token_balances, - transaction_indexes.to_vec(), + transaction_indexes, ); } @@ -322,6 +349,7 @@ fn execute_batches_internal( &mut timings, log_messages_bytes_limit, prioritization_fee_cache, + None:: Option>>, )); let thread_index = replay_tx_thread_pool.current_thread_index().unwrap(); @@ -2202,11 +2230,13 @@ pub fn process_single_slot( } #[allow(clippy::large_enum_variant)] +#[derive(Debug)] pub enum TransactionStatusMessage { Batch(TransactionStatusBatch), Freeze(Slot), } +#[derive(Debug)] pub struct TransactionStatusBatch { pub slot: Slot, pub transactions: Vec, @@ -4392,7 +4422,7 @@ pub mod tests { &mut ExecuteTimings::default(), None, ); - let (err, signature) = get_first_error(&batch, &commit_results).unwrap(); + let (err, signature) = get_first_error(&batch, &commit_results, false).unwrap(); assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound); assert_eq!(signature, account_not_found_sig); } @@ -5041,6 +5071,85 @@ pub mod tests { do_test_schedule_batches_for_execution(false); } + fn do_test_execute_batch_pre_commit_callback(poh_result: Option>) { + solana_logger::setup(); + let dummy_leader_pubkey = 
solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Bank::new_for_tests(&genesis_config); + let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests(); + let bank = Arc::new(bank); + let txs = create_test_transactions(&mint_keypair, &genesis_config.hash()); + let mut batch = TransactionBatch::new( + vec![Ok(()); 1], + &bank, + OwnedOrBorrowed::Borrowed(&txs[0..1]), + ); + batch.set_needs_unlock(false); + let poh_with_index = matches!(poh_result, Some(Some(_))); + let transaction_indexes = if poh_with_index { vec![] } else { vec![3] }; + let batch = TransactionBatchWithIndexes { + batch, + transaction_indexes, + }; + let prioritization_fee_cache = PrioritizationFeeCache::default(); + let mut timing = ExecuteTimings::default(); + let (sender, receiver) = crossbeam_channel::unbounded(); + + assert_eq!(bank.transaction_count(), 0); + let result = execute_batch( + &batch, + &bank, + Some(&TransactionStatusSender { sender }), + None, + &mut timing, + None, + &prioritization_fee_cache, + Some(|| poh_result), + ); + let should_succeed = poh_result.is_some(); + if should_succeed { + assert_matches!(result, Ok(())); + assert_eq!(bank.transaction_count(), 1); + } else { + assert_matches!(result, Err(TransactionError::CommitFailed)); + assert_eq!(bank.transaction_count(), 0); + } + if poh_with_index { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes == vec![4_usize] + ); + } else if should_succeed { + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) + if transaction_indexes == vec![3_usize] + ); + } else { + assert_matches!(receiver.try_recv(), Err(_)); + } + } + + #[test] + fn test_execute_batch_pre_commit_callback_success() { + do_test_execute_batch_pre_commit_callback(Some(None)); + } + + #[test] + fn test_execute_batch_pre_commit_callback_success_with_index() { + do_test_execute_batch_pre_commit_callback(Some(Some(4))); + } + + #[test] + fn test_execute_batch_pre_commit_callback_failure() { + do_test_execute_batch_pre_commit_callback(None); + } + #[test] fn test_confirm_slot_entries_with_fix() { const HASHES_PER_TICK: u64 = 10; diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 0fcdb217e05fae..5a16cc791b0d42 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -135,7 +135,7 @@ pub struct RecordTransactionsSummary { pub starting_transaction_index: Option, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionRecorder { // shared by all users of PohRecorder pub record_sender: Sender, @@ -1139,11 +1139,12 @@ impl PohRecorder { } } -pub fn create_test_recorder( +fn do_create_test_recorder( bank: Arc, blockstore: Arc, poh_config: Option, leader_schedule_cache: Option>, + track_transaction_indexes: bool, ) -> ( Arc, Arc>, @@ -1169,7 +1170,10 @@ pub fn create_test_recorder( ); let ticks_per_slot = bank.ticks_per_slot(); - poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false); + poh_recorder.set_bank( + BankWithScheduler::new_without_scheduler(bank), + track_transaction_indexes, + ); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); let poh_service = PohService::new( poh_recorder.clone(), @@ -1184,6 +1188,34 @@ pub fn create_test_recorder( (exit, poh_recorder, poh_service, entry_receiver) } +pub fn 
create_test_recorder( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, false) +} + +pub fn create_test_recorder_with_index_tracking( + bank: Arc, + blockstore: Arc, + poh_config: Option, + leader_schedule_cache: Option>, +) -> ( + Arc, + Arc>, + PohService, + Receiver, +) { + do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true) +} + #[cfg(test)] mod tests { use { diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 35d8672e56cd1f..82736d4adf88d7 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6891,6 +6891,7 @@ dependencies = [ "solana-system-program", "solana-timings", "solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -8331,6 +8332,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1556a459f5e0c0..0687e594a9dead 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -82,6 +82,7 @@ solana-svm-transaction = { workspace = true } solana-system-program = { workspace = true, optional = true } solana-timings = { workspace = true } solana-transaction-status-client-types = { workspace = true } +solana-unified-scheduler-logic = { workspace = true } solana-version = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 8bb1c1f4b72a6d..dbeb7a68a9ebb6 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -249,7 +249,7 @@ struct RentMetrics { pub type BankStatusCache = StatusCache>; #[cfg_attr( feature = "frozen-abi", - frozen_abi(digest = "BHg4qpwegtaJypLUqAdjQYzYeLfEGf6tA4U5cREbHMHi") + frozen_abi(digest = "Fj6ATu6Rr5ossAykzbRSkCsuUzjdAZbYo5JaqfR1A72G") )] pub type BankSlotDelta = SlotDelta>; @@ -343,7 +343,7 @@ pub struct TransactionSimulationResult { pub inner_instructions: Option>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct TransactionBalancesSet { pub pre_balances: TransactionBalances, pub post_balances: TransactionBalances, @@ -4520,6 +4520,29 @@ impl Bank { timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, ) -> (Vec, TransactionBalancesSet) { + self.load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + max_age, + collect_balances, + recording_config, + timings, + log_messages_bytes_limit, + None:: bool>, + ) + .unwrap() + } + + #[must_use] + pub fn load_execute_and_commit_transactions_with_pre_commit_callback( + &self, + batch: &TransactionBatch, + max_age: usize, + collect_balances: bool, + recording_config: ExecutionRecordingConfig, + timings: &mut ExecuteTimings, + log_messages_bytes_limit: Option, + pre_commit_callback: Option bool>, + ) -> Option<(Vec, TransactionBalancesSet)> { let pre_balances = if collect_balances { self.collect_balances(batch) } else { @@ -4545,6 +4568,15 @@ impl Bank { }, ); + if let Some(pre_commit_callback) = pre_commit_callback { + if let Some(e) = processing_results.first() { + assert_eq!(processing_results.len(), 1); + if e.is_ok() && !pre_commit_callback() { + return None; + } + } + } + let commit_results = self.commit_transactions( batch.sanitized_transactions(), processing_results, @@ -4556,10 +4588,10 @@ impl Bank { } 
else { vec![] }; - ( + Some(( commit_results, TransactionBalancesSet::new(pre_balances, post_balances), - ) + )) } /// Process a Transaction. This is used for unit tests and simply calls the vector diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index fb6bb3dc79c732..69e806c5f48ce5 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -30,6 +30,7 @@ use { transaction::{Result, SanitizedTransaction, TransactionError}, }, solana_timings::ExecuteTimings, + solana_unified_scheduler_logic::SchedulingMode, std::{ fmt::{self, Debug}, mem, @@ -227,13 +228,29 @@ pub type SchedulerId = u64; /// `SchedulingContext`s. #[derive(Clone, Debug)] pub struct SchedulingContext { - // mode: SchedulingMode, // this will be added later. + mode: SchedulingMode, bank: Arc, } impl SchedulingContext { pub fn new(bank: Arc) -> Self { - Self { bank } + // mode will be configurable later + Self { + mode: SchedulingMode::BlockVerification, + bank, + } + } + + #[cfg(feature = "dev-context-only-utils")] + pub fn for_production(bank: Arc) -> Self { + Self { + mode: SchedulingMode::BlockProduction, + bank, + } + } + + pub fn mode(&self) -> SchedulingMode { + self.mode } pub fn bank(&self) -> &Arc { diff --git a/sdk/transaction-error/src/lib.rs b/sdk/transaction-error/src/lib.rs index 433a48b0122e31..db08f4fe6ed11f 100644 --- a/sdk/transaction-error/src/lib.rs +++ b/sdk/transaction-error/src/lib.rs @@ -137,6 +137,9 @@ pub enum TransactionError { /// Program cache hit max limit. ProgramCacheHitMaxLimit, + + /// Commit failed internally. + CommitFailed, } impl std::error::Error for TransactionError {} @@ -220,6 +223,8 @@ impl fmt::Display for TransactionError { => f.write_str("Sum of account balances before and after transaction do not match"), Self::ProgramCacheHitMaxLimit => f.write_str("Program cache hit max limit"), + Self::CommitFailed + => f.write_str("CommitFailed"), } } } diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto index d0fa74a2104707..c4025dbafe8922 100644 --- a/storage-proto/proto/transaction_by_addr.proto +++ b/storage-proto/proto/transaction_by_addr.proto @@ -63,6 +63,7 @@ enum TransactionErrorType { PROGRAM_EXECUTION_TEMPORARILY_RESTRICTED = 35; UNBALANCED_TRANSACTION = 36; PROGRAM_CACHE_HIT_MAX_LIMIT = 37; + COMMIT_FAILED = 38; } message InstructionError { diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 6a6e451b4858f1..55a54c3d06d54c 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -852,6 +852,7 @@ impl TryFrom for TransactionError { 34 => TransactionError::ResanitizationNeeded, 36 => TransactionError::UnbalancedTransaction, 37 => TransactionError::ProgramCacheHitMaxLimit, + 38 => TransactionError::CommitFailed, _ => return Err("Invalid TransactionError"), }) } @@ -973,6 +974,9 @@ impl From for tx_by_addr::TransactionError { TransactionError::ProgramCacheHitMaxLimit => { tx_by_addr::TransactionErrorType::ProgramCacheHitMaxLimit } + TransactionError::CommitFailed => { + tx_by_addr::TransactionErrorType::CommitFailed + } } as i32, instruction_error: match transaction_error { TransactionError::InstructionError(index, ref instruction_error) => { diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 454f691e664720..9bb0a9fda3b9c7 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -6710,6 +6710,7 @@ dependencies = [ "solana-svm-transaction", "solana-timings", 
"solana-transaction-status-client-types", + "solana-unified-scheduler-logic", "solana-version", "solana-vote", "solana-vote-program", @@ -7667,6 +7668,7 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-ledger", + "solana-poh", "solana-pubkey", "solana-runtime", "solana-runtime-transaction", diff --git a/transaction-status/src/token_balances.rs b/transaction-status/src/token_balances.rs index 85a85a053f910f..36b46552cc687f 100644 --- a/transaction-status/src/token_balances.rs +++ b/transaction-status/src/token_balances.rs @@ -2,6 +2,7 @@ use crate::TransactionTokenBalance; pub type TransactionTokenBalances = Vec>; +#[derive(Debug)] pub struct TransactionTokenBalancesSet { pub pre_token_balances: TransactionTokenBalances, pub post_token_balances: TransactionTokenBalances, diff --git a/unified-scheduler-logic/src/lib.rs b/unified-scheduler-logic/src/lib.rs index 2e8caca3b85b8b..b28e4b0c7dc854 100644 --- a/unified-scheduler-logic/src/lib.rs +++ b/unified-scheduler-logic/src/lib.rs @@ -104,6 +104,12 @@ use { std::{collections::VecDeque, mem, sync::Arc}, }; +#[derive(Clone, Copy, Debug)] +pub enum SchedulingMode { + BlockVerification, + BlockProduction, +} + /// Internal utilities. Namely this contains [`ShortCounter`] and [`TokenCell`]. mod utils { use std::{ diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 3b0a0df66d0ec1..842927373ae2e2 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -18,6 +18,7 @@ log = { workspace = true } qualifier_attr = { workspace = true } scopeguard = { workspace = true } solana-ledger = { workspace = true } +solana-poh = { workspace = true } solana-pubkey = { workspace = true } solana-runtime = { workspace = true } solana-runtime-transaction = { workspace = true } @@ -32,6 +33,7 @@ vec_extract_if_polyfill = { workspace = true } assert_matches = { workspace = true } lazy_static = { workspace = true } solana-clock = { workspace = true } +solana-entry = { workspace = true } solana-keypair = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index c496ab455e1828..8f7aa48b56684c 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -20,6 +20,7 @@ use { solana_ledger::blockstore_processor::{ execute_batch, TransactionBatchWithIndexes, TransactionStatusSender, }, + solana_poh::poh_recorder::{RecordTransactionsSummary, TransactionRecorder}, solana_pubkey::Pubkey, solana_runtime::{ installed_scheduler_pool::{ @@ -35,7 +36,10 @@ use { solana_timings::ExecuteTimings, solana_transaction::sanitized::SanitizedTransaction, solana_transaction_error::{TransactionError, TransactionResult as Result}, - solana_unified_scheduler_logic::{SchedulingStateMachine, Task, UsageQueue}, + solana_unified_scheduler_logic::{ + SchedulingMode::{BlockProduction, BlockVerification}, + SchedulingStateMachine, Task, UsageQueue, + }, static_assertions::const_assert_eq, std::{ fmt::Debug, @@ -100,6 +104,7 @@ pub struct HandlerContext { transaction_status_sender: Option, replay_vote_sender: Option, prioritization_fee_cache: Arc, + transaction_recorder: Option, } pub type DefaultSchedulerPool = @@ -176,6 +181,8 @@ where transaction_status_sender, replay_vote_sender, prioritization_fee_cache, + // will be configurable later + transaction_recorder: None, }, weak_self: weak_self.clone(), next_scheduler_id: 
AtomicSchedulerId::default(), @@ -436,9 +443,38 @@ impl TaskHandler for DefaultTaskHandler { let index = task.task_index(); let batch = bank.prepare_unlocked_batch_from_single_tx(transaction); + let transaction_indexes = match scheduling_context.mode() { + BlockVerification => vec![index], + BlockProduction => { + let mut vec = vec![]; + if handler_context.transaction_status_sender.is_some() { + // Create empty vec with the exact needed capacity, which will be filled inside + // `execute_batch()` below. Otherwise, excess cap would be reserved on + // `.push(transaction_index)` in it. + vec.reserve_exact(1); + } + vec + } + }; let batch_with_indexes = TransactionBatchWithIndexes { batch, - transaction_indexes: vec![index], + transaction_indexes, + }; + + let pre_commit_callback = match scheduling_context.mode() { + BlockVerification => None, + BlockProduction => Some(|| { + let RecordTransactionsSummary { + result, + starting_transaction_index, + .. + } = handler_context + .transaction_recorder + .as_ref() + .unwrap() + .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); + result.ok().map(|()| starting_transaction_index) + }), }; *result = execute_batch( @@ -449,6 +485,7 @@ impl TaskHandler for DefaultTaskHandler { timings, handler_context.log_messages_bytes_limit, &handler_context.prioritization_fee_cache, + pre_commit_callback, ); sleepless_testing::at(CheckPoint::TaskHandled(index)); } @@ -1474,6 +1511,13 @@ mod tests { assert_matches::assert_matches, solana_clock::{Slot, MAX_PROCESSING_AGE}, solana_keypair::Keypair, + solana_ledger::{ + blockstore::Blockstore, + blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage}, + create_new_tmp_ledger_auto_delete, + leader_schedule_cache::LeaderScheduleCache, + }, + solana_poh::poh_recorder::create_test_recorder_with_index_tracking, solana_pubkey::Pubkey, solana_runtime::{ bank::Bank, @@ -1487,7 +1531,7 @@ mod tests { solana_transaction::sanitized::SanitizedTransaction, solana_transaction_error::TransactionError, std::{ - sync::{Arc, RwLock}, + sync::{atomic::Ordering, Arc, RwLock}, thread::JoinHandle, }, }; @@ -2937,10 +2981,104 @@ mod tests { transaction_status_sender: None, replay_vote_sender: None, prioritization_fee_cache, + transaction_recorder: None, }; let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| UsageQueue::default()); DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); assert_matches!(result, Err(TransactionError::AccountLoadedTwice)); } + + fn do_test_task_handler_poh_recording(should_succeed: bool) { + solana_logger::setup(); + + let GenesisConfigInfo { + genesis_config, + ref mint_keypair, + .. 
+ } = solana_ledger::genesis_utils::create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new_rw_arc(bank); + let bank = bank_forks.read().unwrap().working_bank_with_scheduler(); + + let tx = system_transaction::transfer( + mint_keypair, + &solana_sdk::pubkey::new_rand(), + 2, + genesis_config.hash(), + ); + let tx = RuntimeTransaction::from_transaction_for_tests(tx); + + let result = &mut Ok(()); + let timings = &mut ExecuteTimings::default(); + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); + let scheduling_context = &SchedulingContext::for_production(bank.clone()); + let (sender, receiver) = crossbeam_channel::unbounded(); + let (ledger_path, _blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); + let (exit, poh_recorder, poh_service, signal_receiver) = + create_test_recorder_with_index_tracking( + bank.clone(), + blockstore.clone(), + None, + Some(leader_schedule_cache), + ); + let handler_context = &HandlerContext { + log_messages_bytes_limit: None, + transaction_status_sender: Some(TransactionStatusSender { sender }), + replay_vote_sender: None, + prioritization_fee_cache, + transaction_recorder: Some(poh_recorder.read().unwrap().new_recorder()), + }; + + let task = + SchedulingStateMachine::create_task(tx.clone(), 0, &mut |_| UsageQueue::default()); + + // wait until the poh's working bank is cleared. + // also flush signal_receiver after that. + if !should_succeed { + while poh_recorder.read().unwrap().bank().is_some() { + sleep(Duration::from_millis(100)); + } + while signal_receiver.try_recv().is_ok() {} + } + + assert_eq!(bank.transaction_count(), 0); + DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); + + if should_succeed { + assert_matches!(result, Ok(())); + assert_eq!(bank.transaction_count(), 1); + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch( + TransactionStatusBatch { .. 
} + )) + ); + assert_matches!( + signal_receiver.try_recv(), + Ok((_, (solana_entry::entry::Entry {transactions, ..} , _))) + if transactions == vec![tx.to_versioned_transaction()] + ); + } else { + assert_matches!(result, Err(TransactionError::CommitFailed)); + assert_eq!(bank.transaction_count(), 0); + assert_matches!(receiver.try_recv(), Err(_)); + assert_matches!(signal_receiver.try_recv(), Err(_)); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + + #[test] + fn test_task_handler_poh_recording_success() { + do_test_task_handler_poh_recording(true); + } + + #[test] + fn test_task_handler_poh_recording_failure() { + do_test_task_handler_poh_recording(false); + } } From e6a1e1a44497b131631507f3b5db95c78dd8acc4 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 18 Dec 2024 22:53:47 +0900 Subject: [PATCH 02/12] Make transaction_indexes allocation conditional --- ledger/src/blockstore_processor.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index a3dd10e7a69cae..41cc157fa5651f 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -61,6 +61,7 @@ use { solana_transaction_status::token_balances::TransactionTokenBalancesSet, solana_vote::vote_account::VoteAccountsHashMap, std::{ + borrow::Cow, collections::{HashMap, HashSet}, num::Saturating, ops::{Index, Range}, @@ -162,7 +163,7 @@ pub fn execute_batch( transaction_indexes, } = batch; let record_token_balances = transaction_status_sender.is_some(); - let mut transaction_indexes = transaction_indexes.to_vec(); + let mut transaction_indexes = Cow::from(transaction_indexes); let mut mint_decimals: HashMap = HashMap::new(); @@ -179,7 +180,7 @@ pub fn execute_batch( .inspect(|&maybe_index| { if let Some(index) = maybe_index { assert!(transaction_indexes.is_empty()); - transaction_indexes.push(index); + transaction_indexes.to_mut().push(index); } }) .is_some() @@ -251,7 +252,7 @@ pub fn execute_batch( commit_results, balances, token_balances, - transaction_indexes, + transaction_indexes.into_owned(), ); } From 83d12522afdb8977366675e14bf05a4fae95ac8d Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Wed, 18 Dec 2024 23:02:16 +0900 Subject: [PATCH 03/12] Replace Option> with saner type --- ledger/benches/blockstore_processor.rs | 4 ++-- ledger/src/blockstore_processor.rs | 26 ++++++++++++++------------ runtime/src/bank.rs | 17 ++++++++++------- unified-scheduler-pool/src/lib.rs | 6 +++++- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index 711c5381b63b8c..afa293727720e4 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -12,7 +12,7 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, }, solana_runtime::{ - bank::Bank, + bank::{Bank, PreCommitCallbackResult}, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, transaction_batch::{OwnedOrBorrowed, TransactionBatch}, @@ -162,7 +162,7 @@ fn bench_execute_batch( &mut timing, None, &prioritization_fee_cache, - None:: Option>>, + None:: PreCommitCallbackResult>>, ); } }); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 41cc157fa5651f..41dd0d083b41a1 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -28,7 +28,7 @@ use { solana_rayon_threadlimit::{get_max_thread_count, 
get_thread_count}, solana_runtime::{ accounts_background_service::{AbsRequestSender, SnapshotRequestKind}, - bank::{Bank, TransactionBalancesSet}, + bank::{Bank, PreCommitCallbackResult, TransactionBalancesSet}, bank_forks::{BankForks, SetRootError}, bank_utils, commitment::VOTE_THRESHOLD_SIZE, @@ -156,7 +156,7 @@ pub fn execute_batch( timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, prioritization_fee_cache: &PrioritizationFeeCache, - pre_commit_callback: Option Option>>, + pre_commit_callback: Option PreCommitCallbackResult>>, ) -> Result<()> { let TransactionBatchWithIndexes { batch, @@ -183,11 +183,11 @@ pub fn execute_batch( transaction_indexes.to_mut().push(index); } }) - .is_some() + .map(|_| ()) } }); - let Some((commit_results, balances)) = batch + let Ok((commit_results, balances)) = batch .bank() .load_execute_and_commit_transactions_with_pre_commit_callback( batch, @@ -350,7 +350,7 @@ fn execute_batches_internal( &mut timings, log_messages_bytes_limit, prioritization_fee_cache, - None:: Option>>, + None:: PreCommitCallbackResult>>, )); let thread_index = replay_tx_thread_pool.current_thread_index().unwrap(); @@ -2351,7 +2351,7 @@ pub mod tests { solana_entry::entry::{create_ticks, next_entry, next_entry_mut}, solana_program_runtime::declare_process_instruction, solana_runtime::{ - bank::bank_hash_details::SlotDetails, + bank::{bank_hash_details::SlotDetails, PreCommitCallbackFailed}, genesis_utils::{ self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs, }, @@ -5072,7 +5072,9 @@ pub mod tests { do_test_schedule_batches_for_execution(false); } - fn do_test_execute_batch_pre_commit_callback(poh_result: Option>) { + fn do_test_execute_batch_pre_commit_callback( + poh_result: PreCommitCallbackResult>, + ) { solana_logger::setup(); let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); let GenesisConfigInfo { @@ -5090,7 +5092,7 @@ pub mod tests { OwnedOrBorrowed::Borrowed(&txs[0..1]), ); batch.set_needs_unlock(false); - let poh_with_index = matches!(poh_result, Some(Some(_))); + let poh_with_index = matches!(poh_result, Ok(Some(_))); let transaction_indexes = if poh_with_index { vec![] } else { vec![3] }; let batch = TransactionBatchWithIndexes { batch, @@ -5111,7 +5113,7 @@ pub mod tests { &prioritization_fee_cache, Some(|| poh_result), ); - let should_succeed = poh_result.is_some(); + let should_succeed = poh_result.is_ok(); if should_succeed { assert_matches!(result, Ok(())); assert_eq!(bank.transaction_count(), 1); @@ -5138,17 +5140,17 @@ pub mod tests { #[test] fn test_execute_batch_pre_commit_callback_success() { - do_test_execute_batch_pre_commit_callback(Some(None)); + do_test_execute_batch_pre_commit_callback(Ok(None)); } #[test] fn test_execute_batch_pre_commit_callback_success_with_index() { - do_test_execute_batch_pre_commit_callback(Some(Some(4))); + do_test_execute_batch_pre_commit_callback(Ok(Some(4))); } #[test] fn test_execute_batch_pre_commit_callback_failure() { - do_test_execute_batch_pre_commit_callback(None); + do_test_execute_batch_pre_commit_callback(Err(PreCommitCallbackFailed)); } #[test] diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index dbeb7a68a9ebb6..e8b5f39fd27b7c 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -360,6 +360,10 @@ impl TransactionBalancesSet { } pub type TransactionBalances = Vec>; +#[derive(Clone, Copy, Debug)] +pub struct PreCommitCallbackFailed; +pub type PreCommitCallbackResult = std::result::Result; + #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum 
TransactionLogCollectorFilter { All, @@ -4527,12 +4531,11 @@ impl Bank { recording_config, timings, log_messages_bytes_limit, - None:: bool>, + None:: PreCommitCallbackResult<()>>, ) .unwrap() } - #[must_use] pub fn load_execute_and_commit_transactions_with_pre_commit_callback( &self, batch: &TransactionBatch, @@ -4541,8 +4544,8 @@ impl Bank { recording_config: ExecutionRecordingConfig, timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, - pre_commit_callback: Option bool>, - ) -> Option<(Vec, TransactionBalancesSet)> { + pre_commit_callback: Option PreCommitCallbackResult<()>>, + ) -> PreCommitCallbackResult<(Vec, TransactionBalancesSet)> { let pre_balances = if collect_balances { self.collect_balances(batch) } else { @@ -4571,8 +4574,8 @@ impl Bank { if let Some(pre_commit_callback) = pre_commit_callback { if let Some(e) = processing_results.first() { assert_eq!(processing_results.len(), 1); - if e.is_ok() && !pre_commit_callback() { - return None; + if e.is_ok() && pre_commit_callback().is_err() { + return Err(PreCommitCallbackFailed); } } } @@ -4588,7 +4591,7 @@ impl Bank { } else { vec![] }; - Some(( + Ok(( commit_results, TransactionBalancesSet::new(pre_balances, post_balances), )) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 8f7aa48b56684c..dcb42d4de703ca 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -23,6 +23,7 @@ use { solana_poh::poh_recorder::{RecordTransactionsSummary, TransactionRecorder}, solana_pubkey::Pubkey, solana_runtime::{ + bank::PreCommitCallbackFailed, installed_scheduler_pool::{ initialized_result_with_timings, InstalledScheduler, InstalledSchedulerBox, InstalledSchedulerPool, InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult, @@ -473,7 +474,10 @@ impl TaskHandler for DefaultTaskHandler { .as_ref() .unwrap() .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); - result.ok().map(|()| starting_transaction_index) + match result { + Ok(()) => Ok(starting_transaction_index), + Err(_) => Err(PreCommitCallbackFailed), + } }), }; From 45a7e71a6d870e9639b4e4abeef57b0e8b968398 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 19 Dec 2024 00:20:46 +0900 Subject: [PATCH 04/12] Explain the odd transaction_status_sender.is_some() --- unified-scheduler-pool/src/lib.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index dcb42d4de703ca..48bbf1e74a0a51 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -448,10 +448,17 @@ impl TaskHandler for DefaultTaskHandler { BlockVerification => vec![index], BlockProduction => { let mut vec = vec![]; + // transaction_status_sender is usually None for staked nodes because it's only + // used for RPC-related additional data recording. However, a staked node could + // also be running with rpc functionalities during development. So, we need to + // correctly support the use case for produced blocks as well, like verified blocks + // via the replaying stage. + // Refer `record_token_balances` in `execute_batch()` as this conditional treatment + // is mirrored from it. if handler_context.transaction_status_sender.is_some() { - // Create empty vec with the exact needed capacity, which will be filled inside - // `execute_batch()` below. Otherwise, excess cap would be reserved on - // `.push(transaction_index)` in it. 
+ // Adjust the empty new vec with the exact needed capacity, which will be + // filled inside `execute_batch()` below. Otherwise, excess cap would be + // reserved on `.push(transaction_index)` in it. vec.reserve_exact(1); } vec From 004ce1e78b3b4b6f2a6e9e4cd24f5f88e78ebae6 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 19 Dec 2024 00:36:44 +0900 Subject: [PATCH 05/12] Explain about Option-ed closures --- ledger/src/blockstore_processor.rs | 2 ++ runtime/src/bank.rs | 3 +++ 2 files changed, 5 insertions(+) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 41dd0d083b41a1..4221e22ffae300 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -156,6 +156,8 @@ pub fn execute_batch( timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, prioritization_fee_cache: &PrioritizationFeeCache, + // None is meaninglyfully used to detect this is called from the block producing unified + // schduler. If so, supress too verbose logging for the code path. pre_commit_callback: Option PreCommitCallbackResult>>, ) -> Result<()> { let TransactionBatchWithIndexes { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index e8b5f39fd27b7c..9f228fe7464399 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -4544,6 +4544,9 @@ impl Bank { recording_config: ExecutionRecordingConfig, timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, + // None is meaninglyfully used to skip the block producing unified schduler special case. + // This avoids wasted cycles due to `if` evaluations in the special case and makes it well + // assert!()-ed. pre_commit_callback: Option PreCommitCallbackResult<()>>, ) -> PreCommitCallbackResult<(Vec, TransactionBalancesSet)> { let pre_balances = if collect_balances { From 44e914bb3b0bb7c978d081ac776946c0a5e0de1c Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 19 Dec 2024 00:41:23 +0900 Subject: [PATCH 06/12] Rename CommitFailed => CommitCancelled --- ledger/src/blockstore_processor.rs | 4 ++-- runtime/src/bank.rs | 2 +- sdk/transaction-error/src/lib.rs | 8 ++++---- storage-proto/proto/transaction_by_addr.proto | 2 +- storage-proto/src/convert.rs | 6 +++--- unified-scheduler-pool/src/lib.rs | 2 +- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 4221e22ffae300..2505654ced68d3 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -201,7 +201,7 @@ pub fn execute_batch( pre_commit_callback, ) else { - return Err(TransactionError::CommitFailed); + return Err(TransactionError::CommitCancelled); }; bank_utils::find_and_send_votes( @@ -5120,7 +5120,7 @@ pub mod tests { assert_matches!(result, Ok(())); assert_eq!(bank.transaction_count(), 1); } else { - assert_matches!(result, Err(TransactionError::CommitFailed)); + assert_matches!(result, Err(TransactionError::CommitCancelled)); assert_eq!(bank.transaction_count(), 0); } if poh_with_index { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 9f228fe7464399..30acf77fddf756 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -249,7 +249,7 @@ struct RentMetrics { pub type BankStatusCache = StatusCache>; #[cfg_attr( feature = "frozen-abi", - frozen_abi(digest = "Fj6ATu6Rr5ossAykzbRSkCsuUzjdAZbYo5JaqfR1A72G") + frozen_abi(digest = "4e7a7AAsQrM5Lp5bhREdVZ5QGZfyETbBthhWjYMYb6zS") )] pub type BankSlotDelta = SlotDelta>; diff --git a/sdk/transaction-error/src/lib.rs 
b/sdk/transaction-error/src/lib.rs index db08f4fe6ed11f..2f5b72b960833f 100644 --- a/sdk/transaction-error/src/lib.rs +++ b/sdk/transaction-error/src/lib.rs @@ -138,8 +138,8 @@ pub enum TransactionError { /// Program cache hit max limit. ProgramCacheHitMaxLimit, - /// Commit failed internally. - CommitFailed, + /// Commit cancelled internally. + CommitCancelled, } impl std::error::Error for TransactionError {} @@ -223,8 +223,8 @@ impl fmt::Display for TransactionError { => f.write_str("Sum of account balances before and after transaction do not match"), Self::ProgramCacheHitMaxLimit => f.write_str("Program cache hit max limit"), - Self::CommitFailed - => f.write_str("CommitFailed"), + Self::CommitCancelled + => f.write_str("CommitCancelled"), } } } diff --git a/storage-proto/proto/transaction_by_addr.proto b/storage-proto/proto/transaction_by_addr.proto index c4025dbafe8922..5748b05655edba 100644 --- a/storage-proto/proto/transaction_by_addr.proto +++ b/storage-proto/proto/transaction_by_addr.proto @@ -63,7 +63,7 @@ enum TransactionErrorType { PROGRAM_EXECUTION_TEMPORARILY_RESTRICTED = 35; UNBALANCED_TRANSACTION = 36; PROGRAM_CACHE_HIT_MAX_LIMIT = 37; - COMMIT_FAILED = 38; + COMMIT_CANCELLED = 38; } message InstructionError { diff --git a/storage-proto/src/convert.rs b/storage-proto/src/convert.rs index 55a54c3d06d54c..0ed7bf7e96f7ab 100644 --- a/storage-proto/src/convert.rs +++ b/storage-proto/src/convert.rs @@ -852,7 +852,7 @@ impl TryFrom for TransactionError { 34 => TransactionError::ResanitizationNeeded, 36 => TransactionError::UnbalancedTransaction, 37 => TransactionError::ProgramCacheHitMaxLimit, - 38 => TransactionError::CommitFailed, + 38 => TransactionError::CommitCancelled, _ => return Err("Invalid TransactionError"), }) } @@ -974,8 +974,8 @@ impl From for tx_by_addr::TransactionError { TransactionError::ProgramCacheHitMaxLimit => { tx_by_addr::TransactionErrorType::ProgramCacheHitMaxLimit } - TransactionError::CommitFailed => { - tx_by_addr::TransactionErrorType::CommitFailed + TransactionError::CommitCancelled => { + tx_by_addr::TransactionErrorType::CommitCancelled } } as i32, instruction_error: match transaction_error { diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 48bbf1e74a0a51..67050212b78943 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -3073,7 +3073,7 @@ mod tests { if transactions == vec![tx.to_versioned_transaction()] ); } else { - assert_matches!(result, Err(TransactionError::CommitFailed)); + assert_matches!(result, Err(TransactionError::CommitCancelled)); assert_eq!(bank.transaction_count(), 0); assert_matches!(receiver.try_recv(), Err(_)); assert_matches!(signal_receiver.try_recv(), Err(_)); From 827c034d9cc3428421a92bf770293c3250c356e1 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 19 Dec 2024 13:06:28 +0900 Subject: [PATCH 07/12] Fix typos --- ledger/src/blockstore_processor.rs | 4 ++-- runtime/src/bank.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 2505654ced68d3..597917a9df0f8d 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -156,8 +156,8 @@ pub fn execute_batch( timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, prioritization_fee_cache: &PrioritizationFeeCache, - // None is meaninglyfully used to detect this is called from the block producing unified - // schduler. 
If so, supress too verbose logging for the code path.
+    // None is meaningfully used to detect this is called from the block producing unified
+    // scheduler. If so, suppress too verbose logging for the code path.
     pre_commit_callback: Option<impl FnOnce() -> PreCommitCallbackResult<Option<usize>>>,
 ) -> Result<()> {
     let TransactionBatchWithIndexes {
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 30acf77fddf756..fd6de4e8dbd770 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -4544,7 +4544,7 @@ impl Bank {
         recording_config: ExecutionRecordingConfig,
         timings: &mut ExecuteTimings,
         log_messages_bytes_limit: Option<usize>,
-        // None is meaninglyfully used to skip the block producing unified schduler special case.
+        // None is meaningfully used to skip the block producing unified scheduler special case.
         // This avoids wasted cycles due to `if` evaluations in the special case and makes it well
         // assert!()-ed.
         pre_commit_callback: Option<impl FnOnce() -> PreCommitCallbackResult<()>>,

From 85699360abe54f443685654f0bb9868a78cb4d7a Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Thu, 19 Dec 2024 12:28:55 +0900
Subject: [PATCH 08/12] Document and simplify pre_commit_callback handling

---
 runtime/src/bank.rs | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index fd6de4e8dbd770..1e197d2cd5d428 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -4545,8 +4545,7 @@ impl Bank {
         timings: &mut ExecuteTimings,
         log_messages_bytes_limit: Option<usize>,
         // None is meaningfully used to skip the block producing unified scheduler special case.
-        // This avoids wasted cycles due to `if` evaluations in the special case and makes it well
-        // assert!()-ed.
+        // This makes the special case assert!()-ed.
         pre_commit_callback: Option<impl FnOnce() -> PreCommitCallbackResult<()>>,
     ) -> PreCommitCallbackResult<(Vec<TransactionCommitResult>, TransactionBalancesSet)> {
         let pre_balances = if collect_balances {
@@ -4575,12 +4574,20 @@
         );

         if let Some(pre_commit_callback) = pre_commit_callback {
-            if let Some(e) = processing_results.first() {
-                assert_eq!(processing_results.len(), 1);
-                if e.is_ok() && pre_commit_callback().is_err() {
-                    return Err(PreCommitCallbackFailed);
-                }
-            }
+            // We're entering into the block-producing unified scheduler special case...
+            // `processing_results` should always contain exactly only 1 result in that case.
+            assert_eq!(processing_results.len(), 1);
+
+            // Make `pre_commit_callback()` unconditionally take precedence over
+            // `processing_results[0].was_processed()`, potentially shadowing processing errors.
+            // That's desired because pre commit failure signalling is actually used to propagate
+            // poh recording failures, which is time-sensitive by nature, to wind down the unified
+            // scheduler's active scheduling session as soon as possible upon the blockage by poh
+            // recorder.
+            // Also, bail out here rather than overwriting `processing_results[0]`. Reconciling
+            // various state is rather error-prone at this point. For example, processed_counts
+            // would need to be correctly updated...
+            pre_commit_callback()?
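+            // Note that the `?` above returns early, before `commit_transactions()` below
+            // runs: a poh recording failure surfaces as `Err(PreCommitCallbackFailed)` with
+            // nothing committed, even if the lone transaction itself processed successfully.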
} let commit_results = self.commit_transactions( From 4d3f89dfcebe3430567dc17281586d396724fee8 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Thu, 19 Dec 2024 13:07:06 +0900 Subject: [PATCH 09/12] Clean up pre_commit_callback wrapping code --- ledger/src/blockstore_processor.rs | 19 ++++++++----------- unified-scheduler-pool/src/lib.rs | 2 +- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 597917a9df0f8d..c7b966253b1e0d 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -178,14 +178,12 @@ pub fn execute_batch( let is_block_producing_unified_scheduler = pre_commit_callback.is_some(); let pre_commit_callback = pre_commit_callback.map(|wrapped_callback| { || { - wrapped_callback() - .inspect(|&maybe_index| { - if let Some(index) = maybe_index { - assert!(transaction_indexes.is_empty()); - transaction_indexes.to_mut().push(index); - } - }) - .map(|_| ()) + wrapped_callback().map(|maybe_index| { + assert!(transaction_indexes.is_empty()); + transaction_indexes.to_mut().extend(maybe_index); + // Strip the index away by implicitly returning (), now that we're done with it + // here (= `solana-ledger`), to make `solana-runtime` not bothered with it. + }) } }); @@ -5095,10 +5093,9 @@ pub mod tests { ); batch.set_needs_unlock(false); let poh_with_index = matches!(poh_result, Ok(Some(_))); - let transaction_indexes = if poh_with_index { vec![] } else { vec![3] }; let batch = TransactionBatchWithIndexes { batch, - transaction_indexes, + transaction_indexes: vec![], }; let prioritization_fee_cache = PrioritizationFeeCache::default(); let mut timing = ExecuteTimings::default(); @@ -5133,7 +5130,7 @@ pub mod tests { assert_matches!( receiver.try_recv(), Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) - if transaction_indexes == vec![3_usize] + if transaction_indexes.is_empty() ); } else { assert_matches!(receiver.try_recv(), Err(_)); diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 67050212b78943..9e9383c227b934 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -458,7 +458,7 @@ impl TaskHandler for DefaultTaskHandler { if handler_context.transaction_status_sender.is_some() { // Adjust the empty new vec with the exact needed capacity, which will be // filled inside `execute_batch()` below. Otherwise, excess cap would be - // reserved on `.push(transaction_index)` in it. + // reserved on `.extend()` in it. 
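+                // (`.extend()` suffices because `Option<usize>` iterates over zero or one
+                // items; a `None` starting index from the poh recorder pushes nothing.)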
vec.reserve_exact(1); } vec From 94a1ba0f1ddd0fc6bc530ec7369e11d3f46af2b0 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 3 Jan 2025 23:09:22 +0900 Subject: [PATCH 10/12] Extend pre_commit_callback for existing bailouts --- Cargo.lock | 3 + ledger/benches/blockstore_processor.rs | 4 +- ledger/src/blockstore_processor.rs | 332 ++++++++++++++--------- runtime/src/bank.rs | 142 +++++----- svm/src/transaction_processing_result.rs | 13 + unified-scheduler-pool/Cargo.toml | 2 + unified-scheduler-pool/src/lib.rs | 119 +++++--- 7 files changed, 385 insertions(+), 230 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aac73b4588039a..dd7d86effd6a4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10020,6 +10020,8 @@ dependencies = [ "qualifier_attr", "scopeguard", "solana-clock", + "solana-entry", + "solana-hash", "solana-keypair", "solana-ledger", "solana-logger", @@ -10033,6 +10035,7 @@ dependencies = [ "solana-transaction-error", "solana-unified-scheduler-logic", "static_assertions", + "test-case", "vec_extract_if_polyfill", ] diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index afa293727720e4..c4d39cb1813b73 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -12,7 +12,7 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, }, solana_runtime::{ - bank::{Bank, PreCommitCallbackResult}, + bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, transaction_batch::{OwnedOrBorrowed, TransactionBatch}, @@ -162,7 +162,7 @@ fn bench_execute_batch( &mut timing, None, &prioritization_fee_cache, - None:: PreCommitCallbackResult>>, + None:: _>, ); } }); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index c7b966253b1e0d..9afefadb4b98a7 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -28,7 +28,7 @@ use { solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_runtime::{ accounts_background_service::{AbsRequestSender, SnapshotRequestKind}, - bank::{Bank, PreCommitCallbackResult, TransactionBalancesSet}, + bank::{Bank, TransactionBalancesSet}, bank_forks::{BankForks, SetRootError}, bank_utils, commitment::VOTE_THRESHOLD_SIZE, @@ -54,6 +54,7 @@ use { }, solana_svm::{ transaction_commit_result::{TransactionCommitResult, TransactionCommitResultExtensions}, + transaction_processing_result::{ProcessedTransaction, TransactionProcessingResult}, transaction_processor::ExecutionRecordingConfig, }, solana_svm_transaction::{svm_message::SVMMessage, svm_transaction::SVMTransaction}, @@ -108,38 +109,42 @@ fn first_err(results: &[Result<()>]) -> Result<()> { } // Includes transaction signature for unit-testing -fn get_first_error( - batch: &TransactionBatch, - commit_results: &[TransactionCommitResult], - is_block_producing_unified_scheduler: bool, +fn do_get_first_error( + batch: &TransactionBatch, + results: &[Result], ) -> Option<(Result<()>, Signature)> { let mut first_err = None; - for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) { - if let Err(err) = commit_result { + for (result, transaction) in results.iter().zip(batch.sanitized_transactions()) { + if let Err(err) = result { if first_err.is_none() { first_err = Some((Err(err.clone()), *transaction.signature())); } - // Skip with block producing unified scheduler because it's quite common to observe - // transaction errors... 
- if !is_block_producing_unified_scheduler { - warn!( - "Unexpected validator error: {:?}, transaction: {:?}", - err, transaction - ); - datapoint_error!( - "validator_process_entry_error", - ( - "error", - format!("error: {err:?}, transaction: {transaction:?}"), - String - ) - ); - } + warn!( + "Unexpected validator error: {:?}, transaction: {:?}", + err, transaction + ); + datapoint_error!( + "validator_process_entry_error", + ( + "error", + format!("error: {err:?}, transaction: {transaction:?}"), + String + ) + ); } } first_err } +fn get_first_error( + batch: &TransactionBatch, + commit_results: &[Result], +) -> Result<()> { + do_get_first_error(batch, commit_results) + .map(|(error, _signature)| error) + .unwrap_or(Ok(())) +} + fn create_thread_pool(num_threads: usize) -> ThreadPool { rayon::ThreadPoolBuilder::new() .num_threads(num_threads) @@ -158,7 +163,9 @@ pub fn execute_batch( prioritization_fee_cache: &PrioritizationFeeCache, // None is meaningfully used to detect this is called from the block producing unified // scheduler. If so, suppress too verbose logging for the code path. - pre_commit_callback: Option PreCommitCallbackResult>>, + extra_pre_commit_callback: Option< + impl FnOnce(&Result) -> Result>, + >, ) -> Result<()> { let TransactionBatchWithIndexes { batch, @@ -175,19 +182,26 @@ pub fn execute_batch( vec![] }; - let is_block_producing_unified_scheduler = pre_commit_callback.is_some(); - let pre_commit_callback = pre_commit_callback.map(|wrapped_callback| { - || { - wrapped_callback().map(|maybe_index| { + let pre_commit_callback = |timings: &mut _, processing_results: &_| { + match extra_pre_commit_callback { + None => { + get_first_error(batch, processing_results)?; + check_block_cost_limits_if_enabled(batch, bank, timings, processing_results)?; + } + Some(extra_pre_commit_callback) => { + // We're entering into the block-producing unified scheduler special case... + // `processing_results` should always contain exactly only 1 result in that case. + assert_eq!(processing_results.len(), 1); assert!(transaction_indexes.is_empty()); + + let maybe_index = extra_pre_commit_callback(&processing_results[0])?; transaction_indexes.to_mut().extend(maybe_index); - // Strip the index away by implicitly returning (), now that we're done with it - // here (= `solana-ledger`), to make `solana-runtime` not bothered with it. 
- }) + } } - }); + Ok(()) + }; - let Ok((commit_results, balances)) = batch + let (commit_results, balances) = batch .bank() .load_execute_and_commit_transactions_with_pre_commit_callback( batch, @@ -197,10 +211,7 @@ pub fn execute_batch( timings, log_messages_bytes_limit, pre_commit_callback, - ) - else { - return Err(TransactionError::CommitCancelled); - }; + )?; bank_utils::find_and_send_votes( batch.sanitized_transactions(), @@ -208,29 +219,12 @@ pub fn execute_batch( replay_vote_sender, ); - let (check_block_cost_limits_result, check_block_cost_limits_us) = measure_us!(if bank - .feature_set - .is_active(&solana_feature_set::apply_cost_tracker_during_replay::id()) - { - check_block_cost_limits(bank, &commit_results, batch.sanitized_transactions()) - } else { - Ok(()) - }); - - timings.saturating_add_in_place( - ExecuteTimingType::CheckBlockLimitsUs, - check_block_cost_limits_us, - ); - check_block_cost_limits_result?; - let committed_transactions = commit_results .iter() .zip(batch.sanitized_transactions()) .filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx)) .collect_vec(); - let first_err = get_first_error(batch, &commit_results, is_block_producing_unified_scheduler); - if let Some(transaction_status_sender) = transaction_status_sender { let transactions: Vec = batch .sanitized_transactions() @@ -258,7 +252,7 @@ pub fn execute_batch( prioritization_fee_cache.update(bank, committed_transactions.into_iter()); - first_err.map(|(result, _)| result).unwrap_or(Ok(())) + Ok(()) } // collect transactions actual execution costs, subject to block limits; @@ -266,20 +260,20 @@ pub fn execute_batch( // reported to metric `replay-stage-mark_dead_slot` fn check_block_cost_limits( bank: &Bank, - commit_results: &[TransactionCommitResult], + processing_results: &[TransactionProcessingResult], sanitized_transactions: &[impl TransactionWithMeta], ) -> Result<()> { - assert_eq!(sanitized_transactions.len(), commit_results.len()); + assert_eq!(sanitized_transactions.len(), processing_results.len()); - let tx_costs_with_actual_execution_units: Vec<_> = commit_results + let tx_costs_with_actual_execution_units: Vec<_> = processing_results .iter() .zip(sanitized_transactions) - .filter_map(|(commit_result, tx)| { - if let Ok(committed_tx) = commit_result { + .filter_map(|(processing_result, tx)| { + if let Ok(processed_tx) = processing_result { Some(CostModel::calculate_cost_for_executed_transaction( tx, - committed_tx.executed_units, - committed_tx.loaded_account_stats.loaded_accounts_data_size, + processed_tx.executed_units(), + processed_tx.loaded_accounts_data_size(), &bank.feature_set, )) } else { @@ -299,6 +293,28 @@ fn check_block_cost_limits( Ok(()) } +fn check_block_cost_limits_if_enabled( + batch: &TransactionBatch, + bank: &Bank, + timings: &mut ExecuteTimings, + processing_results: &[TransactionProcessingResult], +) -> Result<()> { + let (check_block_cost_limits_result, check_block_cost_limits_us) = measure_us!(if bank + .feature_set + .is_active(&solana_feature_set::apply_cost_tracker_during_replay::id()) + { + check_block_cost_limits(bank, processing_results, batch.sanitized_transactions()) + } else { + Ok(()) + }); + + timings.saturating_add_in_place( + ExecuteTimingType::CheckBlockLimitsUs, + check_block_cost_limits_us, + ); + check_block_cost_limits_result +} + #[derive(Default)] pub struct ExecuteBatchesInternalMetrics { execution_timings_per_thread: HashMap, @@ -350,7 +366,7 @@ fn execute_batches_internal( &mut timings, log_messages_bytes_limit, 
                     prioritization_fee_cache,
-                    None::<fn() -> PreCommitCallbackResult<Option<usize>>>,
+                    None::<fn(&_) -> _>,
                 ));
 
                 let thread_index = replay_tx_thread_pool.current_thread_index().unwrap();
@@ -2351,7 +2367,7 @@ pub mod tests {
         solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
         solana_program_runtime::declare_process_instruction,
         solana_runtime::{
-            bank::{bank_hash_details::SlotDetails, PreCommitCallbackFailed},
+            bank::bank_hash_details::SlotDetails,
             genesis_utils::{
                 self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
             },
@@ -2363,12 +2379,10 @@ pub mod tests {
         solana_sdk::{
             account::{AccountSharedData, WritableAccount},
             epoch_schedule::EpochSchedule,
-            fee::FeeDetails,
             hash::Hash,
             instruction::{Instruction, InstructionError},
             native_token::LAMPORTS_PER_SOL,
             pubkey::Pubkey,
-            rent_debits::RentDebits,
             signature::{Keypair, Signer},
             signer::SeedDerivable,
             system_instruction::SystemError,
@@ -2376,8 +2390,9 @@ pub mod tests {
             transaction::{Transaction, TransactionError},
         },
         solana_svm::{
-            transaction_commit_result::CommittedTransaction,
-            transaction_execution_result::TransactionLoadedAccountsStats,
+            account_loader::LoadedTransaction,
+            transaction_execution_result::{ExecutedTransaction, TransactionExecutionDetails},
+            transaction_processing_result::ProcessedTransaction,
             transaction_processor::ExecutionRecordingConfig,
         },
         solana_vote::vote_account::VoteAccount,
@@ -2386,8 +2401,8 @@ pub mod tests {
            vote_state::{TowerSync, VoteState, VoteStateVersions, MAX_LOCKOUT_HISTORY},
            vote_transaction,
        },
-        std::{collections::BTreeSet, sync::RwLock},
-        test_case::test_case,
+        std::{collections::BTreeSet, slice, sync::RwLock},
+        test_case::{test_case, test_matrix},
         trees::tr,
     };
 
@@ -3386,17 +3401,19 @@ pub mod tests {
         assert_matches!(bank.transfer(4, &mint_keypair, &keypair2.pubkey()), Ok(_));
         assert_matches!(bank.transfer(4, &mint_keypair, &keypair4.pubkey()), Ok(_));
 
+        let good_tx = system_transaction::transfer(
+            &keypair1,
+            &mint_keypair.pubkey(),
+            1,
+            bank.last_blockhash(),
+        );
+
         // construct an Entry whose 2nd transaction would cause a lock conflict with previous entry
         let entry_1_to_mint = next_entry(
             &bank.last_blockhash(),
             1,
             vec![
-                system_transaction::transfer(
-                    &keypair1,
-                    &mint_keypair.pubkey(),
-                    1,
-                    bank.last_blockhash(),
-                ),
+                good_tx.clone(),
                 system_transaction::transfer(
                     &keypair4,
                     &keypair4.pubkey(),
@@ -3425,14 +3442,16 @@ pub mod tests {
             ],
         );
 
-        assert!(process_entries_for_tests_without_scheduler(
-            &bank,
-            vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
-        )
-        .is_err());
+        assert_matches!(
+            process_entries_for_tests_without_scheduler(
+                &bank,
+                vec![entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
+            ),
+            Err(TransactionError::BlockhashNotFound)
+        );
 
-        // First transaction in first entry succeeded, so keypair1 lost 1 lamport
-        assert_eq!(bank.get_balance(&keypair1.pubkey()), 3);
+        // First transaction in first entry was rolled back, so keypair1 didn't lose 1 lamport
+        assert_eq!(bank.get_balance(&keypair1.pubkey()), 4);
         assert_eq!(bank.get_balance(&keypair2.pubkey()), 4);
 
         // Check all accounts are unlocked
@@ -3448,6 +3467,16 @@ pub mod tests {
         for result in batch2.lock_results() {
             assert!(result.is_ok());
         }
+        drop(batch2);
+
+        // ensure good_tx will succeed and was only rolled back above due to the other failing tx
+        let entry_3 = next_entry(&entry_2_to_3_mint_to_1.hash, 1, vec![good_tx]);
+        assert_matches!(
+            process_entries_for_tests_without_scheduler(&bank, vec![entry_3]),
+            Ok(())
+        );
+        // First transaction in third entry succeeded, so keypair1 lost 1 lamport
+        assert_eq!(bank.get_balance(&keypair1.pubkey()), 3);
     }
 
     #[test_case(true; "rent_collected")]
@@ -4423,7 +4452,7 @@ pub mod tests {
             &mut ExecuteTimings::default(),
             None,
         );
-        let (err, signature) = get_first_error(&batch, &commit_results, false).unwrap();
+        let (err, signature) = do_get_first_error(&batch, &commit_results).unwrap();
         assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound);
         assert_eq!(signature, account_not_found_sig);
     }
@@ -5072,8 +5101,19 @@ pub mod tests {
         do_test_schedule_batches_for_execution(false);
     }
 
-    fn do_test_execute_batch_pre_commit_callback(
-        poh_result: PreCommitCallbackResult<Option<usize>>,
+    enum TxResult {
+        ExecutedWithSuccess,
+        ExecutedWithFailure,
+        NotExecuted,
+    }
+
+    #[test_matrix(
+        [TxResult::ExecutedWithSuccess, TxResult::ExecutedWithFailure, TxResult::NotExecuted],
+        [Ok(None), Ok(Some(4)), Err(TransactionError::CommitCancelled)]
+    )]
+    fn test_execute_batch_pre_commit_callback(
+        tx_result: TxResult,
+        poh_result: Result<Option<usize>>,
     ) {
         solana_logger::setup();
         let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
@@ -5085,14 +5125,43 @@ pub mod tests {
         let bank = Bank::new_for_tests(&genesis_config);
         let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests();
         let bank = Arc::new(bank);
-        let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
+        let pubkey = solana_sdk::pubkey::new_rand();
+        let (tx, expected_tx_result) = match tx_result {
+            TxResult::ExecutedWithSuccess => (
+                RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
+                    &mint_keypair,
+                    &pubkey,
+                    1,
+                    genesis_config.hash(),
+                )),
+                Ok(()),
+            ),
+            TxResult::ExecutedWithFailure => (
+                RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
+                    &mint_keypair,
+                    &pubkey,
+                    100000000,
+                    genesis_config.hash(),
+                )),
+                Ok(()),
+            ),
+            TxResult::NotExecuted => (
+                RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
+                    &mint_keypair,
+                    &pubkey,
+                    1,
+                    Hash::default(),
+                )),
+                Err(TransactionError::BlockhashNotFound),
+            ),
+        };
         let mut batch = TransactionBatch::new(
             vec![Ok(()); 1],
             &bank,
-            OwnedOrBorrowed::Borrowed(&txs[0..1]),
+            OwnedOrBorrowed::Borrowed(slice::from_ref(&tx)),
         );
         batch.set_needs_unlock(false);
-        let poh_with_index = matches!(poh_result, Ok(Some(_)));
+        let poh_with_index = matches!(&poh_result, Ok(Some(_)));
         let batch = TransactionBatchWithIndexes {
             batch,
             transaction_indexes: vec![],
@@ -5102,6 +5171,9 @@ pub mod tests {
         let (sender, receiver) = crossbeam_channel::unbounded();
 
         assert_eq!(bank.transaction_count(), 0);
+        assert_eq!(bank.transaction_error_count(), 0);
+        let should_commit = poh_result.is_ok();
+        let mut is_called = false;
         let result = execute_batch(
             &batch,
             &bank,
@@ -5110,23 +5182,42 @@ pub mod tests {
             &mut timing,
             None,
             &prioritization_fee_cache,
-            Some(|| poh_result),
+            Some(|processing_result: &'_ Result<_>| {
+                is_called = true;
+                let ok = poh_result?;
+                if let Err(error) = processing_result {
+                    Err(error.clone())?;
+                };
+                Ok(ok)
+            }),
         );
-        let should_succeed = poh_result.is_ok();
-        if should_succeed {
-            assert_matches!(result, Ok(()));
-            assert_eq!(bank.transaction_count(), 1);
+
+        // pre_commit_callback() should always be called regardless of tx_result
+        assert!(is_called);
+
+        if should_commit {
+            assert_eq!(result, expected_tx_result);
+            if expected_tx_result.is_ok() {
+                assert_eq!(bank.transaction_count(), 1);
+                if matches!(tx_result, TxResult::ExecutedWithFailure) {
+                    assert_eq!(bank.transaction_error_count(), 1);
+                } else {
+
assert_eq!(bank.transaction_error_count(), 0); + } + } else { + assert_eq!(bank.transaction_count(), 0); + } } else { assert_matches!(result, Err(TransactionError::CommitCancelled)); assert_eq!(bank.transaction_count(), 0); } - if poh_with_index { + if poh_with_index && expected_tx_result.is_ok() { assert_matches!( receiver.try_recv(), Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) if transaction_indexes == vec![4_usize] ); - } else if should_succeed { + } else if should_commit && expected_tx_result.is_ok() { assert_matches!( receiver.try_recv(), Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..})) @@ -5137,21 +5228,6 @@ pub mod tests { } } - #[test] - fn test_execute_batch_pre_commit_callback_success() { - do_test_execute_batch_pre_commit_callback(Ok(None)); - } - - #[test] - fn test_execute_batch_pre_commit_callback_success_with_index() { - do_test_execute_batch_pre_commit_callback(Ok(Some(4))); - } - - #[test] - fn test_execute_batch_pre_commit_callback_failure() { - do_test_execute_batch_pre_commit_callback(Err(PreCommitCallbackFailed)); - } - #[test] fn test_confirm_slot_entries_with_fix() { const HASHES_PER_TICK: u64 = 10; @@ -5309,27 +5385,31 @@ pub mod tests { .unwrap() .set_limits(u64::MAX, block_limit, u64::MAX); let txs = vec![tx.clone(), tx]; - let commit_results = vec![ - Ok(CommittedTransaction { - status: Ok(()), - log_messages: None, - inner_instructions: None, - return_data: None, - executed_units: actual_execution_cu, - fee_details: FeeDetails::default(), - rent_debits: RentDebits::default(), - loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_data_size: actual_loaded_accounts_data_size, - loaded_accounts_count: 2, + let processing_results = vec![ + Ok(ProcessedTransaction::Executed(Box::new( + ExecutedTransaction { + execution_details: TransactionExecutionDetails { + status: Ok(()), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units: actual_execution_cu, + accounts_data_len_delta: 0, + }, + loaded_transaction: LoadedTransaction { + loaded_accounts_data_size: actual_loaded_accounts_data_size, + ..LoadedTransaction::default() + }, + programs_modified_by_tx: HashMap::new(), }, - }), + ))), Err(TransactionError::AccountNotFound), ]; - assert!(check_block_cost_limits(&bank, &commit_results, &txs).is_ok()); + assert!(check_block_cost_limits(&bank, &processing_results, &txs).is_ok()); assert_eq!( Err(TransactionError::WouldExceedMaxBlockCostLimit), - check_block_cost_limits(&bank, &commit_results, &txs) + check_block_cost_limits(&bank, &processing_results, &txs) ); } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 1e197d2cd5d428..03a97b35249aa9 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -360,10 +360,6 @@ impl TransactionBalancesSet { } pub type TransactionBalances = Vec>; -#[derive(Clone, Copy, Debug)] -pub struct PreCommitCallbackFailed; -pub type PreCommitCallbackResult = std::result::Result; - #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum TransactionLogCollectorFilter { All, @@ -3791,52 +3787,56 @@ impl Bank { ) -> Vec { processing_results .into_iter() - .map(|processing_result| match processing_result? { - ProcessedTransaction::Executed(executed_tx) => { - let execution_details = executed_tx.execution_details; - let LoadedTransaction { - rent_debits, - accounts: loaded_accounts, - loaded_accounts_data_size, - fee_details, - .. 
- } = executed_tx.loaded_transaction; - - // Rent is only collected for successfully executed transactions - let rent_debits = if execution_details.was_successful() { - rent_debits - } else { - RentDebits::default() - }; + .map(|processing_result| { + let processing_result = processing_result?; + let executed_units = processing_result.executed_units(); + let loaded_accounts_data_size = processing_result.loaded_accounts_data_size(); - Ok(CommittedTransaction { - status: execution_details.status, - log_messages: execution_details.log_messages, - inner_instructions: execution_details.inner_instructions, - return_data: execution_details.return_data, - executed_units: execution_details.executed_units, - fee_details, - rent_debits, + match processing_result { + ProcessedTransaction::Executed(executed_tx) => { + let execution_details = executed_tx.execution_details; + let LoadedTransaction { + rent_debits, + accounts: loaded_accounts, + fee_details, + .. + } = executed_tx.loaded_transaction; + + // Rent is only collected for successfully executed transactions + let rent_debits = if execution_details.was_successful() { + rent_debits + } else { + RentDebits::default() + }; + + Ok(CommittedTransaction { + status: execution_details.status, + log_messages: execution_details.log_messages, + inner_instructions: execution_details.inner_instructions, + return_data: execution_details.return_data, + executed_units, + fee_details, + rent_debits, + loaded_account_stats: TransactionLoadedAccountsStats { + loaded_accounts_count: loaded_accounts.len(), + loaded_accounts_data_size, + }, + }) + } + ProcessedTransaction::FeesOnly(fees_only_tx) => Ok(CommittedTransaction { + status: Err(fees_only_tx.load_error), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units, + rent_debits: RentDebits::default(), + fee_details: fees_only_tx.fee_details, loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_count: loaded_accounts.len(), + loaded_accounts_count: fees_only_tx.rollback_accounts.count(), loaded_accounts_data_size, }, - }) + }), } - ProcessedTransaction::FeesOnly(fees_only_tx) => Ok(CommittedTransaction { - status: Err(fees_only_tx.load_error), - log_messages: None, - inner_instructions: None, - return_data: None, - executed_units: 0, - rent_debits: RentDebits::default(), - fee_details: fees_only_tx.fee_details, - loaded_account_stats: TransactionLoadedAccountsStats { - loaded_accounts_count: fees_only_tx.rollback_accounts.count(), - loaded_accounts_data_size: fees_only_tx.rollback_accounts.data_size() - as u32, - }, - }), }) .collect() } @@ -4524,14 +4524,14 @@ impl Bank { timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, ) -> (Vec, TransactionBalancesSet) { - self.load_execute_and_commit_transactions_with_pre_commit_callback( + self.do_load_execute_and_commit_transactions_with_pre_commit_callback( batch, max_age, collect_balances, recording_config, timings, log_messages_bytes_limit, - None:: PreCommitCallbackResult<()>>, + None:: _>, ) .unwrap() } @@ -4544,10 +4544,34 @@ impl Bank { recording_config: ExecutionRecordingConfig, timings: &mut ExecuteTimings, log_messages_bytes_limit: Option, - // None is meaningfully used to skip the block producing unified scheduler special case. - // This makes the special case assert!()-ed. 
- pre_commit_callback: Option PreCommitCallbackResult<()>>, - ) -> PreCommitCallbackResult<(Vec, TransactionBalancesSet)> { + pre_commit_callback: impl FnOnce( + &mut ExecuteTimings, + &[TransactionProcessingResult], + ) -> Result<()>, + ) -> Result<(Vec, TransactionBalancesSet)> { + self.do_load_execute_and_commit_transactions_with_pre_commit_callback( + batch, + max_age, + collect_balances, + recording_config, + timings, + log_messages_bytes_limit, + Some(pre_commit_callback), + ) + } + + fn do_load_execute_and_commit_transactions_with_pre_commit_callback( + &self, + batch: &TransactionBatch, + max_age: usize, + collect_balances: bool, + recording_config: ExecutionRecordingConfig, + timings: &mut ExecuteTimings, + log_messages_bytes_limit: Option, + pre_commit_callback: Option< + impl FnOnce(&mut ExecuteTimings, &[TransactionProcessingResult]) -> Result<()>, + >, + ) -> Result<(Vec, TransactionBalancesSet)> { let pre_balances = if collect_balances { self.collect_balances(batch) } else { @@ -4574,22 +4598,8 @@ impl Bank { ); if let Some(pre_commit_callback) = pre_commit_callback { - // We're entering into the block-producing unified scheduler special case... - // `processing_results` should always contain exactly only 1 result in that case. - assert_eq!(processing_results.len(), 1); - - // Make `pre_commit_callback()` unconditionally take precedence over - // `processing_results[0].was_processed()`, potentially shadowing processing errors. - // That's desired because pre commit failure signalling is actually used to propagate - // poh recording failures, which is time-sensitive by nature to winds down the unified - // scheduler's active scheduling session as soon as possible upon the blockage by poh - // recorder. - // Also, bail out here rather than overwriting `processing_results[0]`. Reconciling - // various state is rather error-prone at this point. For example, processed_counts - // would need to be correctly updated... - pre_commit_callback()? + pre_commit_callback(timings, &processing_results)?; } - let commit_results = self.commit_transactions( batch.sanitized_transactions(), processing_results, diff --git a/svm/src/transaction_processing_result.rs b/svm/src/transaction_processing_result.rs index c8da2a941c1a9b..52532988f8f3d7 100644 --- a/svm/src/transaction_processing_result.rs +++ b/svm/src/transaction_processing_result.rs @@ -87,4 +87,17 @@ impl ProcessedTransaction { Self::FeesOnly { .. 
} => None, } } + + pub fn executed_units(&self) -> u64 { + self.execution_details() + .map(|detail| detail.executed_units) + .unwrap_or_default() + } + + pub fn loaded_accounts_data_size(&self) -> u32 { + match self { + Self::Executed(context) => context.loaded_transaction.loaded_accounts_data_size, + Self::FeesOnly(details) => details.rollback_accounts.data_size() as u32, + } + } } diff --git a/unified-scheduler-pool/Cargo.toml b/unified-scheduler-pool/Cargo.toml index 842927373ae2e2..b720642a91906c 100644 --- a/unified-scheduler-pool/Cargo.toml +++ b/unified-scheduler-pool/Cargo.toml @@ -34,10 +34,12 @@ assert_matches = { workspace = true } lazy_static = { workspace = true } solana-clock = { workspace = true } solana-entry = { workspace = true } +solana-hash = { workspace = true } solana-keypair = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } solana-system-transaction = { workspace = true } +test-case = { workspace = true } [features] dev-context-only-utils = [] diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 9e9383c227b934..aa427ddbb89bb5 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -23,7 +23,6 @@ use { solana_poh::poh_recorder::{RecordTransactionsSummary, TransactionRecorder}, solana_pubkey::Pubkey, solana_runtime::{ - bank::PreCommitCallbackFailed, installed_scheduler_pool::{ initialized_result_with_timings, InstalledScheduler, InstalledSchedulerBox, InstalledSchedulerPool, InstalledSchedulerPoolArc, ResultWithTimings, ScheduleResult, @@ -471,7 +470,11 @@ impl TaskHandler for DefaultTaskHandler { let pre_commit_callback = match scheduling_context.mode() { BlockVerification => None, - BlockProduction => Some(|| { + BlockProduction => Some(|processing_result: &'_ Result<_>| { + if let Err(error) = processing_result { + Err(error.clone())?; + }; + let RecordTransactionsSummary { result, starting_transaction_index, @@ -483,7 +486,7 @@ impl TaskHandler for DefaultTaskHandler { .record_transactions(bank.slot(), vec![transaction.to_versioned_transaction()]); match result { Ok(()) => Ok(starting_transaction_index), - Err(_) => Err(PreCommitCallbackFailed), + Err(_) => Err(TransactionError::CommitCancelled), } }), }; @@ -1521,6 +1524,7 @@ mod tests { crate::sleepless_testing, assert_matches::assert_matches, solana_clock::{Slot, MAX_PROCESSING_AGE}, + solana_hash::Hash, solana_keypair::Keypair, solana_ledger::{ blockstore::Blockstore, @@ -1545,6 +1549,7 @@ mod tests { sync::{atomic::Ordering, Arc, RwLock}, thread::JoinHandle, }, + test_case::test_matrix, }; #[derive(Debug)] @@ -3000,7 +3005,17 @@ mod tests { assert_matches!(result, Err(TransactionError::AccountLoadedTwice)); } - fn do_test_task_handler_poh_recording(should_succeed: bool) { + enum TxResult { + ExecutedWithSuccess, + ExecutedWithFailure, + NotExecuted, + } + + #[test_matrix( + [TxResult::ExecutedWithSuccess, TxResult::ExecutedWithFailure, TxResult::NotExecuted], + [false, true] + )] + fn test_task_handler_poh_recording(tx_result: TxResult, should_succeed_to_record_to_poh: bool) { solana_logger::setup(); let GenesisConfigInfo { @@ -3012,12 +3027,35 @@ mod tests { let bank_forks = BankForks::new_rw_arc(bank); let bank = bank_forks.read().unwrap().working_bank_with_scheduler(); - let tx = system_transaction::transfer( - mint_keypair, - &solana_sdk::pubkey::new_rand(), - 2, - genesis_config.hash(), - ); + let (tx, expected_tx_result) = match tx_result { + 
TxResult::ExecutedWithSuccess => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1, + genesis_config.hash(), + ), + Ok(()), + ), + TxResult::ExecutedWithFailure => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1_000_000, + genesis_config.hash(), + ), + Ok(()), + ), + TxResult::NotExecuted => ( + system_transaction::transfer( + mint_keypair, + &solana_pubkey::new_rand(), + 1, + Hash::default(), + ), + Err(TransactionError::BlockhashNotFound), + ), + }; let tx = RuntimeTransaction::from_transaction_for_tests(tx); let result = &mut Ok(()); @@ -3048,7 +3086,7 @@ mod tests { // wait until the poh's working bank is cleared. // also flush signal_receiver after that. - if !should_succeed { + if !should_succeed_to_record_to_poh { while poh_recorder.read().unwrap().bank().is_some() { sleep(Duration::from_millis(100)); } @@ -3056,24 +3094,43 @@ mod tests { } assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context); - if should_succeed { - assert_matches!(result, Ok(())); - assert_eq!(bank.transaction_count(), 1); - assert_matches!( - receiver.try_recv(), - Ok(TransactionStatusMessage::Batch( - TransactionStatusBatch { .. } - )) - ); - assert_matches!( - signal_receiver.try_recv(), - Ok((_, (solana_entry::entry::Entry {transactions, ..} , _))) - if transactions == vec![tx.to_versioned_transaction()] - ); + if should_succeed_to_record_to_poh { + if expected_tx_result.is_ok() { + assert_matches!(result, Ok(())); + assert_eq!(bank.transaction_count(), 1); + if matches!(tx_result, TxResult::ExecutedWithFailure) { + assert_eq!(bank.transaction_error_count(), 1); + } else { + assert_eq!(bank.transaction_error_count(), 0); + } + assert_matches!( + receiver.try_recv(), + Ok(TransactionStatusMessage::Batch( + TransactionStatusBatch { .. 
} + )) + ); + assert_matches!( + signal_receiver.try_recv(), + Ok((_, (solana_entry::entry::Entry {transactions, ..} , _))) + if transactions == vec![tx.to_versioned_transaction()] + ); + } else { + assert_eq!(result, &expected_tx_result); + assert_eq!(bank.transaction_count(), 0); + assert_eq!(bank.transaction_error_count(), 0); + assert_matches!(receiver.try_recv(), Err(_)); + assert_matches!(signal_receiver.try_recv(), Err(_)); + } } else { - assert_matches!(result, Err(TransactionError::CommitCancelled)); + if expected_tx_result.is_ok() { + assert_matches!(result, Err(TransactionError::CommitCancelled)); + } else { + assert_eq!(result, &expected_tx_result); + } + assert_eq!(bank.transaction_count(), 0); assert_matches!(receiver.try_recv(), Err(_)); assert_matches!(signal_receiver.try_recv(), Err(_)); @@ -3082,14 +3139,4 @@ mod tests { exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } - - #[test] - fn test_task_handler_poh_recording_success() { - do_test_task_handler_poh_recording(true); - } - - #[test] - fn test_task_handler_poh_recording_failure() { - do_test_task_handler_poh_recording(false); - } } From 272a959323968337f1255f06676786895af5166a Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Sun, 5 Jan 2025 21:24:22 +0900 Subject: [PATCH 11/12] Clean up index population --- ledger/src/blockstore_processor.rs | 9 +++++++-- unified-scheduler-pool/src/lib.rs | 15 +++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 9afefadb4b98a7..8c028b0cbd7be0 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -194,8 +194,13 @@ pub fn execute_batch( assert_eq!(processing_results.len(), 1); assert!(transaction_indexes.is_empty()); - let maybe_index = extra_pre_commit_callback(&processing_results[0])?; - transaction_indexes.to_mut().extend(maybe_index); + if let Some(index) = extra_pre_commit_callback(&processing_results[0])? { + let transaction_indexes = transaction_indexes.to_mut(); + // Adjust the empty new vec with the exact needed capacity. Otherwise, excess + // cap would be reserved on `.push()` in it. + transaction_indexes.reserve_exact(1); + transaction_indexes.push(index); + } } } Ok(()) diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index aa427ddbb89bb5..f926945c769089 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -446,21 +446,16 @@ impl TaskHandler for DefaultTaskHandler { let transaction_indexes = match scheduling_context.mode() { BlockVerification => vec![index], BlockProduction => { - let mut vec = vec![]; + // Create a placeholder vec, which will be populated later if + // transaction_status_sender is Some(_). // transaction_status_sender is usually None for staked nodes because it's only // used for RPC-related additional data recording. However, a staked node could // also be running with rpc functionalities during development. So, we need to // correctly support the use case for produced blocks as well, like verified blocks // via the replaying stage. - // Refer `record_token_balances` in `execute_batch()` as this conditional treatment - // is mirrored from it. - if handler_context.transaction_status_sender.is_some() { - // Adjust the empty new vec with the exact needed capacity, which will be - // filled inside `execute_batch()` below. Otherwise, excess cap would be - // reserved on `.extend()` in it. 
-                    vec.reserve_exact(1);
-                }
-                vec
+                vec![]
             }
         };

From fb71c0dccaab66db36228f0f3ab45e12c0d4377d Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Fri, 10 Jan 2025 09:49:02 +0900
Subject: [PATCH 12/12] Freeze-lock block-producing unified scheduler bank

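Holding the bank's freeze lock across poh recording and transaction
commit prevents the tpu bank from freezing in the middle of this code
path. A minimal sketch of the intended ordering (illustrative only;
`record_to_poh` is a hypothetical stand-in for the poh recording done
by the extra pre-commit callback, and the commit arguments are elided):

    // Taken before poh recording; dropped only after commit, so the
    // bank cannot freeze in between and trip the assertion at the
    // start of commit_transactions().
    let freeze_lock = bank.freeze_lock();
    let starting_index = record_to_poh()?; // hypothetical helper
    let commit_results = bank.commit_transactions(/* ... */);
    drop(freeze_lock);

This mirrors the reasoning in
solana_core::banking_stage::Consumer::execute_and_commit_transactions_locked().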
---
 ledger/src/blockstore_processor.rs | 32 ++++++++++++++++++++----------
 runtime/src/bank.rs                | 27 ++++++++++++++++---------
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 8c028b0cbd7be0..6f0f35d8f4bb7a 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -28,7 +28,7 @@ use {
     solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
     solana_runtime::{
         accounts_background_service::{AbsRequestSender, SnapshotRequestKind},
-        bank::{Bank, TransactionBalancesSet},
+        bank::{Bank, PreCommitResult, TransactionBalancesSet},
         bank_forks::{BankForks, SetRootError},
         bank_utils,
         commitment::VOTE_THRESHOLD_SIZE,
@@ -153,14 +153,14 @@ fn create_thread_pool(num_threads: usize) -> ThreadPool {
         .expect("new rayon threadpool")
 }
 
-pub fn execute_batch(
-    batch: &TransactionBatchWithIndexes,
-    bank: &Arc<Bank>,
-    transaction_status_sender: Option<&TransactionStatusSender>,
-    replay_vote_sender: Option<&ReplayVoteSender>,
-    timings: &mut ExecuteTimings,
+pub fn execute_batch<'a>(
+    batch: &'a TransactionBatchWithIndexes,
+    bank: &'a Arc<Bank>,
+    transaction_status_sender: Option<&'a TransactionStatusSender>,
+    replay_vote_sender: Option<&'a ReplayVoteSender>,
+    timings: &'a mut ExecuteTimings,
     log_messages_bytes_limit: Option<usize>,
-    prioritization_fee_cache: &PrioritizationFeeCache,
+    prioritization_fee_cache: &'a PrioritizationFeeCache,
     // None is meaningfully used to detect this is called from the block producing unified
     // scheduler. If so, suppress too verbose logging for the code path.
     extra_pre_commit_callback: Option<
@@ -182,11 +182,12 @@ pub fn execute_batch(
         vec![]
     };
 
-    let pre_commit_callback = |timings: &mut _, processing_results: &_| {
+    let pre_commit_callback = |timings: &mut _, processing_results: &_| -> PreCommitResult {
         match extra_pre_commit_callback {
             None => {
                 get_first_error(batch, processing_results)?;
                 check_block_cost_limits_if_enabled(batch, bank, timings, processing_results)?;
+                Ok(None)
             }
             Some(extra_pre_commit_callback) => {
                 // We're entering into the block-producing unified scheduler special case...
@@ -194,6 +195,12 @@ pub fn execute_batch(
                 assert_eq!(processing_results.len(), 1);
                 assert!(transaction_indexes.is_empty());
 
+                // From now on, we need to freeze-lock the tpu bank, in order to prevent it from
+                // freezing in the middle of this code-path. Otherwise, the assertion at the start
+                // of commit_transactions() would trigger a panic because it's a fatal runtime
+                // invariant violation.
+                let freeze_lock = bank.freeze_lock();
+
                 if let Some(index) = extra_pre_commit_callback(&processing_results[0])? {
                     let transaction_indexes = transaction_indexes.to_mut();
                     // Adjust the empty new vec with the exact needed capacity. Otherwise, excess
                     // cap would be reserved on `.push()` in it.
                     transaction_indexes.reserve_exact(1);
                     transaction_indexes.push(index);
                 }
+                // At this point, poh recording should have succeeded, so it's guaranteed that the
+                // bank hasn't been frozen yet and we're still holding the lock. So, it's okay to
+                // pass down freeze_lock without any introspection here to be unconditionally
+                // dropped after commit_transactions(). This reasoning is the same as
+                // solana_core::banking_stage::Consumer::execute_and_commit_transactions_locked().
+                Ok(Some(freeze_lock))
             }
         }
-        Ok(())
     };

diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 03a97b35249aa9..2016412917d766 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -360,6 +360,8 @@ impl TransactionBalancesSet {
 }
 pub type TransactionBalances = Vec<Vec<u64>>;
 
+pub type PreCommitResult<'a> = Result<Option<RwLockReadGuard<'a, Hash>>>;
+
 #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
 pub enum TransactionLogCollectorFilter {
     All,
@@ -4536,8 +4538,8 @@ impl Bank {
         .unwrap()
     }
 
-    pub fn load_execute_and_commit_transactions_with_pre_commit_callback(
-        &self,
+    pub fn load_execute_and_commit_transactions_with_pre_commit_callback<'a>(
+        &'a self,
         batch: &TransactionBatch,
         max_age: usize,
         collect_balances: bool,
@@ -4547,7 +4549,7 @@ impl Bank {
         pre_commit_callback: impl FnOnce(
             &mut ExecuteTimings,
             &[TransactionProcessingResult],
-        ) -> Result<()>,
+        ) -> PreCommitResult<'a>,
     ) -> Result<(Vec<TransactionCommitResult>, TransactionBalancesSet)> {
         self.do_load_execute_and_commit_transactions_with_pre_commit_callback(
             batch,
@@ -4560,8 +4562,8 @@ impl Bank {
         )
     }
 
-    fn do_load_execute_and_commit_transactions_with_pre_commit_callback(
-        &self,
+    fn do_load_execute_and_commit_transactions_with_pre_commit_callback<'a>(
+        &'a self,
         batch: &TransactionBatch,
         max_age: usize,
         collect_balances: bool,
@@ -4569,7 +4571,7 @@ impl Bank {
         timings: &mut ExecuteTimings,
         log_messages_bytes_limit: Option<usize>,
         pre_commit_callback: Option<
-            impl FnOnce(&mut ExecuteTimings, &[TransactionProcessingResult]) -> Result<()>,
+            impl FnOnce(&mut ExecuteTimings, &[TransactionProcessingResult]) -> PreCommitResult<'a>,
         >,
     ) -> Result<(Vec<TransactionCommitResult>, TransactionBalancesSet)> {
         let pre_balances = if collect_balances {
@@ -4597,15 +4599,22 @@ impl Bank {
             },
         );
 
-        if let Some(pre_commit_callback) = pre_commit_callback {
-            pre_commit_callback(timings, &processing_results)?;
-        }
+        // pre_commit_callback could initiate an atomic operation (e.g. poh recording with the
+        // block-producing unified scheduler). In that case, it returns Some(freeze_lock), which
+        // should be unlocked only after calling commit_transactions(), immediately after the
+        // callback.
+        let freeze_lock = if let Some(pre_commit_callback) = pre_commit_callback {
+            pre_commit_callback(timings, &processing_results)?
+        } else {
+            None
+        };
         let commit_results = self.commit_transactions(
             batch.sanitized_transactions(),
             processing_results,
             &processed_counts,
             timings,
         );
+        drop(freeze_lock);
         let post_balances = if collect_balances {
             self.collect_balances(batch)
         } else {