Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: No time-based loss detection before largest_acked & fixes #1998

Draft
wants to merge 76 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
5aae826
tmp
larseggert Jul 21, 2024
776fcf4
Cleanup
larseggert Jul 21, 2024
7334fac
Finalize test
larseggert Jul 21, 2024
b6e05e3
Argh
larseggert Jul 22, 2024
2af5add
Fix
larseggert Jul 22, 2024
49e2836
Suggestion from @mxinden
larseggert Jul 22, 2024
4f1fdcb
Merge branch 'main' into 0rtt-tmp
larseggert Jul 22, 2024
84c628b
Merge branch 'main' into 0rtt-tmp
larseggert Jul 22, 2024
150881f
Suggestion from @martinthomson
larseggert Jul 22, 2024
4bf8205
Wording
larseggert Jul 22, 2024
a0b7ddf
Only do this when we don't have a largest_acked
larseggert Jul 22, 2024
4d22083
Minimize diff
larseggert Jul 22, 2024
3a36c97
Merge branch 'main' into 0rtt-tmp
larseggert Jul 22, 2024
4f40dee
Expose `on_congestion_event`
larseggert Jul 23, 2024
d1bc820
Add comment
larseggert Jul 23, 2024
5e1ea67
Apply suggestions from code review
larseggert Jul 23, 2024
0781c72
More suggestions from @mxinden; thanks
larseggert Jul 23, 2024
3bb4c9d
Merge branch 'main' into 0rtt-tmp
larseggert Jul 25, 2024
b651b3c
Merge branch 'main' into 0rtt-tmp
larseggert Jul 30, 2024
76c32f3
Potential fix?
larseggert Jul 31, 2024
8e99822
Merge branch '0rtt-tmp' of github.com:larseggert/neqo into 0rtt-tmp
larseggert Jul 31, 2024
bb5769e
Revert "Potential fix?"
larseggert Jul 31, 2024
76944b5
Merge branch 'main' into 0rtt-tmp
larseggert Aug 1, 2024
92f07f9
Merge branch 'main' into 0rtt-tmp
larseggert Aug 9, 2024
cd84b61
fmt
larseggert Aug 9, 2024
1b6a81d
Fixes
larseggert Aug 9, 2024
56fd7dd
Merge branch 'main' into 0rtt-tmp
larseggert Aug 9, 2024
52f0ecc
Merge branch 'main' into 0rtt-tmp
larseggert Aug 13, 2024
faa52c8
Merge branch 'main' into 0rtt-tmp
larseggert Aug 13, 2024
bd6bd7e
Merge branch 'main' into 0rtt-tmp
larseggert Aug 15, 2024
9156292
Robustify (?)
larseggert Aug 16, 2024
fca99af
Nits
larseggert Aug 16, 2024
467abe5
Merge branch 'main' into 0rtt-tmp
larseggert Aug 16, 2024
4d0adfd
Merge branch 'main' into 0rtt-tmp
larseggert Aug 21, 2024
6bc89bb
Merge branch 'main' into 0rtt-tmp
larseggert Aug 22, 2024
53c7bc7
Use my patched simulator image (for now)
larseggert Aug 22, 2024
d8f6a24
Merge branch 'main' into 0rtt-tmp
larseggert Sep 5, 2024
97b6be8
Revert QNS image
larseggert Sep 5, 2024
b29a832
Fixes
larseggert Sep 5, 2024
34fa79c
Add debugs suggested by @mxinden
larseggert Sep 6, 2024
3e32945
POssible fix
larseggert Sep 6, 2024
17190e9
More drastic fix
larseggert Sep 6, 2024
bfde87f
fmt
larseggert Sep 6, 2024
e1c18c5
Remove debug logging
larseggert Sep 6, 2024
02bede7
Update test
larseggert Sep 9, 2024
9356346
Do not encode long RTT guesses in resumption tokens
larseggert Sep 9, 2024
de91e32
trace
larseggert Sep 9, 2024
f6113fa
Merge branch 'main' into 0rtt-tmp
larseggert Sep 9, 2024
91a3ae6
Also RTX 0-RTT
larseggert Sep 9, 2024
a756007
Merge branch '0rtt-tmp' of github.com:larseggert/neqo into 0rtt-tmp
larseggert Sep 9, 2024
8d88c24
fmt
larseggert Sep 9, 2024
63aa0bd
debug
larseggert Sep 9, 2024
4fa5b10
No pacing for zerortt test
larseggert Sep 9, 2024
1c83522
log resumption
larseggert Sep 9, 2024
d1d3898
Log type of saved datagram
larseggert Sep 10, 2024
15b7521
Separate out the `resumption` test
larseggert Sep 10, 2024
094d78e
Merge branch 'main' into 0rtt-tmp
larseggert Sep 10, 2024
838942e
fmt
larseggert Sep 10, 2024
3b7fe10
Discard Initial keys after first Handshake packet is sent
larseggert Sep 10, 2024
c2e20ff
Revert
larseggert Sep 11, 2024
ddabf57
Fix?
larseggert Sep 11, 2024
6749838
Remove unused fn
larseggert Sep 11, 2024
8f4c566
ACK
larseggert Sep 11, 2024
73f2929
Maybe Handshake ping
larseggert Sep 11, 2024
af765b6
picoquic bug
larseggert Sep 12, 2024
60af704
Merge branch 'main' into 0rtt-tmp
larseggert Sep 13, 2024
36a77b0
Merge branch '0rtt-tmp' of github.com:larseggert/neqo into 0rtt-tmp
larseggert Sep 13, 2024
f3f2775
Merge branch 'main' into 0rtt-tmp
larseggert Sep 13, 2024
758f214
Merge branch '0rtt-tmp' of github.com:larseggert/neqo into 0rtt-tmp
larseggert Sep 13, 2024
8b3663d
Merge
larseggert Sep 13, 2024
230714d
Merge
larseggert Sep 13, 2024
8fd763e
Rename test and turn off grease
larseggert Sep 13, 2024
0385874
Cleanups
larseggert Sep 13, 2024
7004906
Minimize diff
larseggert Sep 16, 2024
88c8a1f
Merge branch 'main' into 0rtt-tmp
larseggert Sep 18, 2024
d94af34
Merge fixes
larseggert Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 35 additions & 35 deletions neqo-transport/src/cc/classic_cc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,41 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
congestion || persistent_congestion
}

