Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Redirect to WebRTC compatibility #459

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/lib/controls/onvif/camera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::*;

use crate::{stream::gst::utils::get_encode_from_rtspsrc, video::types::Format};
use crate::{stream::gst::utils::get_encode_from_stream_uri, video::types::Format};

use super::manager::OnvifDevice;

Expand Down Expand Up @@ -243,7 +243,7 @@ impl OnvifCamera {
trace!("Using credentials {credentials:?}");
}

let Some(encode) = get_encode_from_rtspsrc(&stream_uri).await else {
let Some(encode) = get_encode_from_stream_uri(&stream_uri).await else {
warn!("Failed getting encoding from RTSP stream at {stream_uri}");
continue;
};
Expand Down
95 changes: 63 additions & 32 deletions src/lib/stream/gst/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{anyhow, Result};
use gst::prelude::*;
use tokio::sync::mpsc;
use tracing::*;

use crate::video::types::VideoEncodeType;
Expand Down Expand Up @@ -110,32 +111,63 @@ pub async fn wait_for_element_state_async(
}

#[instrument(level = "debug")]
pub async fn get_encode_from_rtspsrc(stream_uri: &url::Url) -> Option<VideoEncodeType> {
pub async fn get_encode_from_stream_uri(stream_uri: &url::Url) -> Option<VideoEncodeType> {
use gst::prelude::*;

let description = format!(
concat!(
"rtspsrc location={location} is-live=true latency=0",
" ! fakesink name=fakesink sync=false"
),
location = stream_uri.to_string(),
);
let description = match stream_uri.scheme() {
"rtsp" => {
format!(
concat!(
"rtspsrc location={location} is-live=true latency=0",
" ! typefind name=typefinder minimum=1",
" ! fakesink name=fakesink sync=false"
),
location = stream_uri.to_string(),
)
}
"udp" => {
format!(
concat!(
"udpsrc address={address} port={port} close-socket=false auto-multicast=true",
" ! typefind name=typefinder minimum=1",
" ! fakesink name=fakesink sync=false"
),
address = stream_uri.host()?,
port = stream_uri.port()?,
)
}
unsupported => {
warn!("Scheme {unsupported:#?} is not supported for Redirect Pipelines");
return None;
}
};

let pipeline = gst::parse::launch(&description)
.expect("Failed to create pipeline")
.downcast::<gst::Pipeline>()
.expect("Pipeline is not a valid gst::Pipeline");

let typefinder = pipeline.by_name("typefinder")?;

let (tx, rx) = mpsc::channel(10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A oneshot makes more sense here, no ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my first try, but oneshot requires T to impl copy :(


typefinder.connect("have-type", false, move |values| {
let _typefinder = values[0].get::<gst::Element>().expect("Invalid argument");
let _probability = values[1].get::<u32>().expect("Invalid argument");
let caps = values[2].get::<gst::Caps>().expect("Invalid argument");

if let Err(error) = tx.blocking_send(caps) {
error!("Failed sending caps from typefinder: {error:?}");
}

None
});

pipeline
.set_state(gst::State::Playing)
.expect("Failed to set pipeline to Playing");

let fakesink = pipeline
.by_name("fakesink")
.expect("Fakesink not found in pipeline");
let pad = fakesink.static_pad("sink").expect("Sink pad not found");

let encode = tokio::time::timeout(tokio::time::Duration::from_secs(15), wait_for_encode(pad))
let encode = tokio::time::timeout(tokio::time::Duration::from_secs(15), wait_for_encode(rx))
.await
.ok()
.flatten();
Expand All @@ -147,23 +179,22 @@ pub async fn get_encode_from_rtspsrc(stream_uri: &url::Url) -> Option<VideoEncod
encode
}

pub async fn wait_for_encode(pad: gst::Pad) -> Option<VideoEncodeType> {
loop {
if let Some(caps) = pad.current_caps() {
trace!("caps from rtspsrc: {caps:?}");

if let Some(structure) = caps.structure(0) {
if let Ok(encoding_name) = structure.get::<String>("encoding-name") {
let encoding = match encoding_name.to_ascii_uppercase().as_str() {
"H264" => Some(VideoEncodeType::H264),
"H265" => Some(VideoEncodeType::H265),
_unsupported => None,
};

break encoding;
}
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
pub async fn wait_for_encode(mut rx: mpsc::Receiver<gst::Caps>) -> Option<VideoEncodeType> {
if let Some(caps) = rx.recv().await {
let structure = caps.structure(0)?;

let encoding_name = structure.get::<String>("encoding-name").ok()?;

let encoding = match encoding_name.to_ascii_uppercase().as_str() {
"H264" => Some(VideoEncodeType::H264),
"H265" => Some(VideoEncodeType::H265),
_unsupported => None,
};

trace!("Found encoding {encoding:?} from caps: {caps:?}");

return encoding;
}

return None;
}
40 changes: 37 additions & 3 deletions src/lib/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ pub mod webrtc;
use std::sync::Arc;

use ::gst::prelude::*;
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use gst::utils::get_encode_from_stream_uri;
use manager::Manager;
use pipeline::Pipeline;
use sink::{create_image_sink, create_rtsp_sink, create_udp_sink};
Expand All @@ -21,7 +22,7 @@ use webrtc::signalling_protocol::PeerId;
use crate::{
mavlink::mavlink_camera::MavlinkCamera,
video::{
types::{VideoEncodeType, VideoSourceType},
types::{FrameInterval, VideoEncodeType, VideoSourceType},
video_source::cameras_available,
},
video_stream::types::VideoAndStreamInformation,
Expand Down Expand Up @@ -53,8 +54,41 @@ impl Stream {
pub async fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
let pipeline_id = Manager::generate_uuid();

let video_and_stream_information = {
let mut video_and_stream_information = video_and_stream_information.clone();

if matches!(
video_and_stream_information.video_source,
VideoSourceType::Redirect(_)
) {
let url = video_and_stream_information
.stream_information
.endpoints
.first()
.context("No URL found")?;

let Some(encode) = get_encode_from_stream_uri(&url).await else {
return Err(anyhow!("No encode found for stream"));
};

video_and_stream_information
.stream_information
.configuration = CaptureConfiguration::Video(VideoCaptureConfiguration {
encode,
height: 0,
width: 0,
frame_interval: FrameInterval {
numerator: 0,
denominator: 0,
},
});
}

video_and_stream_information
};

let state = Arc::new(RwLock::new(Some(
StreamState::try_new(video_and_stream_information, &pipeline_id).await?,
StreamState::try_new(&video_and_stream_information, &pipeline_id).await?,
)));

let terminated = Arc::new(RwLock::new(false));
Expand Down
81 changes: 66 additions & 15 deletions src/lib/stream/pipeline/redirect_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ use gst::prelude::*;
use tracing::*;

use crate::{
stream::types::CaptureConfiguration, video::types::VideoSourceType,
stream::types::CaptureConfiguration,
video::types::{VideoEncodeType, VideoSourceType},
video_stream::types::VideoAndStreamInformation,
};

use super::{PipelineGstreamerInterface, PipelineState, PIPELINE_RTP_TEE_NAME};
use super::{
PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_RTP_TEE_NAME,
PIPELINE_VIDEO_TEE_NAME,
};

#[derive(Debug)]
pub struct RedirectPipeline {
Expand All @@ -24,7 +28,7 @@ impl RedirectPipeline {
.stream_information
.configuration
{
CaptureConfiguration::Redirect(configuration) => configuration,
CaptureConfiguration::Video(configuration) => configuration,
unsupported => {
return Err(anyhow!(
"{unsupported:?} is not supported as Redirect Pipeline"
Expand Down Expand Up @@ -55,39 +59,86 @@ impl RedirectPipeline {
.first()
.context("Failed to access the fisrt endpoint")?;

let sink_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}");

let description = match url.scheme() {
let source_description = match url.scheme() {
"rtsp" => {
format!(
concat!(
"rtspsrc location={location} is-live=true latency=0",
" ! application/x-rtp",
" ! tee name={sink_tee_name} allow-not-linked=true"
),
location = url,
sink_tee_name = sink_tee_name,
)
}
"udp" => {
format!(
concat!(
"udpsrc address={address} port={port} close-socket=false auto-multicast=true",
" ! application/x-rtp",
),
address = url.host().context("UDP URL without host")?,
port = url.port().context("UDP URL without port")?,
)
}
unsupported => {
return Err(anyhow!(
"Scheme {unsupported:#?} is not supported for Redirect Pipelines"
))
}
};

let encode = match &video_and_stream_information
.stream_information
.configuration
{
CaptureConfiguration::Video(configuration) => Some(configuration.encode.clone()),
_unknown => None,
};

let filter_name = format!("{PIPELINE_FILTER_NAME}-{pipeline_id}");
let video_tee_name = format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}");
let rtp_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}");

let description = match encode {
Some(VideoEncodeType::H264) => {
format!(
concat!(
"udpsrc address={address} port={port} close-socket=false auto-multicast=true",
" ! application/x-rtp",
" ! tee name={sink_tee_name} allow-not-linked=true"
" ! rtph264depay",
// " ! h264parse", // we might want to add this in the future to expand the compatibility, since it can transform the stream format
" ! capsfilter name={filter_name} caps=video/x-h264,stream-format=avc,alignment=au",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96",
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
filter_name = filter_name,
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
Some(VideoEncodeType::H265) => {
format!(
concat!(
" ! rtph265depay",
// " ! h265parse", // we might want to add this in the future to expand the compatibility, since it can transform the stream format
" ! capsfilter name={filter_name} caps=video/x-h265,profile={profile},stream-format=byte-stream,alignment=au",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtph265pay aggregate-mode=zero-latency config-interval=10 pt=96",
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
address = url.host().context("UDP URL without host")?,
port = url.port().context("UDP URL without port")?,
sink_tee_name = sink_tee_name,
filter_name = filter_name,
video_tee_name = video_tee_name,
profile = "main",
rtp_tee_name = rtp_tee_name,
)
}
unsupported => {
return Err(anyhow!(
"Scheme {unsupported:#?} is not supported for Redirect Pipelines"
"Encode {unsupported:?} is not supported for Redirect Pipeline"
))
}
};

let description = format!("{source_description} {description}");

let pipeline = gst::parse::launch(&description)?;

let pipeline = pipeline
Expand Down
Loading