v2.1: tpu-client-next: return receiver in scheduler::run (backport of #4454) #4521

Merged · 1 commit · Jan 24, 2025
Changes from all commits
28 changes: 16 additions & 12 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -90,20 +90,28 @@ pub struct ConnectionWorkersSchedulerConfig {
     pub leaders_fanout: Fanout,
 }
 
+pub type TransactionStatsAndReceiver = (
+    SendTransactionStatsPerAddr,
+    mpsc::Receiver<TransactionBatch>,
+);
+
 impl ConnectionWorkersScheduler {
     /// Starts the scheduler, which manages the distribution of transactions to
     /// the network's upcoming leaders.
     ///
     /// Runs the main loop that handles worker scheduling and management for
     /// connections. Returns the quic error statistics per connection address or
-    /// an error.
+    /// an error, along with the transaction receiver. The receiver is handed
+    /// back to the caller because in some cases the same receiver needs to be
+    /// re-used by a new scheduler, for example when the validator identity has
+    /// been updated.
     ///
     /// Importantly, if some transactions were not delivered due to network
     /// problems, they will not be retried when the problem is resolved.
     pub async fn run(
         ConnectionWorkersSchedulerConfig {
             bind,
-            stake_identity: validator_identity,
+            stake_identity,
             num_connections,
             skip_check_transaction_age,
             worker_channel_size,
@@ -113,14 +121,14 @@ impl ConnectionWorkersScheduler {
         mut leader_updater: Box<dyn LeaderUpdater>,
         mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
         cancel: CancellationToken,
-    ) -> Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError> {
-        let endpoint = Self::setup_endpoint(bind, validator_identity)?;
+    ) -> Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError> {
+        let endpoint = Self::setup_endpoint(bind, stake_identity.as_ref())?;
         debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
         let mut workers = WorkersCache::new(num_connections, cancel.clone());
         let mut send_stats_per_addr = SendTransactionStatsPerAddr::new();
 
         loop {
-            let transaction_batch = tokio::select! {
+            let transaction_batch: TransactionBatch = tokio::select! {
                 recv_res = transaction_receiver.recv() => match recv_res {
                     Some(txs) => txs,
                     None => {
@@ -184,19 +192,15 @@
 
         endpoint.close(0u32.into(), b"Closing connection");
         leader_updater.stop().await;
-        Ok(send_stats_per_addr)
+        Ok((send_stats_per_addr, transaction_receiver))
     }
 
     /// Sets up the QUIC endpoint for the scheduler to handle connections.
     fn setup_endpoint(
         bind: SocketAddr,
-        validator_identity: Option<Keypair>,
+        stake_identity: Option<&Keypair>,
     ) -> Result<Endpoint, ConnectionWorkersSchedulerError> {
-        let client_certificate = if let Some(validator_identity) = validator_identity {
-            Arc::new(QuicClientCertificate::new(&validator_identity))
-        } else {
-            Arc::new(QuicClientCertificate::new(&Keypair::new()))
-        };
+        let client_certificate = QuicClientCertificate::new(stake_identity);
         let client_config = create_client_config(client_certificate);
         let endpoint = create_client_endpoint(bind, client_config)?;
         Ok(endpoint)
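The motivation for returning the receiver is easiest to see from the caller's side. Below is a minimal, hypothetical sketch (not part of this diff) of restarting the scheduler with a new stake identity while re-using the receiver returned by the previous run; `make_config` and `make_leader_updater` are assumed caller-provided helpers, and the `solana_sdk::signature::Keypair` import path is an assumption.

use {
    solana_sdk::signature::Keypair, // assumed path for Keypair
    solana_tpu_client_next::{
        connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
        leader_updater::LeaderUpdater,
        transaction_batch::TransactionBatch,
        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
    },
    tokio::sync::mpsc,
    tokio_util::sync::CancellationToken,
};

/// Hypothetical caller-side helper: swap the stake identity without losing
/// transactions that are still queued in the channel.
async fn restart_with_new_identity(
    make_config: impl Fn(Option<Keypair>) -> ConnectionWorkersSchedulerConfig,
    mut make_leader_updater: impl FnMut() -> Box<dyn LeaderUpdater>,
    tx_receiver: mpsc::Receiver<TransactionBatch>,
    new_identity: Keypair,
) -> Result<(), ConnectionWorkersSchedulerError> {
    // First run, here without a stake identity.
    let cancel = CancellationToken::new();
    let handle = tokio::spawn(ConnectionWorkersScheduler::run(
        make_config(None),
        make_leader_updater(),
        tx_receiver,
        cancel.clone(),
    ));

    // The validator identity changes, so shut the current scheduler down.
    cancel.cancel();
    // `run` now hands the receiver back together with the stats, so any
    // batches still sitting in the channel are not dropped.
    let (_stats, tx_receiver) = handle.await.expect("scheduler task panicked")?;

    // Second run with the new identity, re-using the same receiver.
    tokio::spawn(ConnectionWorkersScheduler::run(
        make_config(Some(new_identity)),
        make_leader_updater(),
        tx_receiver,
        CancellationToken::new(),
    ));
    Ok(())
}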
2 changes: 1 addition & 1 deletion tpu-client-next/src/quic_networking.rs
@@ -20,7 +20,7 @@ pub use {
     quic_client_certificate::QuicClientCertificate,
 };
 
-pub(crate) fn create_client_config(client_certificate: Arc<QuicClientCertificate>) -> ClientConfig {
+pub(crate) fn create_client_config(client_certificate: QuicClientCertificate) -> ClientConfig {
     // adapted from QuicLazyInitializedEndpoint::create_endpoint
     let mut crypto = rustls::ClientConfig::builder()
         .dangerous()
7 changes: 6 additions & 1 deletion tpu-client-next/src/quic_networking/quic_client_certificate.rs
@@ -10,7 +10,12 @@ pub struct QuicClientCertificate {
 }
 
 impl QuicClientCertificate {
-    pub fn new(keypair: &Keypair) -> Self {
+    pub fn new(keypair: Option<&Keypair>) -> Self {
+        let keypair = if let Some(keypair) = keypair {
+            keypair
+        } else {
+            &Keypair::new()
+        };
         let (certificate, key) = new_dummy_x509_certificate(keypair);
         Self { certificate, key }
     }
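A quick usage sketch (not from this PR) of the relaxed constructor: passing None falls back to an ephemeral keypair, which is what unstaked clients use. The import paths below are assumptions based on the re-export shown above.

use {
    solana_sdk::signature::Keypair, // assumed path for Keypair
    solana_tpu_client_next::quic_networking::QuicClientCertificate, // assumed public path
};

fn build_certificates() -> (QuicClientCertificate, QuicClientCertificate) {
    // Staked client: the certificate is derived from the validator's identity.
    let identity = Keypair::new();
    let staked = QuicClientCertificate::new(Some(&identity));
    // Unstaked client: `new(None)` generates an ephemeral keypair internally.
    let unstaked = QuicClientCertificate::new(None);
    (staked, unstaked)
}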
29 changes: 15 additions & 14 deletions tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -16,11 +16,13 @@ use {
         streamer::StakedNodes,
     },
     solana_tpu_client_next::{
-        connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout},
+        connection_workers_scheduler::{
+            ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
+        },
         leader_updater::create_leader_updater,
         send_transaction_stats::SendTransactionStatsNonAtomic,
         transaction_batch::TransactionBatch,
-        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError, SendTransactionStatsPerAddr,
+        ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
     },
     std::{
         collections::HashMap,
@@ -41,10 +43,10 @@ use {
     tokio_util::sync::CancellationToken,
 };
 
-fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
+fn test_config(stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
     ConnectionWorkersSchedulerConfig {
         bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
-        stake_identity: validator_identity,
+        stake_identity,
         num_connections: 1,
         skip_check_transaction_age: false,
         // At the moment we have only one strategy to send transactions: we try
@@ -64,9 +66,9 @@ fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
 async fn setup_connection_worker_scheduler(
     tpu_address: SocketAddr,
     transaction_receiver: Receiver<TransactionBatch>,
-    validator_identity: Option<Keypair>,
+    stake_identity: Option<Keypair>,
 ) -> (
-    JoinHandle<Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>>,
+    JoinHandle<Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>>,
     CancellationToken,
 ) {
     let json_rpc_url = "http://127.0.0.1:8899";
@@ -83,7 +85,7 @@ async fn setup_connection_worker_scheduler(
         .expect("Leader updates was successfully created");
 
     let cancel = CancellationToken::new();
-    let config = test_config(validator_identity);
+    let config = test_config(stake_identity);
     let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
         config,
         leader_updater,
@@ -96,10 +98,10 @@
 
 async fn join_scheduler(
     scheduler_handle: JoinHandle<
-        Result<SendTransactionStatsPerAddr, ConnectionWorkersSchedulerError>,
+        Result<TransactionStatsAndReceiver, ConnectionWorkersSchedulerError>,
     >,
 ) -> SendTransactionStatsNonAtomic {
-    let stats_per_ip = scheduler_handle
+    let (stats_per_ip, _) = scheduler_handle
         .await
         .unwrap()
         .expect("Scheduler should stop successfully.");
@@ -401,8 +403,8 @@ async fn test_connection_pruned_and_reopened() {
 /// connection and verify that all the txs has been received.
 #[tokio::test]
 async fn test_staked_connection() {
-    let validator_identity = Keypair::new();
-    let stakes = HashMap::from([(validator_identity.pubkey(), 100_000)]);
+    let stake_identity = Keypair::new();
+    let stakes = HashMap::from([(stake_identity.pubkey(), 100_000)]);
     let staked_nodes = StakedNodes::new(Arc::new(stakes), HashMap::<Pubkey, u64>::default());
 
     let SpawnTestServerResult {
@@ -433,8 +435,7 @@
     } = spawn_tx_sender(tx_size, expected_num_txs, Duration::from_millis(100));
 
     let (scheduler_handle, _scheduler_cancel) =
-        setup_connection_worker_scheduler(server_address, tx_receiver, Some(validator_identity))
-            .await;
+        setup_connection_worker_scheduler(server_address, tx_receiver, Some(stake_identity)).await;
 
     // Check results
     let actual_num_packets = count_received_packets_for(receiver, tx_size, TEST_MAX_TIME).await;
@@ -534,7 +535,7 @@ async fn test_no_host() {
 
     // While attempting to establish a connection with a nonexistent host, we fill the worker's
     // channel.
-    let stats = scheduler_handle
+    let (stats, _) = scheduler_handle
         .await
         .expect("Scheduler should stop successfully")
         .expect("Scheduler execution was successful");