Skip to content

Commit ead8d6d

Browse files
committed
Don't send receiver reports during simulcast probe
libwebrtc will stop sending SDES headers after the first receiver report on a SSRC. This breaks probing if the receiver report is sent before probing is completed. This change delays receiver reports for a SSRC until the first non-probe packet is received.
1 parent 4fe9221 commit ead8d6d

File tree

5 files changed

+111
-14
lines changed

5 files changed

+111
-14
lines changed

interceptor/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ pub mod twcc;
2424

2525
pub use error::Error;
2626

27+
/// Attribute indicating the stream is probing incoming packets.
28+
pub const ATTR_READ_PROBE: usize = 2295978936;
29+
2730
/// Attributes are a generic key/value store used by interceptors
2831
pub type Attributes = HashMap<usize, usize>;
2932

interceptor/src/report/receiver/mod.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,11 @@ impl ReceiverReport {
110110
m.values().cloned().collect()
111111
};
112112
for stream in streams {
113-
let pkt = stream.generate_report(now);
114-
115-
let a = Attributes::new();
116-
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
117-
log::warn!("failed sending: {}", err);
113+
if let Some(pkt) = stream.generate_report(now) {
114+
let a = Attributes::new();
115+
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
116+
log::warn!("failed sending: {}", err);
117+
}
118118
}
119119
}
120120
}
@@ -186,11 +186,17 @@ impl Interceptor for ReceiverReport {
186186
info: &StreamInfo,
187187
reader: Arc<dyn RTPReader + Send + Sync>,
188188
) -> Arc<dyn RTPReader + Send + Sync> {
189+
let wait_for_probe = info
190+
.attributes
191+
.get(&crate::ATTR_READ_PROBE)
192+
.is_some_and(|v| *v != 0);
193+
189194
let stream = Arc::new(ReceiverStream::new(
190195
info.ssrc,
191196
info.clock_rate,
192197
reader,
193198
self.internal.now.clone(),
199+
wait_for_probe,
194200
));
195201
{
196202
let mut streams = self.internal.streams.lock().await;

interceptor/src/report/receiver/receiver_stream.rs

+20-6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ struct ReceiverStreamInternal {
1313

1414
packets: Vec<u64>,
1515
started: bool,
16+
wait_for_probe: bool,
1617
seq_num_cycles: u16,
1718
last_seq_num: i32,
1819
last_report_seq_num: i32,
@@ -40,7 +41,7 @@ impl ReceiverStreamInternal {
4041
(self.packets[pos / 64] & (1 << (pos % 64))) != 0
4142
}
4243

43-
fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) {
44+
fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
4445
if !self.started {
4546
// first frame
4647
self.started = true;
@@ -79,6 +80,7 @@ impl ReceiverStreamInternal {
7980

8081
self.last_rtp_time_rtp = pkt.header.timestamp;
8182
self.last_rtp_time_time = now;
83+
self.wait_for_probe &= is_probe;
8284
}
8385

8486
fn process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport) {
@@ -158,6 +160,7 @@ impl ReceiverStream {
158160
clock_rate: u32,
159161
reader: Arc<dyn RTPReader + Send + Sync>,
160162
now: Option<FnTimeGen>,
163+
wait_for_probe: bool,
161164
) -> Self {
162165
let receiver_ssrc = rand::random::<u32>();
163166
ReceiverStream {
@@ -171,6 +174,7 @@ impl ReceiverStream {
171174

172175
packets: vec![0u64; 128],
173176
started: false,
177+
wait_for_probe,
174178
seq_num_cycles: 0,
175179
last_seq_num: 0,
176180
last_report_seq_num: 0,
@@ -184,9 +188,9 @@ impl ReceiverStream {
184188
}
185189
}
186190

187-
pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
191+
pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
188192
let mut internal = self.internal.lock();
189-
internal.process_rtp(now, pkt);
193+
internal.process_rtp(now, pkt, is_probe);
190194
}
191195

192196
pub(crate) fn process_sender_report(
@@ -198,9 +202,17 @@ impl ReceiverStream {
198202
internal.process_sender_report(now, sr);
199203
}
200204

201-
pub(crate) fn generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport {
205+
pub(crate) fn generate_report(
206+
&self,
207+
now: SystemTime,
208+
) -> Option<rtcp::receiver_report::ReceiverReport> {
202209
let mut internal = self.internal.lock();
203-
internal.generate_report(now)
210+
211+
if internal.wait_for_probe {
212+
return None;
213+
}
214+
215+
Some(internal.generate_report(now))
204216
}
205217
}
206218

@@ -213,14 +225,16 @@ impl RTPReader for ReceiverStream {
213225
buf: &mut [u8],
214226
a: &Attributes,
215227
) -> Result<(rtp::packet::Packet, Attributes)> {
228+
let is_probe = a.get(&crate::ATTR_READ_PROBE).is_some_and(|v| *v != 0);
229+
216230
let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?;
217231

218232
let now = if let Some(f) = &self.now {
219233
f()
220234
} else {
221235
SystemTime::now()
222236
};
223-
self.process_rtp(now, &pkt);
237+
self.process_rtp(now, &pkt, is_probe);
224238

225239
Ok((pkt, attr))
226240
}

interceptor/src/report/receiver/receiver_test.rs

+67
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,73 @@ async fn test_receiver_interceptor_before_any_packet() -> Result<()> {
5858
Ok(())
5959
}
6060

61+
#[tokio::test(start_paused = true)]
62+
async fn test_receiver_interceptor_read_probe() -> Result<()> {
63+
let mt = Arc::new(MockTime::default());
64+
let time_gen = {
65+
let mt = Arc::clone(&mt);
66+
Arc::new(move || mt.now())
67+
};
68+
69+
let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
70+
.with_interval(Duration::from_millis(50))
71+
.with_now_fn(time_gen)
72+
.build("")?;
73+
74+
let stream = MockStream::new(
75+
&StreamInfo {
76+
ssrc: 123456,
77+
clock_rate: 90000,
78+
attributes: [(crate::ATTR_READ_PROBE, 1)].into_iter().collect(),
79+
..Default::default()
80+
},
81+
icpr,
82+
)
83+
.await;
84+
85+
// no report initially
86+
tokio::time::timeout(Duration::from_millis(60), stream.written_rtcp())
87+
.await
88+
.expect_err("expected no report");
89+
90+
stream
91+
.receive_rtp(rtp::packet::Packet {
92+
header: rtp::header::Header {
93+
sequence_number: 7,
94+
..Default::default()
95+
},
96+
..Default::default()
97+
})
98+
.await;
99+
100+
let pkts = stream.written_rtcp().await.unwrap();
101+
assert_eq!(pkts.len(), 1);
102+
if let Some(rr) = pkts[0]
103+
.as_any()
104+
.downcast_ref::<rtcp::receiver_report::ReceiverReport>()
105+
{
106+
assert_eq!(rr.reports.len(), 1);
107+
assert_eq!(
108+
rr.reports[0],
109+
rtcp::reception_report::ReceptionReport {
110+
ssrc: 123456,
111+
last_sequence_number: 7,
112+
last_sender_report: 0,
113+
fraction_lost: 0,
114+
total_lost: 0,
115+
delay: 0,
116+
jitter: 0,
117+
}
118+
)
119+
} else {
120+
panic!();
121+
}
122+
123+
stream.close().await?;
124+
125+
Ok(())
126+
}
127+
61128
#[tokio::test]
62129
async fn test_receiver_interceptor_after_rtp_packets() -> Result<()> {
63130
let mt = Arc::new(MockTime::default());

webrtc/src/peer_connection/peer_connection_internal.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -1072,23 +1072,30 @@ impl PeerConnectionInternal {
10721072
None => return Err(Error::ErrInterceptorNotBind),
10731073
};
10741074

1075-
let stream_info = create_stream_info(
1075+
let mut stream_info = create_stream_info(
10761076
"".to_owned(),
10771077
ssrc,
10781078
params.codecs[0].payload_type,
10791079
params.codecs[0].capability.clone(),
10801080
&params.header_extensions,
10811081
None,
10821082
);
1083+
1084+
// indicate this stream starts with probing
1085+
stream_info
1086+
.attributes
1087+
.insert(interceptor::ATTR_READ_PROBE, 1);
1088+
10831089
let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self
10841090
.dtls_transport
10851091
.streams_for_ssrc(ssrc, &stream_info, &icpr)
10861092
.await?;
10871093

1088-
let a = Attributes::new();
10891094
for _ in 0..=SIMULCAST_PROBE_COUNT {
10901095
if mid.is_empty() || (rid.is_empty() && rsid.is_empty()) {
1091-
let (pkt, _) = rtp_interceptor.read(&mut buf, &a).await?;
1096+
let (pkt, a) = rtp_interceptor
1097+
.read(&mut buf, &stream_info.attributes)
1098+
.await?;
10921099
let (m, r, rs, _) = handle_unknown_rtp_packet(
10931100
&pkt.header,
10941101
mid_extension_id as u8,

0 commit comments

Comments
 (0)