Skip to content

Commit

Permalink
Return receiver in scheduler::run
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Jan 14, 2025
1 parent 4d1779b commit 97ed5c2
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 19 deletions.
11 changes: 8 additions & 3 deletions tls-utils/src/quic_client_certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ pub struct QuicClientCertificate {
}

impl QuicClientCertificate {
pub fn new(keypair: &Keypair) -> Self {
let (certificate, key) = new_dummy_x509_certificate(keypair);
Self { certificate, key }
pub fn new(keypair: Option<&Keypair>) -> Self {
if let Some(keypair) = keypair {
let (certificate, key) = new_dummy_x509_certificate(keypair);
Self { certificate, key }
} else {
let (certificate, key) = new_dummy_x509_certificate(&Keypair::new());
Self { certificate, key }
}
}
}
19 changes: 10 additions & 9 deletions tpu-client-next/src/connection_workers_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ pub struct ConnectionWorkersSchedulerConfig {
pub leaders_fanout: Fanout,
}

pub type TransactionStatsAndReceiver = (
SendTransactionStatsPerAddr,
mpsc::Receiver<TransactionBatch>,
);

impl ConnectionWorkersScheduler {
/// Starts the scheduler, which manages the distribution of transactions to
/// the network's upcoming leaders.
Expand All @@ -113,8 +118,8 @@ impl ConnectionWorkersScheduler {
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
cancel: CancellationToken,
) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
let endpoint = Self::setup_endpoint(bind, validator_identity)?;
) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
let endpoint = Self::setup_endpoint(bind, validator_identity.as_ref())?;
debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections, cancel.clone());
let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
Expand Down Expand Up @@ -184,19 +189,15 @@ impl ConnectionWorkersScheduler {

endpoint.close(0u32.into(), b"Closing connection");
leader_updater.stop().await;
Ok(send_stats_per_addr)
Ok((send_stats_per_addr, transaction_receiver))
}

/// Sets up the QUIC endpoint for the scheduler to handle connections.
fn setup_endpoint(
bind: SocketAddr,
validator_identity: Option<Keypair>,
identity: Option<&Keypair>,
) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
let client_certificate = if let Some(validator_identity) = validator_identity {
Arc::new(QuicClientCertificate::new(&validator_identity))
} else {
Arc::new(QuicClientCertificate::new(&Keypair::new()))
};
let client_certificate = QuicClientCertificate::new(identity);
let client_config = create_client_config(client_certificate);
let endpoint = create_client_endpoint(bind, client_config)?;
Ok(endpoint)
Expand Down
2 changes: 1 addition & 1 deletion tpu-client-next/src/quic_networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use {
solana_tls_utils::QuicClientCertificate,
};

pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
// adapted from QuicLazyInitializedEndpoint::create_endpoint
let mut crypto = tls_client_config_builder()
.with_client_auth_cert(
Expand Down
14 changes: 8 additions & 6 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ use {
streamer::StakedNodes,
},
solana_tpu_client_next::{
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
connection_workers_scheduler::{
ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
},
leader_updater::create_leader_updater,
send_transaction_stats::SendTransactionStatsNonAtomic,
transaction_batch::TransactionBatch,
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
},
std::{
collections::HashMap,
Expand Down Expand Up @@ -65,7 +67,7 @@ async fn setup_connection_worker_scheduler(
transaction_receiver: Receiver<TransactionBatch>,
validator_identity: Option<Keypair>,
) -> (
JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
CancellationToken,
) {
let json_rpc_url = "http://127.0.0.1:8899";
Expand Down Expand Up @@ -95,10 +97,10 @@ async fn setup_connection_worker_scheduler(

async fn join_scheduler(
scheduler_handle: JoinHandle<
Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
>,
) -> SendTransactionStatsNonAtomic {
let stats_per_ip = scheduler_handle
let (stats_per_ip, _) = scheduler_handle
.await
.unwrap()
.expect("Scheduler should stop successfully.");
Expand Down Expand Up @@ -533,7 +535,7 @@ async fn test_no_host() {

// While attempting to establish a connection with a nonexistent host, we fill the worker's
// channel.
let stats = scheduler_handle
let (stats, _) = scheduler_handle
.await
.expect("Scheduler should stop successfully")
.expect("Scheduler execution was successful");
Expand Down

0 comments on commit 97ed5c2

Please sign in to comment.