/// Handle a congestion event.
/// Returns true if this was a true congestion event.
fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool {
// Start a new congestion event if lost or ECN CE marked packet was sent
// after the start of the previous congestion recovery period.
if !self.after_recovery_start(last_packet) {
return false;
}

let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd(
self.congestion_window,
self.acked_bytes,
self.max_datagram_size(),
);
self.congestion_window = max(cwnd, self.cwnd_min());
self.acked_bytes = acked_bytes;
self.ssthresh = self.congestion_window;
qdebug!(
[self],
"Cong event -> recovery; cwnd {}, ssthresh {}",
self.congestion_window,
self.ssthresh
);
qlog::metrics_updated(
&self.qlog,
&[
QlogMetric::CongestionWindow(self.congestion_window),
QlogMetric::SsThresh(self.ssthresh),
QlogMetric::InRecovery(true),
],
);
self.set_state(State::RecoveryStart);
true
}

/// Report received ECN CE mark(s) to the congestion controller as a
/// congestion event.
///
Expand Down Expand Up @@ -537,41 +572,6 @@ impl<T: WindowAdjustment> ClassicCongestionControl<T> {
!self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn)
}

/// Handle a congestion event.
/// Returns true if this was a true congestion event.
fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool {
// Start a new congestion event if lost or ECN CE marked packet was sent
// after the start of the previous congestion recovery period.
if !self.after_recovery_start(last_packet) {
return false;
}

let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd(
self.congestion_window,
self.acked_bytes,
self.max_datagram_size(),
);
self.congestion_window = max(cwnd, self.cwnd_min());
self.acked_bytes = acked_bytes;
self.ssthresh = self.congestion_window;
qdebug!(
[self],
"Cong event -> recovery; cwnd {}, ssthresh {}",
self.congestion_window,
self.ssthresh
);
qlog::metrics_updated(
&self.qlog,
&[
QlogMetric::CongestionWindow(self.congestion_window),
QlogMetric::SsThresh(self.ssthresh),
QlogMetric::InRecovery(true),
],
);
self.set_state(State::RecoveryStart);
true
}

