Skip to content

Commit 8c869b4

Browse files
Control max packet age for NACK responses
1 parent 0c35e61 commit 8c869b4

File tree

2 files changed

+59
-10
lines changed

2 files changed

+59
-10
lines changed

interceptor/src/nack/responder/mod.rs

+28-2
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ use std::collections::HashMap;
1717
use std::future::Future;
1818
use std::pin::Pin;
1919
use std::sync::Arc;
20+
use std::time::Duration;
2021
use tokio::sync::Mutex;
2122

2223
/// GeneratorBuilder can be used to configure Responder Interceptor
2324
#[derive(Default)]
2425
pub struct ResponderBuilder {
2526
log2_size: Option<u8>,
27+
max_packet_age: Option<Duration>,
2628
}
2729

2830
impl ResponderBuilder {
@@ -32,6 +34,15 @@ impl ResponderBuilder {
3234
self.log2_size = Some(log2_size);
3335
self
3436
}
37+
38+
/// with_max_packet_age sets the max age of packets that will be resent.
39+
///
40+
/// When a resend is requested, packets that were first sent more than `max_packet_age` ago
41+
/// will not be resent.
42+
pub fn with_max_packet_age(mut self, max_packet_age: Duration) -> ResponderBuilder {
43+
self.max_packet_age = Some(max_packet_age);
44+
self
45+
}
3546
}
3647

