diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index cdf769cc65fda0..5dd0b9470f01a9 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -103,8 +103,6 @@ const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500; /// entries used by past requests. const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000; -const MAX_COALESCE_CHANNEL_SIZE: usize = 10_000_000; - // A struct to accumulate the bytes making up // a packet, along with their offsets, and the // packet metadata. We use this accumulator to avoid @@ -184,6 +182,7 @@ pub fn spawn_server_multi( max_connections_per_ipaddr_per_min, wait_for_chunk_timeout, coalesce, + coalesce_channel_size, } = quic_server_params; let concurrent_connections = max_staked_connections + max_unstaked_connections; let max_concurrent_connections = concurrent_connections + concurrent_connections / 4; @@ -216,6 +215,7 @@ pub fn spawn_server_multi( stats.clone(), wait_for_chunk_timeout, coalesce, + coalesce_channel_size, max_concurrent_connections, )); Ok(SpawnNonBlockingServerResult { @@ -286,6 +286,7 @@ async fn run_server( stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, + coalesce_channel_size: usize, max_concurrent_connections: usize, ) { let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min); @@ -307,7 +308,9 @@ async fn run_server( .store(endpoints.len(), Ordering::Relaxed); let staked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new())); - let (sender, receiver) = async_bounded(MAX_COALESCE_CHANNEL_SIZE); + info!("Creating the packet_batch_sender, and async channel"); + let (sender, receiver) = async_bounded(coalesce_channel_size); + info!("Created async channel"); tokio::spawn(packet_batch_sender( packet_sender, receiver, diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index 78adfd3171b52c..bc1ec2f7c9f1e6 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -58,6 +58,7 @@ pub struct TestServerConfig { pub max_unstaked_connections: usize, pub max_streams_per_ms: u64, pub max_connections_per_ipaddr_per_min: u64, + pub coalesce_channel_size: usize, } impl Default for TestServerConfig { @@ -68,6 +69,7 @@ impl Default for TestServerConfig { max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS, max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS, max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, + coalesce_channel_size: 500_000, // use a smaller value for test as create a huge bounded channel can take time } } } @@ -121,6 +123,7 @@ pub fn setup_quic_server_with_sockets( max_unstaked_connections, max_streams_per_ms, max_connections_per_ipaddr_per_min, + coalesce_channel_size, }: TestServerConfig, ) -> SpawnTestServerResult { let exit = Arc::new(AtomicBool::new(false)); @@ -136,6 +139,7 @@ pub fn setup_quic_server_with_sockets( max_connections_per_ipaddr_per_min, wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, coalesce: DEFAULT_TPU_COALESCE, + coalesce_channel_size, }; let SpawnNonBlockingServerResult { endpoints: _, diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 4a290d17c43714..2647ddbe28fa29 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -46,6 +46,9 @@ pub struct SpawnServerResult { pub key_updater: Arc, } +/// Controls the the channel size for the PacketBatch coalesce +pub(crate) const MAX_COALESCE_CHANNEL_SIZE: usize = 1_000_000; + /// Returns default server configuration along with its PEM certificate chain. #[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527 pub(crate) fn configure_server( @@ -573,6 +576,7 @@ pub struct QuicServerParams { pub max_connections_per_ipaddr_per_min: u64, pub wait_for_chunk_timeout: Duration, pub coalesce: Duration, + pub coalesce_channel_size: usize, } impl Default for QuicServerParams { @@ -585,6 +589,7 @@ impl Default for QuicServerParams { max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, coalesce: DEFAULT_TPU_COALESCE, + coalesce_channel_size: MAX_COALESCE_CHANNEL_SIZE, } } } diff --git a/vortexor/src/vortexor.rs b/vortexor/src/vortexor.rs index 21a81c2d06da44..5a2c94aad14220 100644 --- a/vortexor/src/vortexor.rs +++ b/vortexor/src/vortexor.rs @@ -113,6 +113,7 @@ impl Vortexor { max_connections_per_ipaddr_per_min, wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, coalesce: tpu_coalesce, + ..Default::default() }; let TpuSockets {