Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire unified scheduler into banking experimentally #3946

Draft
wants to merge 93 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
b6ec634
Wire unified scheduler into banking experimentally
ryoqun Dec 4, 2024
038559e
Move SchedulingMode out of solana-sdk
ryoqun Dec 11, 2024
5227546
Remove needless type equality binds
ryoqun Dec 13, 2024
7e8feaa
Move is_trashed() to SchedulerInner
ryoqun Dec 13, 2024
256c21b
Fix CI
ryoqun Dec 14, 2024
5633cd6
Improve bank_forks/unified scheduler docs in core
ryoqun Dec 14, 2024
7e0926e
Clean do_take_resumed_scheduler() a bit
ryoqun Dec 14, 2024
ff28823
Reword a bit
ryoqun Dec 15, 2024
cf8d658
Revert "Create RootBankCache earlier for unified scheduler"
ryoqun Dec 17, 2024
6921b38
Initialize unified scheduler at once earliest
ryoqun Dec 17, 2024
da3f787
Convert session_ending condition to assert!()
ryoqun Dec 19, 2024
21fcc1f
Revert update_bank_forks_and_poh_recorder_for_new_tpu_bank()
ryoqun Dec 19, 2024
f319902
Use unblocking mechanism instead of nested locks
ryoqun Dec 19, 2024
656444c
fix ci?
ryoqun Dec 20, 2024
1ea4bbf
fix ci??
ryoqun Dec 20, 2024
030b979
rebase fixups
ryoqun Jan 11, 2025
3df47ba
rebase fixups
ryoqun Jan 11, 2025
6a34d0c
rebase fixups
ryoqun Jan 12, 2025
a0e63ee
Remove solana-sdk dep from unified-scheduler-pool
ryoqun Jan 13, 2025
a0535cd
Reorder Derives on SchedulingMode
ryoqun Jan 13, 2025
d4bd053
Clean up block producing unified scheduler setup
ryoqun Jan 13, 2025
b4bfe88
Revert pub(crate) on decision_maker
ryoqun Jan 16, 2025
af401de
Simplify unified_receiver()
ryoqun Jan 16, 2025
7aa76e9
Merge commit '11a84a4e1dda3ea792ee2bcf4ce21ef3a009c6fb' into HEAD
ryoqun Jan 17, 2025
47d1ce6
Merge commit '8f9d3d402da0e63c48817c36d6e09a352c6d6084' into HEAD
ryoqun Jan 17, 2025
8935353
cargo fmt
ryoqun Jan 17, 2025
0d3de56
Add test: test_scheduler_producing_blocks()
ryoqun Jan 17, 2025
46217ab
Merge commit 'e508373f2f29ed2ab95e496c2accc14390c8482f' into HEAD
ryoqun Jan 19, 2025
c529866
Merge commit 'c59d2c1eb290a2fb5d04cf0ffef18068d3f2f7ea' into HEAD
ryoqun Feb 6, 2025
f777541
Merge commit '79692775a3c3c81104f56ad91a2154d3fc599e73' into HEAD
ryoqun Feb 6, 2025
2a86c03
Merge commit '2d4d7c1e047f43fd5cefbb8a7d84b3bd71b46c6d' into HEAD
ryoqun Feb 6, 2025
bd52562
Merge commit 'b61839b53c23a75b729dffeb3d25f1c46b9259b3' into HEAD
ryoqun Feb 6, 2025
d47f631
Merge commit '5a7852ac66730b516079f90e88f32c8f27a56afd' into HEAD
ryoqun Feb 6, 2025
4b67090
save
ryoqun Feb 6, 2025
a5e4006
save
ryoqun Feb 7, 2025
d617c5a
Clean up spawn code
ryoqun Feb 7, 2025
f198da8
save
ryoqun Feb 7, 2025
7f667de
save
ryoqun Feb 7, 2025
e49cd74
save
ryoqun Feb 7, 2025
6bedb0f
save
ryoqun Feb 7, 2025
f7e1c56
save
ryoqun Feb 7, 2025
a80e6b9
save
ryoqun Feb 7, 2025
cdf565b
save
ryoqun Feb 7, 2025
9915026
save
ryoqun Feb 7, 2025
b5d9bc2
save
ryoqun Feb 7, 2025
6100850
save
ryoqun Feb 7, 2025
857454e
save
ryoqun Feb 7, 2025
da2bdb7
save
ryoqun Feb 7, 2025
4d5e1c3
save
ryoqun Feb 7, 2025
b38cdd0
save
ryoqun Feb 7, 2025
e4b84ed
save
ryoqun Feb 7, 2025
a2a3580
save
ryoqun Feb 7, 2025
71c4227
save
ryoqun Feb 7, 2025
5fd9483
save
ryoqun Feb 7, 2025
5e65c97
save
ryoqun Feb 7, 2025
49c71e4
save
ryoqun Feb 7, 2025
49d2c26
save
ryoqun Feb 7, 2025
9ed24a5
save
ryoqun Feb 7, 2025
4de9929
save
ryoqun Feb 7, 2025
ce0e724
save
ryoqun Feb 7, 2025
69b9eb5
save
ryoqun Feb 7, 2025
2c0bcb2
save
ryoqun Feb 7, 2025
ca51b5a
save
ryoqun Feb 7, 2025
12357f5
save
ryoqun Feb 7, 2025
33a9eae
save
ryoqun Feb 7, 2025
a32f428
save
ryoqun Feb 7, 2025
9eb4219
save
ryoqun Feb 7, 2025
df0caf0
save
ryoqun Feb 7, 2025
116f52c
save
ryoqun Feb 7, 2025
8592d47
Merge commit '2974f02e53d' into HEAD
ryoqun Feb 28, 2025
8981079
Merge commit '7b0adc574b8' into HEAD
ryoqun Feb 28, 2025
4f3e379
Merge commit '130b1d22296' into HEAD
ryoqun Feb 28, 2025
a1bf892
Merge commit '86faa211d98' into HEAD
ryoqun Feb 28, 2025
6eb0d8c
Merge commit '64a88714b18' into HEAD
ryoqun Feb 28, 2025
c1be69a
Disable test
ryoqun Feb 28, 2025
1c9c4f1
kick ci
ryoqun Feb 28, 2025
cc408ba
Merge commit 'cd2aa1b6e90' into HEAD
ryoqun Feb 28, 2025
958d492
save
ryoqun Feb 28, 2025
6387ef1
save
ryoqun Feb 28, 2025
16506eb
fix ci
ryoqun Mar 3, 2025
2523799
fix ci && prettier diff
ryoqun Mar 3, 2025
ee42538
more clean up and enable local cluster test
ryoqun Mar 3, 2025
3497d7e
another clean up....
ryoqun Mar 3, 2025
67e2e54
save
ryoqun Mar 3, 2025
91aa8e8
Rename HandlerContext::{count,parallelism}
ryoqun Mar 3, 2025
1b35957
last touch...
ryoqun Mar 3, 2025
aca5764
Define ThreadManager::can_finish_session()
ryoqun Mar 4, 2025
8cf150a
save
ryoqun Mar 4, 2025
1ff13da
Mode char
ryoqun Mar 4, 2025
88ed6b0
save
ryoqun Mar 4, 2025
f99a282
kick ci
ryoqun Mar 5, 2025
7db6c88
kick ci
ryoqun Mar 5, 2025
fe6b8cc
kick ci
ryoqun Mar 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions banking-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-tpu-client = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }

