Skip to content

Commit 62acd94

Browse files
authored
remove duplicates from fanout leaders (#5109)
Currently, when sending transactions to future leaders, duplicate leaders may appear in consecutive slots, resulting in unnecessary spam. This PRs removes duplicates from the fanout leaders set.
1 parent 561023d commit 62acd94

File tree

1 file changed

+25
-24
lines changed

1 file changed

+25
-24
lines changed

tpu-client-next/src/connection_workers_scheduler.rs

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -217,29 +217,28 @@ impl ConnectionWorkersScheduler {
217217
}
218218
};
219219

220-
let updated_leaders = leader_updater.next_leaders(leaders_fanout.connect);
220+
let connect_leaders = leader_updater.next_leaders(leaders_fanout.connect);
221+
let send_leaders = extract_send_leaders(&connect_leaders, leaders_fanout.send);
221222

222-
let (fanout_leaders, connect_leaders) =
223-
split_leaders(&updated_leaders, &leaders_fanout);
224223
// add future leaders to the cache to hide the latency of opening
225224
// the connection.
226225
for peer in connect_leaders {
227-
if !workers.contains(peer) {
226+
if !workers.contains(&peer) {
228227
let stats = send_stats_per_addr.entry(peer.ip()).or_default();
229228
let worker = Self::spawn_worker(
230229
&endpoint,
231-
peer,
230+
&peer,
232231
worker_channel_size,
233232
skip_check_transaction_age,
234233
max_reconnect_attempts,
235234
stats.clone(),
236235
);
237-
maybe_shutdown_worker(workers.push(*peer, worker));
236+
maybe_shutdown_worker(workers.push(peer, worker));
238237
}
239238
}
240239

241240
if let Err(error) =
242-
Broadcaster::send_to_workers(&mut workers, fanout_leaders, transaction_batch).await
241+
Broadcaster::send_to_workers(&mut workers, &send_leaders, transaction_batch).await
243242
{
244243
last_error = Some(error);
245244
break;
@@ -338,21 +337,23 @@ impl WorkersBroadcaster for NonblockingBroadcaster {
338337
}
339338
}
340339

341-
/// Splits `leaders` into two slices based on the `fanout` configuration:
342-
/// * the first slice contains the leaders to which transactions will be sent,
343-
/// * the second vector contains the leaders, used to warm up connections. This
344-
/// slice includes the first set.
345-
fn split_leaders<'leaders>(
346-
leaders: &'leaders [SocketAddr],
347-
fanout: &Fanout,
348-
) -> (&'leaders [SocketAddr], &'leaders [SocketAddr]) {
349-
let Fanout { send, connect } = fanout;
350-
assert!(send <= connect);
351-
let send_count = (*send).min(leaders.len());
352-
let connect_count = (*connect).min(leaders.len());
353-
354-
let send_slice = &leaders[..send_count];
355-
let connect_slice = &leaders[..connect_count];
356-
357-
(send_slice, connect_slice)
340+
/// Extracts a list of unique leader addresses to which transactions will be sent.
341+
///
342+
/// This function selects up to `send_fanout` addresses from the `leaders` list, ensuring that
343+
/// only unique addresses are included while maintaining their original order.
344+
fn extract_send_leaders(leaders: &[SocketAddr], send_fanout: usize) -> Vec<SocketAddr> {
345+
let send_count = send_fanout.min(leaders.len());
346+
remove_duplicates(&leaders[..send_count])
347+
}
348+
349+
/// Removes duplicate `SocketAddr` elements from the given slice while
350+
/// preserving their original order.
351+
fn remove_duplicates(input: &[SocketAddr]) -> Vec<SocketAddr> {
352+
let mut res = Vec::with_capacity(input.len());
353+
for address in input {
354+
if !res.contains(address) {
355+
res.push(*address);
356+
}
357+
}
358+
res
358359
}

0 commit comments

Comments
 (0)