Skip to content

Commit f48cb9f

Browse files
committed
Return receiver in scheduler::run
1 parent b6fcfef commit f48cb9f

File tree

4 files changed

+37
-30
lines changed

4 files changed

+37
-30
lines changed

tls-utils/src/quic_client_certificate.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@ pub struct QuicClientCertificate {
1010
}
1111

1212
impl QuicClientCertificate {
13-
pub fn new(keypair: &Keypair) -> Self {
14-
let (certificate, key) = new_dummy_x509_certificate(keypair);
15-
Self { certificate, key }
13+
pub fn new(keypair: Option<&Keypair>) -> Self {
14+
if let Some(keypair) = keypair {
15+
let (certificate, key) = new_dummy_x509_certificate(keypair);
16+
Self { certificate, key }
17+
} else {
18+
let (certificate, key) = new_dummy_x509_certificate(&Keypair::new());
19+
Self { certificate, key }
20+
}
1621
}
1722
}

tpu-client-next/src/connection_workers_scheduler.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub struct ConnectionWorkersSchedulerConfig {
7070

7171
/// Optional stake identity keypair used in the endpoint certificate for
7272
/// identifying the sender.
73-
pub stake_identity: Option<Keypair>,
73+
pub identity: Option<Keypair>,
7474

7575
/// The number of connections to be maintained by the scheduler.
7676
pub num_connections: usize,
@@ -90,6 +90,11 @@ pub struct ConnectionWorkersSchedulerConfig {
9090
pub leaders_fanout: Fanout,
9191
}
9292

93+
pub type TransactionStatsAndReceiver = (
94+
SendTransactionStatsPerAddr,
95+
mpsc::Receiver<TransactionBatch>,
96+
);
97+
9398
impl ConnectionWorkersScheduler {
9499
/// Starts the scheduler, which manages the distribution of transactions to
95100
/// the network's upcoming leaders.
@@ -103,7 +108,7 @@ impl ConnectionWorkersScheduler {
103108
pub async fn run(
104109
ConnectionWorkersSchedulerConfig {
105110
bind,
106-
stake_identity: validator_identity,
111+
identity,
107112
num_connections,
108113
skip_check_transaction_age,
109114
worker_channel_size,
@@ -113,14 +118,14 @@ impl ConnectionWorkersScheduler {
113118
mut leader_updater: Box<dyn LeaderUpdater>,
114119
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
115120
cancel: CancellationToken,
116-
) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
117-
let endpoint = Self::setup_endpoint(bind, validator_identity)?;
121+
) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
122+
let endpoint = Self::setup_endpoint(bind, identity.as_ref())?;
118123
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
119124
let mut workers = WorkersCache::new(num_connections, cancel.clone());
120125
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
121126

122127
loop {
123-
let transaction_batch = tokio::select! {
128+
let transaction_batch: TransactionBatch = tokio::select! {
124129
recv_res = transaction_receiver.recv() => match recv_res {
125130
Some(txs) => txs,
126131
None => {
@@ -184,19 +189,15 @@ impl ConnectionWorkersScheduler {
184189

185190
endpoint.close(0u32.into(), b"Closing connection");
186191
leader_updater.stop().await;
187-
Ok(send_stats_per_addr)
192+
Ok((send_stats_per_addr, transaction_receiver))
188193
}
189194

190195
/// Sets up the QUIC endpoint for the scheduler to handle connections.
191196
fn setup_endpoint(
192197
bind: SocketAddr,
193-
validator_identity: Option<Keypair>,
198+
identity: Option<&Keypair>,
194199
) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
195-
let client_certificate = if let Some(validator_identity) = validator_identity {
196-
Arc::new(QuicClientCertificate::new(&validator_identity))
197-
} else {
198-
Arc::new(QuicClientCertificate::new(&Keypair::new()))
199-
};
200+
let client_certificate = QuicClientCertificate::new(identity);
200201
let client_config = create_client_config(client_certificate);
201202
let endpoint = create_client_endpoint(bind, client_config)?;
202203
Ok(endpoint)

tpu-client-next/src/quic_networking.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ pub use {
1818
solana_tls_utils::QuicClientCertificate,
1919
};
2020

21-
pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
21+
pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
2222
// adapted from QuicLazyInitializedEndpoint::create_endpoint
2323
let mut crypto = tls_client_config_builder()
2424
.with_client_auth_cert(

tpu-client-next/tests/connection_workers_scheduler_test.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ use {
1515
streamer::StakedNodes,
1616
},
1717
solana_tpu_client_next::{
18-
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
18+
connection_workers_scheduler::{
19+
ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
20+
},
1921
leader_updater::create_leader_updater,
2022
send_transaction_stats::SendTransactionStatsNonAtomic,
2123
transaction_batch::TransactionBatch,
22-
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
24+
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
2325
},
2426
std::{
2527
collections::HashMap,
@@ -40,10 +42,10 @@ use {
4042
tokio_util::sync::CancellationToken,
4143
};
4244

43-
fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
45+
fn test_config(identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
4446
ConnectionWorkersSchedulerConfig {
4547
bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
46-
stake_identity: validator_identity,
48+
identity,
4749
num_connections: 1,
4850
skip_check_transaction_age: false,
4951
// At the moment we have only one strategy to send transactions: we try
@@ -63,9 +65,9 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedule
6365
async fn setup_connection_worker_scheduler(
6466
tpu_address: SocketAddr,
6567
transaction_receiver: Receiver<TransactionBatch>,
66-
validator_identity: Option<Keypair>,
68+
identity: Option<Keypair>,
6769
) -> (
68-
JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
70+
JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
6971
CancellationToken,
7072
) {
7173
let json_rpc_url = "http://127.0.0.1:8899";
@@ -82,7 +84,7 @@ async fn setup_connection_worker_scheduler(
8284
.expect("Leader updates was successfully created");
8385

8486
let cancel = CancellationToken::new();
85-
let config = test_config(validator_identity);
87+
let config = test_config(identity);
8688
let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
8789
config,
8890
leader_updater,
@@ -95,10 +97,10 @@ async fn setup_connection_worker_scheduler(
9597

9698
async fn join_scheduler(
9799
scheduler_handle: JoinHandle<
98-
Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
100+
Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
99101
>,
100102
) -> SendTransactionStatsNonAtomic {
101-
let stats_per_ip = scheduler_handle
103+
let (stats_per_ip, _) = scheduler_handle
102104
.await
103105
.unwrap()
104106
.expect("Scheduler should stop successfully.");
@@ -400,8 +402,8 @@ async fn test_connection_pruned_and_reopened() {
400402
/// connection and verify that all the txs has been received.
401403
#[tokio::test]
402404
async fn test_staked_connection() {
403-
let validator_identity = Keypair::new();
404-
let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]);
405+
let identity = Keypair::new();
406+
let stakes = HashMap::from([(identity.pubkey(), 100_000)]);
405407
let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
406408

407409
let SpawnTestServerResult {
@@ -432,8 +434,7 @@ async fn test_staked_connection() {
432434
} = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
433435

434436
let (scheduler_handle, _scheduler_cancel) =
435-
setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity))
436-
.await;
437+
setup_connection_worker_scheduler(server_address, tx_receiver, Some(identity)).await;
437438

438439
// Check results
439440
let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -533,7 +534,7 @@ async fn test_no_host() {
533534

534535
// While attempting to establish a connection with a nonexistent host, we fill the worker's
535536
// channel.
536-
let stats = scheduler_handle
537+
let (stats, _) = scheduler_handle
537538
.await
538539
.expect("Scheduler should stop successfully")
539540
.expect("Scheduler execution was successful");

0 commit comments

Comments
 (0)