fn app_limited(&self) -> bool {
if self.bytes_in_flight >= self.congestion_window {
false
Expand Down
5 changes: 5 additions & 0 deletions neqo-transport/src/cc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ pub trait CongestionControl: Display + Debug {
lost_packets: &[SentPacket],
) -> bool;

/// Initiate a congestion response.
///
/// Returns true if the congestion window was reduced.
fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool;

/// Returns true if the congestion window was reduced.
fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool;

Expand Down
6 changes: 2 additions & 4 deletions neqo-transport/src/connection/tests/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use neqo_crypto::{
};
#[cfg(not(feature = "disable-encryption"))]
use test_fixture::datagram;
use test_fixture::{
assertions, assertions::assert_coalesced_0rtt, fixture_init, now, split_datagram, DEFAULT_ADDR,
};
use test_fixture::{assertions, fixture_init, now, split_datagram, DEFAULT_ADDR};

use super::{
super::{Connection, Output, State},
Expand Down Expand Up @@ -401,10 +399,10 @@ fn reorder_05rtt_with_0rtt() {
// Now PTO at the client and cause the server to re-send handshake packets.
now += AT_LEAST_PTO;
let c3 = client.process(None, now).dgram();
assert_coalesced_0rtt(c3.as_ref().unwrap());

now += RTT / 2;
let s3 = server.process(c3.as_ref(), now).dgram().unwrap();
assertions::assert_no_1rtt(&s3[..]);

// The client should be able to process the 0.5 RTT now.
// This should contain an ACK, so we are processing an ACK from the past.
Expand Down
11 changes: 4 additions & 7 deletions neqo-transport/src/connection/tests/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,12 @@ fn pto_initial() {
assert_eq!(delay, INITIAL_PTO);

// Resend initial after PTO.
let cwnd_prior: usize = cwnd(&client);
now += delay;
let pkt2 = client.process(None, now).dgram();
assert!(pkt2.is_some());
assert_eq!(pkt2.unwrap().len(), client.plpmtu());
assert_eq!(cwnd_prior, 2 * cwnd(&client)); // cwnd has halved

let delay = client.process(None, now).callback();
// PTO has doubled.
Expand Down Expand Up @@ -294,21 +296,16 @@ fn pto_handshake_complete() {
// Check that the other packets (pkt2, pkt3) are Handshake packets.
// The server discarded the Handshake keys already, therefore they are dropped.
// Note that these don't include 1-RTT packets, because 1-RTT isn't send on PTO.
let (pkt2_hs, pkt2_1rtt) = split_datagram(&pkt2.unwrap());
assert_handshake(&pkt2_hs);
assert!(pkt2_1rtt.is_some());
let dropped_before1 = server.stats().dropped_rx;
let server_frames = server.stats().frame_rx.all;
server.process_input(&pkt2_hs, now);
server.process_input(&pkt2.unwrap(), now);
assert_eq!(1, server.stats().dropped_rx - dropped_before1);
assert_eq!(server.stats().frame_rx.all, server_frames);

server.process_input(&pkt2_1rtt.unwrap(), now);
let server_frames2 = server.stats().frame_rx.all;
let dropped_before2 = server.stats().dropped_rx;
server.process_input(&pkt3_hs, now);
assert_eq!(1, server.stats().dropped_rx - dropped_before2);
assert_eq!(server.stats().frame_rx.all, server_frames2);
assert_eq!(server.stats().frame_rx.all, server_frames);

now += HALF_RTT;

Expand Down
7 changes: 7 additions & 0 deletions neqo-transport/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,13 @@ impl Path {
}
}

/// Initiate a congestion response.
///
/// Returns true if the congestion window was reduced.
pub fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool {
self.sender.on_congestion_event(last_packet)
}

/// Determine whether we should be setting a PTO for this path. This is true when either the
/// path is valid or when there is enough remaining in the amplification limit to fit a
/// full-sized path (i.e., the path MTU).
Expand Down
77 changes: 48 additions & 29 deletions neqo-transport/src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,15 @@ impl LossRecoverySpace {
);
self.first_ooo_time = None;

let largest_acked = self.largest_acked;
let Some(largest_acked) = self.largest_acked else {
return;
};

for packet in self
.sent_packets
.iter_mut()
// BTreeMap iterates in order of ascending PN
.take_while(|p| p.pn() < largest_acked.unwrap_or(PacketNumber::MAX))
.take_while(|p| p.pn() < largest_acked)
{
// Packets sent before now - loss_delay are deemed lost.
if packet.time_sent() + loss_delay <= now {
Expand All @@ -337,17 +339,15 @@ impl LossRecoverySpace {
packet.time_sent(),
loss_delay
);
} else if largest_acked >= Some(packet.pn() + PACKET_THRESHOLD) {
} else if largest_acked >= packet.pn() + PACKET_THRESHOLD {
qtrace!(
"lost={}, is >= {} from largest acked {:?}",
packet.pn(),
PACKET_THRESHOLD,
largest_acked
);
} else {
if largest_acked.is_some() {
self.first_ooo_time = Some(packet.time_sent());
}
self.first_ooo_time = Some(packet.time_sent());
// No more packets can be declared lost after this one.
break;
};
Expand Down Expand Up @@ -840,34 +840,49 @@ impl LossRecovery {
/// When it has, mark a few packets as "lost" for the purposes of having frames
/// regenerated in subsequent packets. The packets aren't truly lost, so
/// we have to clone the `SentPacket` instance.
fn maybe_fire_pto(&mut self, rtt: &RttEstimate, now: Instant, lost: &mut Vec<SentPacket>) {
fn maybe_fire_pto(&mut self, path: &PathRef, now: Instant, lost: &mut Vec<SentPacket>) {
let mut pto_space = None;
// The spaces in which we will allow probing.
let mut allow_probes = PacketNumberSpaceSet::default();
for pn_space in PacketNumberSpace::iter() {
if let Some(t) = self.pto_time(rtt, *pn_space) {
allow_probes[*pn_space] = true;
if t <= now {
qdebug!([self], "PTO timer fired for {}", pn_space);
let space = self.spaces.get_mut(*pn_space).unwrap();
lost.extend(
space
.pto_packets(PtoState::pto_packet_count(
*pn_space,
self.stats.borrow().packets_rx,
))
.cloned(),
);

pto_space = pto_space.or(Some(*pn_space));
}
if self
.pto_time(path.borrow().rtt(), *pn_space)
.map_or(true, |t| now < t)
{
continue;
}
allow_probes[*pn_space] = true;
qdebug!([self], "PTO timer fired for {}", pn_space);
let space = self.spaces.get_mut(*pn_space).unwrap();
lost.extend(
space
.pto_packets(PtoState::pto_packet_count(
*pn_space,
self.stats.borrow().packets_rx,
))
.cloned(),
);

pto_space = pto_space.or(Some(*pn_space));
}

// This has to happen outside the loop. Increasing the PTO count here causes the
// pto_time to increase which might cause PTO for later packet number spaces to not fire.
if let Some(pn_space) = pto_space {
qtrace!([self], "PTO {}, probing {:?}", pn_space, allow_probes);
// Packets are only declared as lost, relative to the
// `largest_acked`. If we hit a PTO while we don't have a
// largest_acked yet, also do a congestion control reaction (because
// otherwise none would happen).
if self
.spaces
.get(pn_space)
.map_or(false, |space| space.largest_acked.is_none())
{
if let Some(last) = lost.last() {
path.borrow_mut().on_congestion_event(last);
}
}
self.fire_pto(pn_space, allow_probes);
}
}
Expand Down Expand Up @@ -898,7 +913,7 @@ impl LossRecovery {
}
self.stats.borrow_mut().lost += lost_packets.len();

self.maybe_fire_pto(primary_path.borrow().rtt(), now, &mut lost_packets);
self.maybe_fire_pto(primary_path, now, &mut lost_packets);
lost_packets
}

Expand Down Expand Up @@ -967,7 +982,6 @@ mod tests {
ecn::EcnCount,
packet::{PacketNumber, PacketType},
path::{Path, PathRef},
rtt::RttEstimate,
stats::{Stats, StatsCell},
};

Expand All @@ -978,8 +992,8 @@ mod tests {

const ON_SENT_SIZE: usize = 100;
/// An initial RTT for using with `setup_lr`.
const TEST_RTT: Duration = ms(80);
const TEST_RTTVAR: Duration = ms(40);
const TEST_RTT: Duration = ms(7000);
const TEST_RTTVAR: Duration = ms(3500);

struct Fixture {
lr: LossRecovery,
Expand Down Expand Up @@ -1050,6 +1064,7 @@ mod tests {
ConnectionIdEntry::new(0, ConnectionId::from(&[1, 2, 3]), [0; 16]),
);
path.set_primary(true);
path.rtt_mut().set_initial(TEST_RTT);
Self {
lr: LossRecovery::new(StatsCell::default(), FAST_PTO_SCALE),
path: Rc::new(RefCell::new(path)),
Expand Down Expand Up @@ -1540,7 +1555,11 @@ mod tests {

// Expiring state after the PTO on the ApplicationData space has
// expired should result in setting a PTO state.
let default_pto = RttEstimate::default().pto(PacketNumberSpace::ApplicationData);
let default_pto = lr
.path
.borrow()
.rtt()
.pto(PacketNumberSpace::ApplicationData);
let expected_pto = pn_time(2) + default_pto;
lr.discard(PacketNumberSpace::Handshake, expected_pto);
let profile = lr.send_profile(now());
Expand Down Expand Up @@ -1572,7 +1591,7 @@ mod tests {
ON_SENT_SIZE,
));

let handshake_pto = RttEstimate::default().pto(PacketNumberSpace::Handshake);
let handshake_pto = lr.path.borrow().rtt().pto(PacketNumberSpace::Handshake);
let expected_pto = now() + handshake_pto;
assert_eq!(lr.pto_time(PacketNumberSpace::Initial), Some(expected_pto));
let profile = lr.send_profile(now());
Expand Down
7 changes: 7 additions & 0 deletions neqo-transport/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ impl PacketSender {
self.maybe_update_pacer_mtu();
}

/// Initiate a congestion response.
///
/// Returns true if the congestion window was reduced.
pub fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool {
self.cc.on_congestion_event(last_packet)
}

/// Called when packets are lost. Returns true if the congestion window was reduced.
pub fn on_packets_lost(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion test-fixture/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn fixture_init() {

// This needs to be > 2ms to avoid it being rounded to zero.
// NSS operates in milliseconds and halves any value it is provided.
pub const ANTI_REPLAY_WINDOW: Duration = Duration::from_millis(10);
pub const ANTI_REPLAY_WINDOW: Duration = Duration::from_millis(1000);

/// A baseline time for all tests. This needs to be earlier than what `now()` produces
/// because of the need to have a span of time elapse for anti-replay purposes.
Expand Down
Loading