diff --git a/dc/s2n-quic-dc/src/path/secret/map.rs b/dc/s2n-quic-dc/src/path/secret/map.rs
index 4d60e5fbf..55f782f3c 100644
--- a/dc/s2n-quic-dc/src/path/secret/map.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map.rs
@@ -90,13 +90,8 @@ impl Map {
         self.store.contains(peer)
     }
 
-    /// Check whether we would like to (re-)handshake with this peer.
-    ///
-    /// Note that this is distinct from `contains`, we may already have *some* credentials for a
-    /// peer but still be interested in handshaking (e.g., due to periodic refresh of the
-    /// credentials).
-    pub fn needs_handshake(&self, peer: &SocketAddr) -> bool {
-        self.store.needs_handshake(peer)
+    pub fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
+        self.store.register_request_handshake(cb);
     }
 
     /// Gets the [`Peer`] entry for the given address
diff --git a/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
index 1250eaf1c..da6180dc8 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
@@ -103,6 +103,7 @@ impl Cleaner {
         let mut address_entries_initial = 0usize;
         let mut address_entries_retired = 0usize;
         let mut address_entries_active = 0usize;
+        let mut handshake_requests = 0usize;
 
         // For non-retired entries, if it's time for them to handshake again, request a
         // handshake to happen. This handshake will currently happen on the next request for this
@@ -115,7 +116,9 @@ impl Cleaner {
                 current_epoch.saturating_sub(retired_at) < eviction_cycles
             } else {
                 if entry.rehandshake_time() <= now {
+                    handshake_requests += 1;
                     state.request_handshake(*entry.peer());
+                    entry.rehandshake_time_reschedule(state.rehandshake_period());
                 }
 
                 // always retain
@@ -152,28 +155,6 @@ impl Cleaner {
             retained
         });
 
-        // Iteration order should be effectively random, so this effectively just prunes the list
-        // periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note
-        // that peers the application is actively interested in will typically bypass this list, so
-        // this is mostly a risk of delaying regular re-handshaking with very large cardinalities.
-        //
-        // FIXME: Long or mid-term it likely makes sense to replace this data structure with a
-        // fuzzy set of some kind and/or just moving to immediate background handshake attempts.
-        const MAX_REQUESTED_HANDSHAKES: usize = 5000;
-
-        let mut handshake_requests = 0usize;
-        let mut handshake_requests_retired = 0usize;
-        state.requested_handshakes.pin().retain(|_| {
-            handshake_requests += 1;
-            let retain = handshake_requests < MAX_REQUESTED_HANDSHAKES;
-
-            if !retain {
-                handshake_requests_retired += 1;
-            }
-
-            retain
-        });
-
         let id_entries = id_entries_initial - id_entries_retired;
         let address_entries = address_entries_initial - address_entries_retired;
 
@@ -192,7 +173,7 @@ impl Cleaner {
                 address_entries_initial_utilization: utilization(address_entries_initial),
                 address_entries_retired,
                 handshake_requests,
-                handshake_requests_retired,
+                handshake_requests_retired: 0,
             },
         );
     }
diff --git a/dc/s2n-quic-dc/src/path/secret/map/entry.rs b/dc/s2n-quic-dc/src/path/secret/map/entry.rs
index a130bf2b6..5492146ab 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/entry.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/entry.rs
@@ -22,7 +22,7 @@ use s2n_quic_core::{dc, varint::VarInt};
 use std::{
     net::SocketAddr,
     sync::{
-        atomic::{AtomicU8, Ordering},
+        atomic::{AtomicU32, AtomicU8, Ordering},
         Arc,
     },
     time::{Duration, Instant},
@@ -34,7 +34,7 @@ mod tests;
 #[derive(Debug)]
 pub(super) struct Entry {
     creation_time: Instant,
-    rehandshake_delta_secs: u32,
+    rehandshake_delta_secs: AtomicU32,
     peer: SocketAddr,
     secret: schedule::Secret,
     retired: IsRetired,
@@ -72,6 +72,7 @@ impl SizeOf for Entry {
 }
 
 impl SizeOf for AtomicU8 {}
+impl SizeOf for AtomicU32 {}
 
 impl Entry {
     pub fn new(
@@ -88,12 +89,9 @@ impl Entry {
             .fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed);
 
         assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
-        Self {
+        let entry = Self {
             creation_time: Instant::now(),
-            // Schedule another handshake sometime in [5 minutes, rehandshake_time] from now.
-            rehandshake_delta_secs: rand::thread_rng().gen_range(
-                std::cmp::min(rehandshake_time.as_secs(), 360)..rehandshake_time.as_secs(),
-            ) as u32,
+            rehandshake_delta_secs: AtomicU32::new(0),
             peer,
             secret,
             retired: Default::default(),
@@ -101,7 +99,9 @@ impl Entry {
             receiver,
             parameters,
             accessed: AtomicU8::new(0),
-        }
+        };
+        entry.rehandshake_time_reschedule(rehandshake_time);
+        entry
     }
 
     #[cfg(any(test, feature = "testing"))]
@@ -246,7 +246,26 @@ impl Entry {
     }
 
     pub fn rehandshake_time(&self) -> Instant {
-        self.creation_time + Duration::from_secs(u64::from(self.rehandshake_delta_secs))
+        self.creation_time
+            + Duration::from_secs(u64::from(
+                self.rehandshake_delta_secs.load(Ordering::Relaxed),
+            ))
+    }
+
+    /// Reschedule the handshake some time into the future.
+    pub fn rehandshake_time_reschedule(&self, rehandshake_period: Duration) {
+        // The goal of rescheduling is to avoid continuously re-handshaking for N (possibly stale)
+        // peers every cleaner loop, instead we defer handshakes out again. This effectively acts
+        // as a (slow) retry mechanism.
+        let delta = rand::thread_rng().gen_range(
+            std::cmp::min(rehandshake_period.as_secs(), 360)..rehandshake_period.as_secs(),
+        ) as u32;
+        // This can't practically overflow -- each time we add we push out the next add by at least
+        // that much time. The fastest this loops is then running once every 360 seconds and adding
+        // 360 each time. That takes (2**32/360)*360 to fill u32, which happens after 136 years of
+        // continuous execution.
+        self.rehandshake_delta_secs
+            .fetch_add(delta, Ordering::Relaxed);
     }
 
     pub fn age(&self) -> Duration {
diff --git a/dc/s2n-quic-dc/src/path/secret/map/state.rs b/dc/s2n-quic-dc/src/path/secret/map/state.rs
index b9d17c3a1..fdbfd262c 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/state.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/state.rs
@@ -17,7 +17,7 @@ use s2n_quic_core::{
 use std::{
     hash::{BuildHasherDefault, Hasher},
     net::{Ipv4Addr, SocketAddr},
-    sync::{Arc, Mutex, Weak},
+    sync::{Arc, Mutex, RwLock, Weak},
     time::Duration,
 };
 
@@ -64,11 +64,6 @@ where
     // needed.
     pub(super) peers: fixed_map::Map<SocketAddr, Arc<Entry>>,
 
-    // Stores the set of SocketAddr for which we received a UnknownPathSecret packet.
-    // When handshake_with is called we will allow a new handshake if this contains a socket, this
-    // is a temporary solution until we implement proper background handshaking.
-    pub(super) requested_handshakes: flurry::HashSet<SocketAddr>,
-
     // All known entries.
     pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,
 
@@ -78,6 +73,9 @@ where
     // FIXME: This will get replaced with sending on a handshake socket associated with the map.
     pub(super) control_socket: Arc<std::net::UdpSocket>,
 
+    #[allow(clippy::type_complexity)]
+    pub(super) request_handshake: RwLock<Option<Box<dyn Fn(SocketAddr) + Send + Sync>>>,
+
     cleaner: Cleaner,
 
     init_time: Timestamp,
@@ -131,13 +129,13 @@ where
             rehandshake_period: Duration::from_secs(3600 * 24),
             peers: fixed_map::Map::with_capacity(capacity, Default::default()),
             ids: fixed_map::Map::with_capacity(capacity, Default::default()),
-            requested_handshakes: Default::default(),
             cleaner: Cleaner::new(),
             signer,
             control_socket,
             init_time,
             clock,
             subscriber,
+            request_handshake: RwLock::new(None),
         };
 
         let state = Arc::new(state);
@@ -152,19 +150,36 @@ where
     }
 
     pub fn request_handshake(&self, peer: SocketAddr) {
-        // The length is reset as part of cleanup to 5000.
-        let handshakes = self.requested_handshakes.pin();
-        if handshakes.len() <= 6000 {
-            handshakes.insert(peer);
-            self.subscriber()
-                .on_path_secret_map_background_handshake_requested(
-                    event::builder::PathSecretMapBackgroundHandshakeRequested {
-                        peer_address: SocketAddress::from(peer).into_event(),
-                    },
-                );
+        self.subscriber()
+            .on_path_secret_map_background_handshake_requested(
+                event::builder::PathSecretMapBackgroundHandshakeRequested {
+                    peer_address: SocketAddress::from(peer).into_event(),
+                },
+            );
+
+        // Normally we'd expect callers to use the Subscriber to register interest in this, but the
+        // Map is typically created *before* the s2n_quic::Client with the dc provider registered.
+        //
+        // Users of the state tracker typically register the callback when creating a new s2n-quic
+        // client to handshake into this map.
+        if let Some(callback) = self
+            .request_handshake
+            .read()
+            .unwrap_or_else(|e| e.into_inner())
+            .as_deref()
+        {
+            (callback)(peer);
         }
     }
 
+    fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
+        // FIXME: Maybe panic if already initialized?
+        *self
+            .request_handshake
+            .write()
+            .unwrap_or_else(|e| e.into_inner()) = Some(cb);
+    }
+
     fn handle_unknown_secret(
         &self,
         packet: &control::unknown_path_secret::Packet,
@@ -370,17 +385,10 @@ where
         self.peers.contains_key(peer)
     }
 
-    fn needs_handshake(&self, peer: &SocketAddr) -> bool {
-        self.requested_handshakes.pin().contains(peer)
-    }
-
     fn on_new_path_secrets(&self, entry: Arc<Entry>) {
         let id = *entry.id();
         let peer = entry.peer();
 
-        // On insert clear our interest in a handshake.
-        self.requested_handshakes.pin().remove(peer);
-
         let (same, other) = self.ids.insert(id, entry.clone());
         if same.is_some() {
             // FIXME: Make insertion fallible and fail handshakes instead?
@@ -445,6 +453,10 @@ where
         });
     }
 
+    fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
+        self.register_request_handshake(cb);
+    }
+
     fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>> {
         self.peers.get_by_key(peer)
     }
diff --git a/dc/s2n-quic-dc/src/path/secret/map/store.rs b/dc/s2n-quic-dc/src/path/secret/map/store.rs
index 8a98c265c..3637f00bb 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/store.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/store.rs
@@ -27,8 +27,6 @@ pub trait Store: 'static + Send + Sync {
 
     fn contains(&self, peer: &SocketAddr) -> bool;
 
-    fn needs_handshake(&self, peer: &SocketAddr) -> bool;
-
     fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;
 
     fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;
@@ -47,6 +45,8 @@ pub trait Store: 'static + Send + Sync {
 
     fn rehandshake_period(&self) -> Duration;
 
+    fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>);
+
     fn check_dedup(
         &self,
         entry: &Entry,
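
For context on how the new hook is meant to be consumed: the cleaner now invokes the registered callback whenever an entry's rehandshake time elapses, so whatever wires up the map has to supply that callback. Below is a minimal, hypothetical sketch (not part of this change), assuming the map is exposed as `s2n_quic_dc::path::secret::Map` and that the callback type is `Box<dyn Fn(SocketAddr) + Send + Sync>`; the channel, worker thread, and `handshake_with` helper are illustrative stand-ins for however the application actually drives handshakes (typically an s2n-quic client with the dc provider registered, per the comment in `request_handshake`).

```rust
use std::net::SocketAddr;
use std::sync::{mpsc, Mutex};
use std::thread;

fn spawn_background_handshakes(map: &s2n_quic_dc::path::secret::Map) {
    let (tx, rx) = mpsc::channel::<SocketAddr>();
    // The callback has to be Send + Sync; wrapping the Sender in a Mutex keeps
    // this sketch on std-only types.
    let tx = Mutex::new(tx);

    // Called by the cleaner when an entry is due for a re-handshake. Keep it
    // cheap: just hand the peer address off to a background worker.
    map.register_request_handshake(Box::new(move |peer: SocketAddr| {
        let _ = tx.lock().unwrap().send(peer);
    }));

    thread::spawn(move || {
        for peer in rx {
            handshake_with(peer);
        }
    });
}

fn handshake_with(_peer: SocketAddr) {
    // Application-specific (hypothetical): e.g. open a connection with an
    // s2n-quic client so a fresh entry lands in the map via on_new_path_secrets.
}
```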
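As a standalone sanity check (not part of the change) of the overflow argument in `rehandshake_time_reschedule`: deltas only accumulate, and at the fastest the cleaner adds at least 360 seconds once every 360 seconds, so `rehandshake_delta_secs` can only wrap a `u32` after roughly 2^32 seconds of continuous execution.

```rust
fn main() {
    // u32::MAX seconds expressed in (365.25-day) years.
    let years = u32::MAX as f64 / (365.25 * 24.0 * 3600.0);
    println!("{years:.1}"); // prints ~136.1, matching the "136 years" in the comment
    assert!((135.0..137.0).contains(&years));
}
```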