[features]
Expand Down
45 changes: 41 additions & 4 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use {
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_core::{
banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage},
banking_stage::{
unified_scheduler::ensure_banking_stage_setup,
update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage,
},
banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
validator::{BlockProductionMethod, TransactionStructure},
},
Expand Down Expand Up @@ -36,6 +39,8 @@ use {
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_unified_scheduler_logic::SchedulingMode,
solana_unified_scheduler_pool::{DefaultSchedulerPool, SupportedSchedulingMode},
std::{
sync::{atomic::Ordering, Arc, RwLock},
thread::sleep,
Expand Down Expand Up @@ -452,16 +457,35 @@ fn main() {
ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
};
let cluster_info = Arc::new(cluster_info);
let banking_tracer_channels = if matches!(
block_production_method,
BlockProductionMethod::UnifiedScheduler
) {
let pool = DefaultSchedulerPool::new(
SupportedSchedulingMode::Either(SchedulingMode::BlockProduction),
None,
None,
None,
Some(replay_vote_sender.clone()),
prioritization_fee_cache.clone(),
);
let channels = banking_tracer.create_channels_for_scheduler_pool(&pool);
ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder);
bank_forks.write().unwrap().install_scheduler_pool(pool);
channels
} else {
banking_tracer.create_channels(false)
};
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels(false);
} = banking_tracer_channels;
let banking_stage = BankingStage::new_num_threads(
block_production_method,
block_production_method.clone(),
transaction_struct,
&cluster_info,
&poh_recorder,
Expand All @@ -476,6 +500,18 @@ fn main() {
&prioritization_fee_cache,
);

// This bench processes transactions, starting from the very first bank, so special-casing is
// needed for unified scheduler.
if matches!(
block_production_method,
BlockProductionMethod::UnifiedScheduler
) {
bank = bank_forks
.write()
.unwrap()
.reinstall_block_production_scheduler_into_working_genesis_bank();
}

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
Expand Down Expand Up @@ -536,10 +572,11 @@ fn main() {
tx_total_us += now.elapsed().as_micros() as u64;

let mut poh_time = Measure::start("poh_time");
poh_recorder
let cleared_bank = poh_recorder
.write()
.unwrap()
.reset(bank.clone(), Some((bank.slot(), bank.slot() + 1)));
assert_matches!(cleared_bank, None);
poh_time.stop();

let mut new_bank_time = Measure::start("new_bank");
Expand Down
3 changes: 2 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ bytes = { workspace = true }
chrono = { workspace = true, features = ["default", "serde"] }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true, features = ["rayon", "raw-api"] }
derive_more = { workspace = true }
etcd-client = { workspace = true, features = ["tls"] }
futures = { workspace = true }
histogram = { workspace = true }
Expand Down Expand Up @@ -94,6 +95,7 @@ solana-tls-utils = { workspace = true }
solana-tpu-client = { workspace = true }
solana-transaction-status = { workspace = true }
solana-turbine = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-unified-scheduler-pool = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
Expand Down Expand Up @@ -125,7 +127,6 @@ solana-rpc = { workspace = true, features = ["dev-context-only-utils"] }
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
solana-stake-program = { workspace = true }
solana-system-program = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-unified-scheduler-pool = { workspace = true, features = [
"dev-context-only-utils",
] }
Expand Down
20 changes: 19 additions & 1 deletion core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use {
crate::{
banking_stage::{
unified_scheduler::ensure_banking_stage_setup,
update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo,
},
banking_trace::{
Expand Down Expand Up @@ -45,6 +46,7 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
solana_unified_scheduler_pool::DefaultSchedulerPool,
std::{
collections::BTreeMap,
fmt::Display,
Expand Down Expand Up @@ -691,6 +693,7 @@ impl BankingSimulator {
blockstore: Arc<Blockstore>,
block_production_method: BlockProductionMethod,
transaction_struct: TransactionStructure,
unified_scheduler_pool: Option<Arc<DefaultSchedulerPool>>,
) -> (SenderLoop, SimulatorLoop, SimulatorThreads) {
let parent_slot = self.parent_slot().unwrap();
let mut packet_batches_by_time = self.banking_trace_events.packet_batches_by_time;
Expand Down Expand Up @@ -777,14 +780,27 @@ impl BankingSimulator {
let cluster_info_for_banking = Arc::new(DummyClusterInfo {
id: simulated_leader.into(),
});
let banking_tracer_channels = if let Some(pool) = unified_scheduler_pool {
let channels = retracer.create_channels_for_scheduler_pool(&pool);
ensure_banking_stage_setup(
&pool,
&bank_forks,
&channels,
&cluster_info_for_banking,
&poh_recorder,
);
channels
} else {
retracer.create_channels(false)
};
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = retracer.create_channels(false);
} = banking_tracer_channels;

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
Expand Down Expand Up @@ -907,13 +923,15 @@ impl BankingSimulator {
blockstore: Arc<Blockstore>,
block_production_method: BlockProductionMethod,
transaction_struct: TransactionStructure,
unified_scheduler_pool: Option<Arc<DefaultSchedulerPool>>,
) -> Result<(), SimulateError> {
let (sender_loop, simulator_loop, simulator_threads) = self.prepare_simulation(
genesis_config,
bank_forks,
blockstore,
block_production_method,
transaction_struct,
unified_scheduler_pool,
);

sender_loop.log_starting();
Expand Down
12 changes: 10 additions & 2 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use {
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
solana_unified_scheduler_logic::SchedulingMode,
std::{
cmp, env,
ops::Deref,
Expand Down Expand Up @@ -391,6 +392,9 @@ impl BankingStage {
prioritization_fee_cache,
)
}
BlockProductionMethod::UnifiedScheduler => Self {
bank_thread_hdls: vec![],
},
}
}

Expand Down Expand Up @@ -735,11 +739,15 @@ pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank(
tpu_bank: Bank,
track_transaction_indexes: bool,
) {
let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
let tpu_bank = bank_forks
.write()
.unwrap()
.insert_with_scheduling_mode(SchedulingMode::BlockProduction, tpu_bank);
poh_recorder
.write()
.unwrap()
.set_bank(tpu_bank, track_transaction_indexes);
.set_bank(tpu_bank.clone_with_scheduler(), track_transaction_indexes);
tpu_bank.unblock_block_production();
}

#[cfg(test)]
Expand Down
21 changes: 19 additions & 2 deletions core/src/banking_stage/decision_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use {
},
pubkey::Pubkey,
},
solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus},
std::{
sync::{Arc, RwLock},
sync::{atomic::Ordering::Relaxed, Arc, RwLock},
time::{Duration, Instant},
},
};
Expand All @@ -31,9 +32,10 @@ impl BufferedPacketsDecision {
}
}

#[derive(Clone)]
#[derive(Clone, derive_more::Debug)]
pub struct DecisionMaker {
my_pubkey: Pubkey,
#[debug("{poh_recorder:p}")]
poh_recorder: Arc<RwLock<PohRecorder>>,

cached_decision: Option<BufferedPacketsDecision>,
Expand Down Expand Up @@ -136,6 +138,21 @@ impl DecisionMaker {
}
}

impl BankingStageMonitor for DecisionMaker {
fn status(&mut self) -> BankingStageStatus {
if self.poh_recorder.read().unwrap().is_exited.load(Relaxed) {
BankingStageStatus::Exited
} else if matches!(
self.make_consume_or_forward_decision(),
BufferedPacketsDecision::Forward,
) {
BankingStageStatus::Inactive
} else {
BankingStageStatus::Active
}
}
}

#[cfg(test)]
mod tests {
use {
Expand Down
1 change: 0 additions & 1 deletion core/src/banking_stage/packet_deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ impl PacketDeserializer {
})
}

#[allow(dead_code)]
pub(crate) fn deserialize_packets_with_indexes(
packet_batch: &PacketBatch,
) -> impl Iterator<Item = (ImmutableDeserializedPacket, usize)> + '_ {
Expand Down
15 changes: 12 additions & 3 deletions core/src/banking_stage/unified_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use {
packet_deserializer::PacketDeserializer,
LikeClusterInfo,
},
crate::banking_trace::Channels,
crate::{banking_stage::BankingStage, banking_trace::Channels},
agave_banking_stage_ingress_types::BankingPacketBatch,
solana_poh::poh_recorder::PohRecorder,
solana_runtime::{bank_forks::BankForks, root_bank_cache::RootBankCache},
Expand All @@ -49,16 +49,22 @@ pub(crate) fn ensure_banking_stage_setup(
cluster_info: &impl LikeClusterInfo,
poh_recorder: &Arc<RwLock<PohRecorder>>,
) {
if !pool.block_production_supported() {
return;
}

let mut root_bank_cache = RootBankCache::new(bank_forks.clone());
let unified_receiver = channels.unified_receiver().clone();
let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let block_producing_scheduler_handler_threads = BankingStage::num_threads() as usize;
let banking_stage_monitor = Box::new(decision_maker.clone());
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();

let banking_packet_handler = Box::new(
move |helper: &BankingStageHelper, batches: BankingPacketBatch| {
let decision = decision_maker.make_consume_or_forward_decision();
if matches!(decision, BufferedPacketsDecision::Forward) {
return;
return Ok(());
}
let bank = root_bank_cache.root_bank();
for batch in batches.iter() {
Expand All @@ -80,14 +86,17 @@ pub(crate) fn ensure_banking_stage_setup(
let index = task_id_base + packet_index;

let task = helper.create_new_task(transaction, index);
helper.send_new_task(task);
helper.send_new_task(task)?
}
}
Ok(())
},
);

pool.register_banking_stage(
unified_receiver,
block_producing_scheduler_handler_threads,
banking_stage_monitor,
banking_packet_handler,
transaction_recorder,
);
Expand Down
Loading
Loading