From 62acd9419052c861c588c6c8d3bf42aa4645424c Mon Sep 17 00:00:00 2001 From: kirill lykov Date: Tue, 4 Mar 2025 10:17:05 +0100 Subject: [PATCH] remove duplicates from fanout leaders (#5109) Currently, when sending transactions to future leaders, duplicate leaders may appear in consecutive slots, resulting in unnecessary spam. This PRs removes duplicates from the fanout leaders set. --- .../src/connection_workers_scheduler.rs | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs index 107da3dd6639d4..6d303d99b0f9bd 100644 --- a/tpu-client-next/src/connection_workers_scheduler.rs +++ b/tpu-client-next/src/connection_workers_scheduler.rs @@ -217,29 +217,28 @@ impl ConnectionWorkersScheduler { } }; - let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect); + let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect); + let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send); - let (fanout_leaders, connect_leaders) = - split_leaders(&updated_leaders, &leaders_fanout); // add future leaders to the cache to hide the latency of opening // the connection. for peer in connect_leaders { - if !workers.contains(peer) { + if !workers.contains(&peer) { let stats = send_stats_per_addr.entry(peer.ip()).or_default(); let worker = Self::spawn_worker( &endpoint, - peer, + &peer, worker_channel_size, skip_check_transaction_age, max_reconnect_attempts, stats.clone(), ); - maybe_shutdown_worker(workers.push(*peer, worker)); + maybe_shutdown_worker(workers.push(peer, worker)); } } if let Err(error) = - Broadcaster::send_to_workers(&mut workers, fanout_leaders, transaction_batch).await + Broadcaster::send_to_workers(&mut workers, &send_leaders, transaction_batch).await { last_error = Some(error); break; @@ -338,21 +337,23 @@ impl WorkersBroadcaster for NonblockingBroadcaster { } } -/// Splits `leaders` into two slices based on the `fanout` configuration: -/// * the first slice contains the leaders to which transactions will be sent, -/// * the second vector contains the leaders, used to warm up connections. This -/// slice includes the first set. -fn split_leaders<'leaders>( - leaders: &'leaders [SocketAddr], - fanout: &Fanout, -) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) { - let Fanout { send, connect } = fanout; - assert!(send <= connect); - let send_count = (*send).min(leaders.len()); - let connect_count = (*connect).min(leaders.len()); - - let send_slice = &leaders[..send_count]; - let connect_slice = &leaders[..connect_count]; - - (send_slice, connect_slice) +/// Extracts a list of unique leader addresses to which transactions will be sent. +/// +/// This function selects up to `send_fanout` addresses from the `leaders` list, ensuring that +/// only unique addresses are included while maintaining their original order. +fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec { + let send_count = send_fanout.min(leaders.len()); + remove_duplicates(&leaders[..send_count]) +} + +/// Removes duplicate `SocketAddr` elements from the given slice while +/// preserving their original order. +fn remove_duplicates(input: &[SocketAddr]) -> Vec { + let mut res = Vec::with_capacity(input.len()); + for address in input { + if !res.contains(address) { + res.push(*address); + } + } + res }