Skip to content

Commit

Permalink
Replace requested_handshakes map with immediate request
Browse files Browse the repository at this point in the history
This avoids issues with bounding the map, and allows us to schedule
handshakes immediately. We avoid a growing herd of re-requested
handshakes by rescheduling the next handshake one handshake period
after our attempt to handshake.
  • Loading branch information
Mark-Simulacrum committed Feb 6, 2025
1 parent b871ad0 commit bd8e1bb
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 65 deletions.
9 changes: 2 additions & 7 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,8 @@ impl Map {
self.store.contains(peer)
}

/// Check whether we would like to (re-)handshake with this peer.
///
/// Note that this is distinct from `contains`, we may already have *some* credentials for a
/// peer but still be interested in handshaking (e.g., due to periodic refresh of the
/// credentials).
pub fn needs_handshake(&self, peer: &SocketAddr) -> bool {
self.store.needs_handshake(peer)
/// Registers the callback invoked when the map wants to (re-)handshake with a peer.
///
/// The callback receives the peer's address. It is called (via the store) when a
/// handshake is requested, e.g. by the cleaner when an entry's rehandshake time
/// has elapsed.
pub fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
self.store.register_request_handshake(cb);
}

/// Gets the [`Peer`] entry for the given address
Expand Down
27 changes: 4 additions & 23 deletions dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl Cleaner {
let mut address_entries_initial = 0usize;
let mut address_entries_retired = 0usize;
let mut address_entries_active = 0usize;
let mut handshake_requests = 0usize;

// For non-retired entries, if it's time for them to handshake again, request a
// handshake to happen. This handshake will currently happen on the next request for this
Expand All @@ -115,7 +116,9 @@ impl Cleaner {
current_epoch.saturating_sub(retired_at) < eviction_cycles
} else {
if entry.rehandshake_time() <= now {
handshake_requests += 1;
state.request_handshake(*entry.peer());
entry.rehandshake_time_reschedule(state.rehandshake_period());
}

// always retain
Expand Down Expand Up @@ -152,28 +155,6 @@ impl Cleaner {
retained
});

// Iteration order should be effectively random, so this effectively just prunes the list
// periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note
// that peers the application is actively interested in will typically bypass this list, so
// this is mostly a risk of delaying regular re-handshaking with very large cardinalities.
//
// FIXME: Long or mid-term it likely makes sense to replace this data structure with a
// fuzzy set of some kind and/or just moving to immediate background handshake attempts.
const MAX_REQUESTED_HANDSHAKES: usize = 5000;

let mut handshake_requests = 0usize;
let mut handshake_requests_retired = 0usize;
state.requested_handshakes.pin().retain(|_| {
handshake_requests += 1;
let retain = handshake_requests < MAX_REQUESTED_HANDSHAKES;

if !retain {
handshake_requests_retired += 1;
}

retain
});

let id_entries = id_entries_initial - id_entries_retired;
let address_entries = address_entries_initial - address_entries_retired;

Expand All @@ -192,7 +173,7 @@ impl Cleaner {
address_entries_initial_utilization: utilization(address_entries_initial),
address_entries_retired,
handshake_requests,
handshake_requests_retired,
handshake_requests_retired: 0,
},
);
}
Expand Down
37 changes: 28 additions & 9 deletions dc/s2n-quic-dc/src/path/secret/map/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use s2n_quic_core::{dc, varint::VarInt};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicU8, Ordering},
atomic::{AtomicU32, AtomicU8, Ordering},
Arc,
},
time::{Duration, Instant},
Expand All @@ -34,7 +34,7 @@ mod tests;
#[derive(Debug)]
pub(super) struct Entry {
creation_time: Instant,
rehandshake_delta_secs: u32,
rehandshake_delta_secs: AtomicU32,
peer: SocketAddr,
secret: schedule::Secret,
retired: IsRetired,
Expand Down Expand Up @@ -72,6 +72,7 @@ impl SizeOf for Entry {
}

impl SizeOf for AtomicU8 {}
impl SizeOf for AtomicU32 {}

impl Entry {
pub fn new(
Expand All @@ -88,20 +89,19 @@ impl Entry {
.fetch_min(crate::stream::MAX_DATAGRAM_SIZE as _, Ordering::Relaxed);

assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
Self {
let entry = Self {
creation_time: Instant::now(),
// Schedule another handshake sometime in [5 minutes, rehandshake_time] from now.
rehandshake_delta_secs: rand::thread_rng().gen_range(
std::cmp::min(rehandshake_time.as_secs(), 360)..rehandshake_time.as_secs(),
) as u32,
rehandshake_delta_secs: AtomicU32::new(0),
peer,
secret,
retired: Default::default(),
sender,
receiver,
parameters,
accessed: AtomicU8::new(0),
}
};
entry.rehandshake_time_reschedule(rehandshake_time);
entry
}

#[cfg(any(test, feature = "testing"))]
Expand Down Expand Up @@ -246,7 +246,26 @@ impl Entry {
}

pub fn rehandshake_time(&self) -> Instant {
self.creation_time + Duration::from_secs(u64::from(self.rehandshake_delta_secs))
self.creation_time
+ Duration::from_secs(u64::from(
self.rehandshake_delta_secs.load(Ordering::Relaxed),
))
}

/// Reschedule the handshake some time into the future.
///
/// Pushes `rehandshake_time()` out by a random delta in
/// `[min(rehandshake_period, 360s), rehandshake_period]` seconds.
pub fn rehandshake_time_reschedule(&self, rehandshake_period: Duration) {
    // The goal of rescheduling is to avoid continuously re-handshaking for N (possibly stale)
    // peers every cleaner loop, instead we defer handshakes out again. This effectively acts
    // as a (slow) retry mechanism.
    //
    // Use an *inclusive* range: with an exclusive upper bound, any period <= 360 seconds
    // would produce an empty range (e.g. 300..300) and `gen_range` panics on empty ranges.
    let delta = rand::thread_rng().gen_range(
        std::cmp::min(rehandshake_period.as_secs(), 360)..=rehandshake_period.as_secs(),
    ) as u32;
    // This can't practically overflow -- each time we add we push out the next add by at least
    // that much time. The fastest this loops is then running once every 360 seconds and adding
    // 360 each time. That takes (2**32/360)*360 to fill u32, which happens after 136 years of
    // continuous execution.
    self.rehandshake_delta_secs
        .fetch_add(delta, Ordering::Relaxed);
}

pub fn age(&self) -> Duration {
Expand Down
60 changes: 36 additions & 24 deletions dc/s2n-quic-dc/src/path/secret/map/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use s2n_quic_core::{
use std::{
hash::{BuildHasherDefault, Hasher},
net::{Ipv4Addr, SocketAddr},
sync::{Arc, Mutex, Weak},
sync::{Arc, Mutex, RwLock, Weak},
time::Duration,
};

Expand Down Expand Up @@ -64,11 +64,6 @@ where
// needed.
pub(super) peers: fixed_map::Map<SocketAddr, Arc<Entry>>,

// Stores the set of SocketAddr for which we received a UnknownPathSecret packet.
// When handshake_with is called we will allow a new handshake if this contains a socket, this
// is a temporary solution until we implement proper background handshaking.
pub(super) requested_handshakes: flurry::HashSet<SocketAddr>,

// All known entries.
pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,

Expand All @@ -78,6 +73,9 @@ where
// FIXME: This will get replaced with sending on a handshake socket associated with the map.
pub(super) control_socket: Arc<std::net::UdpSocket>,

#[allow(clippy::type_complexity)]
pub(super) request_handshake: RwLock<Option<Box<dyn Fn(SocketAddr) + Send + Sync>>>,

cleaner: Cleaner,

init_time: Timestamp,
Expand Down Expand Up @@ -131,13 +129,13 @@ where
rehandshake_period: Duration::from_secs(3600 * 24),
peers: fixed_map::Map::with_capacity(capacity, Default::default()),
ids: fixed_map::Map::with_capacity(capacity, Default::default()),
requested_handshakes: Default::default(),
cleaner: Cleaner::new(),
signer,
control_socket,
init_time,
clock,
subscriber,
request_handshake: RwLock::new(None),
};

let state = Arc::new(state);
Expand All @@ -152,19 +150,36 @@ where
}

pub fn request_handshake(&self, peer: SocketAddr) {
// The length is reset as part of cleanup to 5000.
let handshakes = self.requested_handshakes.pin();
if handshakes.len() <= 6000 {
handshakes.insert(peer);
self.subscriber()
.on_path_secret_map_background_handshake_requested(
event::builder::PathSecretMapBackgroundHandshakeRequested {
peer_address: SocketAddress::from(peer).into_event(),
},
);
self.subscriber()
.on_path_secret_map_background_handshake_requested(
event::builder::PathSecretMapBackgroundHandshakeRequested {
peer_address: SocketAddress::from(peer).into_event(),
},
);

// Normally we'd expect callers to use the Subscriber to register interest in this, but the
// Map is typically created *before* the s2n_quic::Client with the dc provider registered.
//
// Users of the state tracker typically register the callback when creating a new s2n-quic
// client to handshake into this map.
if let Some(callback) = self
.request_handshake
.read()
.unwrap_or_else(|e| e.into_inner())
.as_deref()
{
(callback)(peer);
}
}

/// Installs the callback invoked by `request_handshake`, replacing any
/// previously registered callback.
fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
    // FIXME: Maybe panic if already initialized?
    let mut slot = self
        .request_handshake
        .write()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *slot = Some(cb);
}

fn handle_unknown_secret(
&self,
packet: &control::unknown_path_secret::Packet,
Expand Down Expand Up @@ -370,17 +385,10 @@ where
self.peers.contains_key(peer)
}

fn needs_handshake(&self, peer: &SocketAddr) -> bool {
self.requested_handshakes.pin().contains(peer)
}

fn on_new_path_secrets(&self, entry: Arc<Entry>) {
let id = *entry.id();
let peer = entry.peer();

// On insert clear our interest in a handshake.
self.requested_handshakes.pin().remove(peer);

let (same, other) = self.ids.insert(id, entry.clone());
if same.is_some() {
// FIXME: Make insertion fallible and fail handshakes instead?
Expand Down Expand Up @@ -445,6 +453,10 @@ where
});
}

// Delegates to the inherent `register_request_handshake` defined on this type;
// inherent methods take precedence over trait methods during resolution, so
// this call does not recurse.
fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>) {
self.register_request_handshake(cb);
}

fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>> {
self.peers.get_by_key(peer)
}
Expand Down
4 changes: 2 additions & 2 deletions dc/s2n-quic-dc/src/path/secret/map/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub trait Store: 'static + Send + Sync {

fn contains(&self, peer: &SocketAddr) -> bool;

fn needs_handshake(&self, peer: &SocketAddr) -> bool;

fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;

fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<Arc<Entry>>>;
Expand All @@ -47,6 +45,8 @@ pub trait Store: 'static + Send + Sync {

fn rehandshake_period(&self) -> Duration;

/// Registers the callback a store implementation invokes when it wants to
/// request a handshake with a peer (see `request_handshake` on implementors).
fn register_request_handshake(&self, cb: Box<dyn Fn(SocketAddr) + Send + Sync>);

fn check_dedup(
&self,
entry: &Entry,
Expand Down

0 comments on commit bd8e1bb

Please sign in to comment.