From 02e25ca0854ebe178989c8b98cd45ad2403c348f Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Wed, 27 Mar 2024 11:54:37 +0100 Subject: [PATCH] Replacing exit signal with exit notification (#372) * Replacing exit signal with exit notification * Deprecitating nightly version * Increase connection size --- .github/workflows/clippy_test.yml | 6 +- cluster-endpoints/src/grpc_multiplex.rs | 3 - lite-rpc/src/bridge.rs | 1 + run_clippy_fmt.sh | 4 +- services/src/quic_connection.rs | 70 ++++++++++++------- services/src/quic_connection_utils.rs | 30 +++++--- .../src/tpu_utils/tpu_connection_manager.rs | 62 +++++----------- 7 files changed, 86 insertions(+), 90 deletions(-) diff --git a/.github/workflows/clippy_test.yml b/.github/workflows/clippy_test.yml index f267d618..625a9515 100644 --- a/.github/workflows/clippy_test.yml +++ b/.github/workflows/clippy_test.yml @@ -28,7 +28,7 @@ jobs: - uses: actions-rust-lang/setup-rust-toolchain@v1 with: # use toolchain version from rust-toolchain.toml - toolchain: nightly-2024-01-05 + toolchain: nightly-2023-10-05 components: rustfmt, clippy cache: true # avoid the default "-D warnings" which thrashes cache @@ -48,5 +48,5 @@ jobs: - name: Run fmt+clippy run: | - cargo +nightly-2024-01-05 fmt --all --check - cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings + cargo +nightly-2023-10-05 fmt --all --check + cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index 0a9d1be4..a0287f12 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -16,7 +16,6 @@ use solana_lite_rpc_core::AnyhowJoinHandle; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::{BTreeSet, HashMap, HashSet}; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast::Receiver; @@ -140,7 +139,6 @@ pub fn create_grpc_multiplex_blocks_subscription( let (processed_block_sender, processed_block_reciever) = async_channel::unbounded::(); - let exit_signal = Arc::new(AtomicBool::new(false)); let exit_notify = Arc::new(Notify::new()); let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream( &grpc_sources, @@ -247,7 +245,6 @@ pub fn create_grpc_multiplex_blocks_subscription( } } } - exit_signal.store(true, std::sync::atomic::Ordering::Relaxed); exit_notify.notify_waiters(); futures::future::join_all(processed_blocks_tasks).await; } diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index d4760964..af80f868 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -108,6 +108,7 @@ impl LiteBridge { let http_server_handle = ServerBuilder::default() .http_only() + .max_connections(1_000_000) .build(http_addr.clone()) .await? .start(rpc)?; diff --git a/run_clippy_fmt.sh b/run_clippy_fmt.sh index ed379a8b..4b1c613b 100755 --- a/run_clippy_fmt.sh +++ b/run_clippy_fmt.sh @@ -1,2 +1,2 @@ -cargo +nightly-2024-01-05 fmt --all -cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings \ No newline at end of file +cargo +nightly-2023-10-05 fmt --all +cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings \ No newline at end of file diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs index 82e68202..3678307f 100644 --- a/services/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -14,7 +14,7 @@ use std::{ Arc, }, }; -use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; +use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore}; pub type EndpointPool = RotatingQueue; @@ -40,7 +40,7 @@ pub struct QuicConnection { identity: Pubkey, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_signal: Arc, + exit_notify: Arc, timeout_counters: Arc, has_connected_once: Arc, } @@ -51,7 +51,7 @@ impl QuicConnection { endpoint: Endpoint, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_signal: Arc, + exit_notify: Arc, ) -> Self { Self { connection: Arc::new(RwLock::new(None)), @@ -60,7 +60,7 @@ impl QuicConnection { identity, socket_address, connection_params, - exit_signal, + exit_notify, timeout_counters: Arc::new(AtomicU64::new(0)), has_connected_once: Arc::new(AtomicBool::new(false)), } @@ -74,7 +74,7 @@ impl QuicConnection { self.socket_address, self.connection_params.connection_timeout, self.connection_params.connection_retry_count, - self.exit_signal.clone(), + self.exit_notify.clone(), ) .await } @@ -127,32 +127,48 @@ impl QuicConnection { pub async fn send_transaction(&self, tx: Vec) { let connection_retry_count = self.connection_params.connection_retry_count; for _ in 0..connection_retry_count { - if self.exit_signal.load(Ordering::Relaxed) { - // return - return; - } - let mut do_retry = false; - let connection = self.get_connection().await; + let exit_notify = self.exit_notify.clone(); + + let connection = tokio::select! { + conn = self.get_connection() => { + conn + }, + _ = exit_notify.notified() => { + break; + } + }; if let Some(connection) = connection { TRIED_SEND_TRANSCTION_TRIED.inc(); let current_stable_id = connection.stable_id() as u64; - match QuicConnectionUtils::open_unistream( - connection, - self.connection_params.unistream_timeout, - ) - .await - { + let open_uni_result = tokio::select! { + res = QuicConnectionUtils::open_unistream( + connection, + self.connection_params.unistream_timeout, + ) => { + res + }, + _ = exit_notify.notified() => { + break; + } + }; + match open_uni_result { Ok(send_stream) => { - match QuicConnectionUtils::write_all( - send_stream, - &tx, - self.identity, - self.connection_params, - ) - .await - { + let write_add_result = tokio::select! { + res = QuicConnectionUtils::write_all( + send_stream, + &tx, + self.identity, + self.connection_params, + ) => { + res + }, + _ = exit_notify.notified() => { + break; + } + }; + match write_add_result { Ok(()) => { SEND_TRANSCTION_SUCESSFUL.inc(); } @@ -231,7 +247,7 @@ impl QuicConnectionPool { endpoints: EndpointPool, socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, - exit_signal: Arc, + exit_notify: Arc, nb_connection: usize, max_number_of_unistream_connection: usize, ) -> Self { @@ -243,7 +259,7 @@ impl QuicConnectionPool { endpoints.get().expect("Should get and endpoint"), socket_address, connection_parameters, - exit_signal.clone(), + exit_notify.clone(), )); } Self { diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index dc864610..cc3a1da0 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -11,13 +11,10 @@ use solana_lite_rpc_core::network_utils::apply_gso_workaround; use solana_sdk::pubkey::Pubkey; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, time::Duration, }; -use tokio::time::timeout; +use tokio::{sync::Notify, time::timeout}; lazy_static::lazy_static! { static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge = @@ -222,15 +219,29 @@ impl QuicConnectionUtils { addr: SocketAddr, connection_timeout: Duration, connection_retry_count: usize, - exit_signal: Arc, + exit_notified: Arc, ) -> Option { for _ in 0..connection_retry_count { let conn = if already_connected { NB_QUIC_0RTT_ATTEMPTED.inc(); - Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await + tokio::select! { + res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => { + res + }, + _ = exit_notified.notified() => { + break; + } + } } else { NB_QUIC_CONN_ATTEMPTED.inc(); - Self::make_connection(endpoint.clone(), addr, connection_timeout).await + tokio::select! { + res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => { + res + }, + _ = exit_notified.notified() => { + break; + } + } }; match conn { Ok(conn) => { @@ -239,9 +250,6 @@ impl QuicConnectionUtils { } Err(e) => { trace!("Could not connect to {} because of error {}", identity, e); - if exit_signal.load(Ordering::Relaxed) { - break; - } } } } diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index d3e837f9..ea4552f3 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -13,15 +13,7 @@ use solana_lite_rpc_core::{ }; use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams; -use std::{ - collections::HashMap, - net::SocketAddr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use tokio::sync::{ broadcast::{Receiver, Sender}, Notify, @@ -55,9 +47,9 @@ struct ActiveConnection { endpoints: RotatingQueue, identity: Pubkey, tpu_address: SocketAddr, - exit_signal: Arc, data_cache: DataCache, connection_parameters: QuicConnectionParameters, + exit_notifier: Arc, } impl ActiveConnection { @@ -72,9 +64,9 @@ impl ActiveConnection { endpoints, tpu_address, identity, - exit_signal: Arc::new(AtomicBool::new(false)), data_cache, connection_parameters, + exit_notifier: Arc::new(Notify::new()), } } @@ -99,13 +91,12 @@ impl ActiveConnection { identity_stakes.stakes, identity_stakes.total_stakes, ); - let exit_signal = self.exit_signal.clone(); let connection_pool = QuicConnectionPool::new( identity, self.endpoints.clone(), addr, self.connection_parameters, - exit_signal.clone(), + exit_notifier.clone(), max_number_of_connections, max_uni_stream_connections, ); @@ -116,12 +107,19 @@ impl ActiveConnection { let priorization_heap = priorization_heap.clone(); let data_cache = self.data_cache.clone(); let fill_notify = fill_notify.clone(); - let exit_signal = exit_signal.clone(); + let exit_notifier = exit_notifier.clone(); tokio::spawn(async move { let mut current_blockheight = data_cache.block_information_store.get_last_blockheight(); - while !exit_signal.load(Ordering::Relaxed) { - let tx = transaction_reciever.recv().await; + loop { + let tx = tokio::select! { + tx = transaction_reciever.recv() => { + tx + }, + _ = exit_notifier.notified() => { + break; + } + }; match tx { Ok(transaction_sent_info) => { if data_cache @@ -172,20 +170,10 @@ impl ActiveConnection { }; 'main_loop: loop { - // exit signal set - if exit_signal.load(Ordering::Relaxed) { - break; - } - tokio::select! { _ = fill_notify.notified() => { 'process_heap: loop { - // exit signal set - if exit_signal.load(Ordering::Relaxed) { - break 'main_loop; - } - let Some(tx) = priorization_heap.pop().await else { // wait to get notification from fill event break 'process_heap; @@ -248,14 +236,9 @@ impl ActiveConnection { } } -struct ActiveConnectionWithExitNotifier { - pub active_connection: ActiveConnection, - pub exit_notifier: Arc, -} - pub struct TpuConnectionManager { endpoints: RotatingQueue, - identity_to_active_connection: Arc>>, + identity_to_active_connection: Arc>, } impl TpuConnectionManager { @@ -301,13 +284,8 @@ impl TpuConnectionManager { exit_notifier.clone(), identity_stakes, ); - self.identity_to_active_connection.insert( - *identity, - Arc::new(ActiveConnectionWithExitNotifier { - active_connection, - exit_notifier, - }), - ); + self.identity_to_active_connection + .insert(*identity, active_connection); } } @@ -316,11 +294,7 @@ impl TpuConnectionManager { if !connections_to_keep.contains_key(key) { trace!("removing a connection for {}", key.to_string()); // ignore error for exit channel - value - .active_connection - .exit_signal - .store(true, Ordering::Relaxed); - value.exit_notifier.notify_one(); + value.exit_notifier.notify_waiters(); false } else { true