Skip to content

Commit

Permalink
Make tpu coalescer channel bounded (#4478)
Browse files Browse the repository at this point in the history
* bound the batch sender to avoid excessive memory usage

* bound the batch sender to avoid excessive memory usage

* define a constant for the bounded channel size

* make coalesce channel size configurable for test

* removed some debug log

* set MAX_COALESCE_CHANNEL_SIZE to 10_000_000

* rename MAX_COALESCE_CHANNEL_SIZE to DEFAULT_MAX_COALESCE_CHANNEL_SIZE

* set the test coalesce_channel_size to 100K

* tweak channel size for tests
  • Loading branch information
lijunwangs authored Jan 23, 2025
1 parent d0e3fea commit d63bd1c
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 5 deletions.
2 changes: 2 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,14 @@ impl ValidatorTpuConfig {
pub fn new_for_tests(tpu_enable_udp: bool) -> Self {
let tpu_quic_server_config = QuicServerParams {
max_connections_per_ipaddr_per_min: 32,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..Default::default()
};

let tpu_fwd_quic_server_config = QuicServerParams {
max_connections_per_ipaddr_per_min: 32,
max_unstaked_connections: 0,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..Default::default()
};

Expand Down
4 changes: 4 additions & 0 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ mod tests {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down Expand Up @@ -170,6 +171,7 @@ mod tests {
max_staked_connections: 10,
max_unstaked_connections: 10,
wait_for_chunk_timeout: Duration::from_secs(1),
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down Expand Up @@ -233,6 +235,7 @@ mod tests {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down Expand Up @@ -262,6 +265,7 @@ mod tests {
max_connections_per_peer: 1,
max_staked_connections: 10,
max_unstaked_connections: 10,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down
11 changes: 7 additions & 4 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use {
quic::{configure_server, QuicServerError, QuicServerParams, StreamerStats},
streamer::StakedNodes,
},
async_channel::{
unbounded as async_unbounded, Receiver as AsyncReceiver, Sender as AsyncSender,
},
async_channel::{bounded as async_bounded, Receiver as AsyncReceiver, Sender as AsyncSender},
bytes::Bytes,
crossbeam_channel::Sender,
futures::{stream::FuturesUnordered, Future, StreamExt as _},
Expand Down Expand Up @@ -191,6 +189,7 @@ pub fn spawn_server_multi(
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
coalesce,
coalesce_channel_size,
} = quic_server_params;
let concurrent_connections = max_staked_connections + max_unstaked_connections;
let max_concurrent_connections = concurrent_connections + concurrent_connections / 4;
Expand Down Expand Up @@ -223,6 +222,7 @@ pub fn spawn_server_multi(
stats.clone(),
wait_for_chunk_timeout,
coalesce,
coalesce_channel_size,
max_concurrent_connections,
));
Ok(SpawnNonBlockingServerResult {
Expand Down Expand Up @@ -293,6 +293,7 @@ async fn run_server(
stats: Arc<StreamerStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
coalesce_channel_size: usize,
max_concurrent_connections: usize,
) {
let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min);
Expand All @@ -314,7 +315,7 @@ async fn run_server(
.store(endpoints.len(), Ordering::Relaxed);
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
let (sender, receiver) = async_unbounded();
let (sender, receiver) = async_bounded(coalesce_channel_size);
tokio::spawn(packet_batch_sender(
packet_sender,
receiver,
Expand Down Expand Up @@ -1980,6 +1981,7 @@ pub mod test {
staked_nodes,
QuicServerParams {
max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down Expand Up @@ -2013,6 +2015,7 @@ pub mod test {
staked_nodes,
QuicServerParams {
max_connections_per_peer: 2,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down
4 changes: 4 additions & 0 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct TestServerConfig {
pub max_unstaked_connections: usize,
pub max_streams_per_ms: u64,
pub max_connections_per_ipaddr_per_min: u64,
pub coalesce_channel_size: usize,
}

impl Default for TestServerConfig {
Expand All @@ -69,6 +70,7 @@ impl Default for TestServerConfig {
max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
coalesce_channel_size: 100_000, // use a smaller value for tests, as creating a huge bounded channel can take time
}
}
}
Expand Down Expand Up @@ -122,6 +124,7 @@ pub fn setup_quic_server_with_sockets(
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
coalesce_channel_size,
}: TestServerConfig,
) -> SpawnTestServerResult {
let exit = Arc::new(AtomicBool::new(false));
Expand All @@ -137,6 +140,7 @@ pub fn setup_quic_server_with_sockets(
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: DEFAULT_TPU_COALESCE,
coalesce_channel_size,
};
let SpawnNonBlockingServerResult {
endpoints: _,
Expand Down
12 changes: 11 additions & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ pub struct SpawnServerResult {
pub key_updater: Arc<EndpointKeyUpdater>,
}

/// Default capacity of the bounded channel used for `PacketBatch` coalescing.
///
/// Used as the default value of `QuicServerParams::coalesce_channel_size`.
/// The channel is bounded to avoid excessive memory usage; tests override this
/// with a much smaller value because creating a huge bounded channel can take
/// time.
pub(crate) const DEFAULT_MAX_COALESCE_CHANNEL_SIZE: usize = 10_000_000;

/// Returns default server configuration along with its PEM certificate chain.
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
pub(crate) fn configure_server(
Expand Down Expand Up @@ -594,6 +597,7 @@ pub struct QuicServerParams {
pub max_connections_per_ipaddr_per_min: u64,
pub wait_for_chunk_timeout: Duration,
pub coalesce: Duration,
pub coalesce_channel_size: usize,
}

impl Default for QuicServerParams {
Expand All @@ -606,6 +610,7 @@ impl Default for QuicServerParams {
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: DEFAULT_TPU_COALESCE,
coalesce_channel_size: DEFAULT_MAX_COALESCE_CHANNEL_SIZE,
}
}
}
Expand Down Expand Up @@ -685,7 +690,10 @@ mod test {
sender,
exit.clone(),
staked_nodes,
QuicServerParams::default(),
QuicServerParams {
coalesce_channel_size: 100_000, // smaller channel size for faster test
..Default::default()
},
)
.unwrap();
(t, exit, receiver, server_address)
Expand Down Expand Up @@ -742,6 +750,7 @@ mod test {
staked_nodes,
QuicServerParams {
max_connections_per_peer: 2,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down Expand Up @@ -787,6 +796,7 @@ mod test {
staked_nodes,
QuicServerParams {
max_unstaked_connections: 0,
coalesce_channel_size: 100_000, // smaller channel size for faster test
..QuicServerParams::default()
},
)
Expand Down
1 change: 1 addition & 0 deletions vortexor/src/vortexor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Vortexor {
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: tpu_coalesce,
..Default::default()
};

let TpuSockets {
Expand Down

0 comments on commit d63bd1c

Please sign in to comment.