Skip to content

Commit

Permalink
make coalesce channel size configurable for test
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Jan 16, 2025
1 parent 42046bc commit c226463
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 3 deletions.
9 changes: 6 additions & 3 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500;
/// entries used by past requests.
const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000;

const MAX_COALESCE_CHANNEL_SIZE: usize = 10_000_000;

// A struct to accumulate the bytes making up
// a packet, along with their offsets, and the
// packet metadata. We use this accumulator to avoid
Expand Down Expand Up @@ -184,6 +182,7 @@ pub fn spawn_server_multi(
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
coalesce,
coalesce_channel_size,
} = quic_server_params;
let concurrent_connections = max_staked_connections + max_unstaked_connections;
let max_concurrent_connections = concurrent_connections + concurrent_connections / 4;
Expand Down Expand Up @@ -216,6 +215,7 @@ pub fn spawn_server_multi(
stats.clone(),
wait_for_chunk_timeout,
coalesce,
coalesce_channel_size,
max_concurrent_connections,
));
Ok(SpawnNonBlockingServerResult {
Expand Down Expand Up @@ -286,6 +286,7 @@ async fn run_server(
stats: Arc<StreamerStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
coalesce_channel_size: usize,
max_concurrent_connections: usize,
) {
let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min);
Expand All @@ -307,7 +308,9 @@ async fn run_server(
.store(endpoints.len(), Ordering::Relaxed);
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
let (sender, receiver) = async_bounded(MAX_COALESCE_CHANNEL_SIZE);
info!("Creating the packet_batch_sender, and async channel");
let (sender, receiver) = async_bounded(coalesce_channel_size);
info!("Created async channel");
tokio::spawn(packet_batch_sender(
packet_sender,
receiver,
Expand Down
4 changes: 4 additions & 0 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct TestServerConfig {
pub max_unstaked_connections: usize,
pub max_streams_per_ms: u64,
pub max_connections_per_ipaddr_per_min: u64,
pub coalesce_channel_size: usize,
}

impl Default for TestServerConfig {
Expand All @@ -68,6 +69,7 @@ impl Default for TestServerConfig {
max_unstaked_connections: MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
coalesce_channel_size: 500_000, // use a smaller value for test as create a huge bounded channel can take time
}
}
}
Expand Down Expand Up @@ -121,6 +123,7 @@ pub fn setup_quic_server_with_sockets(
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
coalesce_channel_size,
}: TestServerConfig,
) -> SpawnTestServerResult {
let exit = Arc::new(AtomicBool::new(false));
Expand All @@ -136,6 +139,7 @@ pub fn setup_quic_server_with_sockets(
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: DEFAULT_TPU_COALESCE,
coalesce_channel_size,
};
let SpawnNonBlockingServerResult {
endpoints: _,
Expand Down
5 changes: 5 additions & 0 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ pub struct SpawnServerResult {
pub key_updater: Arc<EndpointKeyUpdater>,
}

/// Controls the the channel size for the PacketBatch coalesce
pub(crate) const MAX_COALESCE_CHANNEL_SIZE: usize = 1_000_000;

/// Returns default server configuration along with its PEM certificate chain.
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
pub(crate) fn configure_server(
Expand Down Expand Up @@ -573,6 +576,7 @@ pub struct QuicServerParams {
pub max_connections_per_ipaddr_per_min: u64,
pub wait_for_chunk_timeout: Duration,
pub coalesce: Duration,
pub coalesce_channel_size: usize,
}

impl Default for QuicServerParams {
Expand All @@ -585,6 +589,7 @@ impl Default for QuicServerParams {
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: DEFAULT_TPU_COALESCE,
coalesce_channel_size: MAX_COALESCE_CHANNEL_SIZE,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions vortexor/src/vortexor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl Vortexor {
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
coalesce: tpu_coalesce,
..Default::default()
};

let TpuSockets {
Expand Down

0 comments on commit c226463

Please sign in to comment.