Skip to content

Commit 484d95b

Browse files
Add test for NACK responder max age
1 parent 0dfbcb4 commit 484d95b

File tree

4 files changed

+120
-27
lines changed

4 files changed

+120
-27
lines changed

interceptor/src/mock/mock_stream.rs

+10-1
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,21 @@ impl MockStream {
210210
last
211211
}
212212

213-
/// written_rtp returns a channel containing rtp packets written, modified by the interceptor
213+
/// Wait for a written RTP packet to appear after traversing interceptor chains.
214214
pub async fn written_rtp(&self) -> Option<rtp::packet::Packet> {
215215
let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await;
216216
rtp_out_modified_rx.recv().await
217217
}
218218

219+
/// Assert that a RTP packet has traversed interceptor chains.
220+
///
221+
/// Like [`writte_rtp`] but does not wait.
222+
pub async fn written_rtp_expected(&self) -> Option<rtp::packet::Packet> {
223+
let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await;
224+
225+
rtp_out_modified_rx.try_recv().ok()
226+
}
227+
219228
/// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor
220229
pub async fn read_rtcp(
221230
&self,

interceptor/src/nack/responder/mod.rs

+14-9
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,24 @@ impl ResponderInternal {
8989
let stream3 = Arc::clone(&stream2);
9090

9191
Box::pin(async move {
92-
if let Some(p) = stream3.get(seq).await {
93-
let should_send = max_packet_age
94-
.map(|max_age| p.age() < max_age)
95-
.unwrap_or(true);
96-
92+
let p = match stream3.get(seq).await {
93+
None => return true,
94+
Some(p) => p,
95+
};
96+
97+
if let Some(max_packet_age) = max_packet_age {
98+
let packet_age = p.age();
99+
let should_send = packet_age < max_packet_age;
97100
if !should_send {
101+
log::debug!("Not resending packet {} as it's older than the configured max age {}s. Packet was initially sent {}s ago", p.packet.header.sequence_number, max_packet_age.as_secs_f64(), packet_age.as_secs_f64());
98102
return true;
99103
}
104+
}
100105

101-
let a = Attributes::new();
102-
if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await {
103-
log::warn!("failed resending nacked packet: {}", err);
104-
}
106+
107+
let a = Attributes::new();
108+
if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await {
109+
log::warn!("failed resending nacked packet: {}", err);
105110
}
106111

107112
true

interceptor/src/nack/responder/responder_stream.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use crate::error::Result;
22
use crate::nack::UINT16SIZE_HALF;
33
use crate::{Attributes, RTPWriter};
44

5-
use async_trait::async_trait;
65
use std::sync::Arc;
7-
use std::time::{Duration, Instant};
6+
use std::time::Duration;
7+
8+
use async_trait::async_trait;
89
use tokio::sync::Mutex;
10+
use tokio::time::Instant;
911

1012
struct ResponderStreamInternal {
1113
packets: Vec<Option<SentPacket>>,
@@ -101,6 +103,7 @@ impl RTPWriter for ResponderStream {
101103
/// A packet that has been sent, or at least been queued to send.
102104
pub struct SentPacket {
103105
pub(super) packet: rtp::packet::Packet,
106+
// We use tokio's instant because it's mockable.
104107
sent_at: Instant,
105108
}
106109

interceptor/src/nack/responder/responder_test.rs

+91-15
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use super::*;
22
use crate::mock::mock_stream::MockStream;
33
use crate::stream_info::RTCPFeedback;
4-
use crate::test::timeout_or_fail;
54
use tokio::time::Duration;
65

76
use rtcp::transport_feedbacks::transport_layer_nack::{NackPair, TransportLayerNack};
87

9-
#[tokio::test]
8+
#[tokio::test(start_paused = true)]
109
async fn test_responder_interceptor() -> Result<()> {
1110
let icpr: Arc<dyn Interceptor + Send + Sync> =
1211
Responder::builder().with_log2_size(3).build("")?;
@@ -35,9 +34,13 @@ async fn test_responder_interceptor() -> Result<()> {
3534
})
3635
.await?;
3736

38-
let p = timeout_or_fail(Duration::from_millis(10), stream.written_rtp())
37+
// Let the packet be pulled through interceptor chains
38+
tokio::task::yield_now().await;
39+
40+
let p = stream
41+
.written_rtp_expected()
3942
.await
40-
.expect("A packet");
43+
.expect("Packet should have been written");
4144
assert_eq!(seq_num, p.header.sequence_number);
4245
}
4346

@@ -53,24 +56,97 @@ async fn test_responder_interceptor() -> Result<()> {
5356
],
5457
})])
5558
.await;
59+
tokio::time::advance(Duration::from_millis(50)).await;
60+
// Let the NACK task do its thing
61+
tokio::task::yield_now().await;
5662

5763
// seq number 13 was never sent, so it can't be resent
5864
for seq_num in [11, 12, 15] {
59-
if let Ok(r) = tokio::time::timeout(Duration::from_millis(50), stream.written_rtp()).await {
60-
if let Some(p) = r {
61-
assert_eq!(seq_num, p.header.sequence_number);
62-
} else {
63-
assert!(
64-
false,
65-
"seq_num {} is not sent due to channel closed",
66-
seq_num
67-
);
68-
}
65+
let p = stream
66+
.written_rtp_expected()
67+
.await
68+
.expect("Packet should have been written");
69+
assert_eq!(seq_num, p.header.sequence_number);
70+
}
71+
72+
let result = stream.written_rtp_expected().await;
73+
assert!(result.is_none(), "no more rtp packets expected");
74+
75+
stream.close().await?;
76+
77+
Ok(())
78+
}
79+
80+
#[tokio::test(start_paused = true)]
81+
async fn test_responder_interceptor_with_max_age() -> Result<()> {
82+
let icpr: Arc<dyn Interceptor + Send + Sync> = Responder::builder()
83+
.with_log2_size(3)
84+
.with_max_packet_age(Duration::from_millis(400))
85+
.build("")?;
86+
87+
let stream = MockStream::new(
88+
&StreamInfo {
89+
ssrc: 1,
90+
rtcp_feedback: vec![RTCPFeedback {
91+
typ: "nack".to_owned(),
92+
..Default::default()
93+
}],
94+
..Default::default()
95+
},
96+
icpr,
97+
)
98+
.await;
99+
100+
for seq_num in [10, 11, 12, 14, 15] {
101+
stream
102+
.write_rtp(&rtp::packet::Packet {
103+
header: rtp::header::Header {
104+
sequence_number: seq_num,
105+
..Default::default()
106+
},
107+
..Default::default()
108+
})
109+
.await?;
110+
tokio::time::advance(Duration::from_millis(30)).await;
111+
tokio::task::yield_now().await;
112+
113+
let p = stream.written_rtp().await.expect("A packet");
114+
assert_eq!(seq_num, p.header.sequence_number);
115+
}
116+
117+
// Advance time 300ms. Packets 10 and 11 will now have been sent 450ms and 420ms ago
118+
// respectively.
119+
tokio::time::advance(Duration::from_millis(300)).await;
120+
121+
stream
122+
.receive_rtcp(vec![Box::new(TransportLayerNack {
123+
media_ssrc: 1,
124+
sender_ssrc: 2,
125+
nacks: vec![
126+
NackPair {
127+
packet_id: 10,
128+
lost_packets: 0b10111,
129+
}, // sequence numbers: 11, 12, 13, 15
130+
],
131+
})])
132+
.await;
133+
tokio::task::yield_now().await;
134+
135+
// seq number 13 was never sent and seq number 10 and 11 is too late to resend now.
136+
for seq_num in [12, 15] {
137+
if let Some(p) = stream.written_rtp().await {
138+
assert_eq!(seq_num, p.header.sequence_number);
69139
} else {
70-
assert!(false, "seq_num {} is not sent yet", seq_num);
140+
assert!(
141+
false,
142+
"seq_num {} is not sent due to channel closed",
143+
seq_num
144+
);
71145
}
72146
}
73147

148+
// Resume time
149+
tokio::time::resume();
74150
let result = tokio::time::timeout(Duration::from_millis(10), stream.written_rtp()).await;
75151
assert!(result.is_err(), "no more rtp packets expected");
76152

0 commit comments

Comments
 (0)