3748
impl InterceptorBuilder for ResponderBuilder {
@@ -43,6 +54,7 @@ impl InterceptorBuilder for ResponderBuilder {
4354
} else {
4455
13 // 8192 = 1 << 13
4556
},
57+
max_packet_age: self.max_packet_age,
4658
streams: Arc::new(Mutex::new(HashMap::new())),
4759
}),
4860
}))
@@ -51,13 +63,15 @@ impl InterceptorBuilder for ResponderBuilder {
5163

5264
pub struct ResponderInternal {
5365
log2_size: u8,
66+
max_packet_age: Option<Duration>,
5467
streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
5568
}
5669

5770
impl ResponderInternal {
5871
async fn resend_packets(
5972
streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
6073
nack: TransportLayerNack,
74+
max_packet_age: Option<Duration>,
6175
) {
6276
let stream = {
6377
let m = streams.lock().await;
@@ -73,10 +87,19 @@ impl ResponderInternal {
7387
n.range(Box::new(
7488
move |seq: u16| -> Pin<Box<dyn Future<Output = bool> + Send + 'static>> {
7589
let stream3 = Arc::clone(&stream2);
90+
7691
Box::pin(async move {
7792
if let Some(p) = stream3.get(seq).await {
93+
let should_send = max_packet_age
94+
.map(|max_age| p.age() < max_age)
95+
.unwrap_or(true);
96+
97+
if !should_send {
98+
return true;
99+
}
100+
78101
let a = Attributes::new();
79-
if let Err(err) = stream3.next_rtp_writer.write(&p, &a).await {
102+
if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await {
80103
log::warn!("failed resending nacked packet: {}", err);
81104
}
82105
}
@@ -92,6 +115,7 @@ impl ResponderInternal {
92115

93116
pub struct ResponderRtcpReader {
94117
parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
118+
max_packet_age: Option<Duration>,
95119
internal: Arc<ResponderInternal>,
96120
}
97121

@@ -106,8 +130,9 @@ impl RTCPReader for ResponderRtcpReader {
106130
if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
107131
let nack = nack.clone();
108132
let streams = Arc::clone(&self.internal.streams);
133+
let max_packet_age = self.max_packet_age;
109134
tokio::spawn(async move {
110-
ResponderInternal::resend_packets(streams, nack).await;
135+
ResponderInternal::resend_packets(streams, nack, max_packet_age).await;
111136
});
112137
}
113138
}
@@ -138,6 +163,7 @@ impl Interceptor for Responder {
138163
) -> Arc<dyn RTCPReader + Send + Sync> {
139164
Arc::new(ResponderRtcpReader {
140165
internal: Arc::clone(&self.internal),
166+
max_packet_age: self.internal.max_packet_age,
141167
parent_rtcp_reader: reader,
142168
}) as Arc<dyn RTCPReader + Send + Sync>
143169
}

interceptor/src/nack/responder/responder_stream.rs

+31-8
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ use crate::{Attributes, RTPWriter};
44

55
use async_trait::async_trait;
66
use std::sync::Arc;
7+
use std::time::{Duration, Instant};
78
use tokio::sync::Mutex;
89

910
struct ResponderStreamInternal {
10-
packets: Vec<Option<rtp::packet::Packet>>,
11+
packets: Vec<Option<SentPacket>>,
1112
size: u16,
1213
last_added: u16,
1314
started: bool,
@@ -26,7 +27,7 @@ impl ResponderStreamInternal {
2627
fn add(&mut self, packet: &rtp::packet::Packet) {
2728
let seq = packet.header.sequence_number;
2829
if !self.started {
29-
self.packets[(seq % self.size) as usize] = Some(packet.clone());
30+
self.packets[(seq % self.size) as usize] = Some(packet.clone().into());
3031
self.last_added = seq;
3132
self.started = true;
3233
return;
@@ -43,11 +44,11 @@ impl ResponderStreamInternal {
4344
}
4445
}
4546

46-
self.packets[(seq % self.size) as usize] = Some(packet.clone());
47+
self.packets[(seq % self.size) as usize] = Some(packet.clone().into());
4748
self.last_added = seq;
4849
}
4950

50-
fn get(&self, seq: u16) -> Option<&rtp::packet::Packet> {
51+
fn get(&self, seq: u16) -> Option<&SentPacket> {
5152
let diff = self.last_added.wrapping_sub(seq);
5253
if diff >= UINT16SIZE_HALF {
5354
return None;
@@ -79,7 +80,7 @@ impl ResponderStream {
7980
internal.add(pkt);
8081
}
8182

82-
pub(super) async fn get(&self, seq: u16) -> Option<rtp::packet::Packet> {
83+
pub(super) async fn get(&self, seq: u16) -> Option<SentPacket> {
8384
let internal = self.internal.lock().await;
8485
internal.get(seq).cloned()
8586
}
@@ -96,6 +97,28 @@ impl RTPWriter for ResponderStream {
9697
}
9798
}
9899

100+
#[derive(Clone)]
101+
/// A packet that has been sent, or at least been queued to send.
102+
pub struct SentPacket {
103+
pub(super) packet: rtp::packet::Packet,
104+
sent_at: Instant,
105+
}
106+
107+
impl SentPacket {
108+
pub(super) fn age(&self) -> Duration {
109+
self.sent_at.elapsed()
110+
}
111+
}
112+
113+
impl From<rtp::packet::Packet> for SentPacket {
114+
fn from(packet: rtp::packet::Packet) -> Self {
115+
Self {
116+
packet,
117+
sent_at: Instant::now(),
118+
}
119+
}
120+
}
121+
99122
#[cfg(test)]
100123
mod test {
101124
use super::*;
@@ -127,9 +150,9 @@ mod test {
127150
let seq = start.wrapping_add(*n);
128151
if let Some(packet) = sb.get(seq) {
129152
assert_eq!(
130-
packet.header.sequence_number, seq,
153+
packet.packet.header.sequence_number, seq,
131154
"packet for {} returned with incorrect SequenceNumber: {}",
132-
seq, packet.header.sequence_number
155+
seq, packet.packet.header.sequence_number
133156
);
134157
} else {
135158
assert!(false, "packet not found: {}", seq);
@@ -144,7 +167,7 @@ mod test {
144167
assert!(
145168
false,
146169
"packet found for {}: {}",
147-
seq, packet.header.sequence_number
170+
seq, packet.packet.header.sequence_number
148171
);
149172
}
150173
}

0 commit comments

Comments
 (0)