From 60e225828ffc58b1ef95b156325a8a73de5de538 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ant=C3=B4nio=20Cardoso?= Date: Sat, 13 Apr 2024 14:43:25 -0300 Subject: [PATCH] src: stream: Update to work with Stream async API --- src/stream/manager.rs | 394 +++++++++++++++++------------------------- 1 file changed, 161 insertions(+), 233 deletions(-) diff --git a/src/stream/manager.rs b/src/stream/manager.rs index 1865a517..11d70444 100644 --- a/src/stream/manager.rs +++ b/src/stream/manager.rs @@ -17,8 +17,8 @@ use anyhow::{anyhow, Context, Error, Result}; type ClonableResult = Result>; -use async_std::stream::StreamExt; use cached::proc_macro::cached; +use futures::stream::StreamExt; use tracing::*; use super::{ @@ -39,20 +39,21 @@ lazy_static! { impl Manager { #[instrument(level = "debug", skip(self))] - fn update_settings(&self) { - let video_and_stream_informations = self - .streams - .values() - .filter_map(|stream| match stream.state.read() { - Ok(guard) => guard + async fn update_settings(&self) { + use futures::StreamExt; + + let streams = futures::stream::iter(self.streams.values()); + + let video_and_stream_informations = + futures::StreamExt::filter_map(streams, |stream| async move { + let stream_guard = stream.state.read().await; + + stream_guard .as_ref() - .map(|state| state.video_and_stream_information.clone()), - Err(error) => { - error!("Failed locking a Mutex. Reason: {error}"); - None - } + .map(|state| state.video_and_stream_information.clone()) }) - .collect::>(); + .collect::>() + .await; settings::manager::set_streams(video_and_stream_informations.as_slice()); } @@ -195,39 +196,42 @@ pub async fn streams() -> Result> { pub async fn get_first_sdp_from_source(source: String) -> ClonableResult { let manager = MANAGER.read().await; - let Some(result) = manager.streams.values().find_map(|stream| { - let state_guard = match stream.state.read() { - Ok(guard) => guard, - Err(error) => { - error!("Failed locking a Mutex. Reason: {error}"); - return None; - } - }; + let result = futures::stream::iter(manager.streams.values()) + .filter_map(|stream| { + let source = &source; + + let future = async move { + let state_guard = stream.state.read().await; + + let state_ref = state_guard.as_ref()?; + + if state_ref + .video_and_stream_information + .video_source + .inner() + .source_string() + .eq(source) + { + return state_ref + .pipeline + .inner_state_as_ref() + .sinks + .values() + .find_map(|sink| sink.get_sdp().ok()); + } - state_guard.as_ref().and_then(|state| { - if state - .video_and_stream_information - .video_source - .inner() - .source_string() - == source - { - state - .pipeline - .inner_state_as_ref() - .sinks - .values() - .find_map(|sink| sink.get_sdp().ok()) - } else { None - } + }; + Box::pin(future) }) - }) else { - return Err(Arc::new(anyhow!( - "Failed to find any valid sdp for souce {source:?}" - ))); - }; - Ok(result) + .next() + .await + .context(format!("Failed to find any valid sdp for souce {source:?}")) + .map_err(Arc::new); + + drop(manager); + + result } #[instrument(level = "debug")] @@ -273,64 +277,55 @@ pub async fn get_jpeg_thumbnail_from_source( .build() .expect("Failed building a new tokio runtime") .block_on(async move { - let res = async move { - let manager = MANAGER.read().await; - - let stream = manager.streams.values().find(|stream| { - let state = match stream.state.read() { - Ok(guard) => guard, - Err(error) => { - error!("Failed locking a Mutex. Reason: {error}"); - return false; - } - }; + let manager = MANAGER.read().await; - state.as_ref().is_some_and(|state| { - state + let res = futures::stream::iter(&manager.streams) + .filter_map(|(_id, stream)| { + let source = &source; + + let future = async move { + let state_guard = stream.state.read().await; + + let state_ref = state_guard.as_ref()?; + + if !state_ref .video_and_stream_information .video_source .inner() .source_string() - == source - }) - })?; - - let state_guard = match stream.state.read() { - Ok(guard) => { - if guard.is_none() { + .eq(source) + { return None; } - guard - } - Err(error) => { - error!("Failed locking a Mutex. Reason: {error}"); - return None; - } - }; - - let mut sinks = futures::stream::iter( - state_guard - .as_ref() - .unwrap() - .pipeline - .inner_state_as_ref() - .sinks - .values(), - ); - let Some(Sink::Image(image_sink)) = - sinks.find(|sink| matches!(sink, Sink::Image(_))).await - else { - return None; - }; - - Some( - image_sink - .make_jpeg_thumbnail_from_last_frame(quality, target_height) - .await - .map_err(Arc::new), - ) - } - .await; + + let sinks = state_ref.pipeline.inner_state_as_ref().sinks.values(); + + let sink = futures::stream::iter(sinks) + .filter_map(|sink| { + let future = async move { + matches!(sink, Sink::Image(_)).then_some(sink) + }; + + Box::pin(future) + }) + .next() + .await?; + + let Sink::Image(image_sink) = sink else { + return None; + }; + + Some( + image_sink + .make_jpeg_thumbnail_from_last_frame(quality, target_height) + .await + .map_err(Arc::new), + ) + }; + Box::pin(future) + }) + .next() + .await; let _ = tx.send(res); }); @@ -349,21 +344,11 @@ pub async fn add_stream_and_start( { let manager = MANAGER.read().await; for stream in manager.streams.values() { - let state_guard = match stream.state.read() { - Ok(guard) => { - if guard.is_none() { - return Err(anyhow!("Stream without State")); - } - guard - } - Err(error) => { - return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); - } - }; + let state_guard = stream.state.read().await; + + let state_ref = state_guard.as_ref().context("Stream without State")?; - state_guard - .as_ref() - .unwrap() + state_ref .video_and_stream_information .conflicts_with(&video_and_stream_information)?; } @@ -376,41 +361,40 @@ pub async fn add_stream_and_start( } #[instrument(level = "debug")] -pub async fn remove_stream_by_name(stream_name: &str) -> Result<()> { +async fn get_stream_id_from_name(stream_name: &str) -> Result { let manager = MANAGER.read().await; - if let Some(stream_id) = &manager.streams.iter().find_map(|(id, stream)| { - let state_guard = match stream.state.read() { - Ok(guard) => { - if guard.is_none() { - warn!("Stream without State, skipping..."); - return None; - } - guard - } - Err(error) => { - error!("Failed locking a Mutex. Reason: {error}"); - return None; - } - }; + let stream_id = futures::stream::iter(&manager.streams) + .filter_map(|(id, stream)| { + let future = async move { + let state_guard = stream.state.read().await; - if state_guard - .as_ref() - .unwrap() - .video_and_stream_information - .name - == *stream_name - { - return Some(*id); - } - None - }) { - drop(manager); - Manager::remove_stream(stream_id).await?; - return Ok(()); - } + let state_ref = state_guard.as_ref()?; + + state_ref + .video_and_stream_information + .name + .eq(stream_name) + .then_some(*id) + }; + Box::pin(future) + }) + .next() + .await + .context(format!("Stream named {stream_name:?} not found")); - Err(anyhow!("Stream named {stream_name:?} not found")) + drop(manager); + + stream_id +} + +#[instrument(level = "debug")] +pub async fn remove_stream_by_name(stream_name: &str) -> Result<()> { + let stream_id = get_stream_id_from_name(stream_name).await?; + + Manager::remove_stream(&stream_id).await?; + + Ok(()) } impl Manager { @@ -437,19 +421,11 @@ impl Manager { let sink = Sink::WebRTC(WebRTCSink::try_new(bind, sender)?); - let mut state_guard = match stream.state.write() { - Ok(guard) => { - if guard.is_none() { - return Err(anyhow!("Stream without State")); - } - guard - } - Err(error) => { - return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); - } - }; + let mut state_guard = stream.state.write().await; + + let state_mut = state_guard.as_mut().context("Stream without State")?; - state_guard.as_mut().unwrap().pipeline.add_sink(sink)?; + state_mut.pipeline.add_sink(sink)?; debug!("WebRTC session created: {session_id:?}"); @@ -468,21 +444,11 @@ impl Manager { .get_mut(&bind.producer_id) .context(format!("Producer {:?} not found", bind.producer_id))?; - let mut state_guard = match stream.state.write() { - Ok(guard) => { - if guard.is_none() { - return Err(anyhow!("Stream without State")); - } - guard - } - Err(error) => { - return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); - } - }; + let mut state_guard = stream.state.write().await; + + let state_mut = state_guard.as_mut().context("Stream without State")?; - state_guard - .as_mut() - .unwrap() + state_mut .pipeline .remove_sink(&bind.session_id) .context(format!("Cannot remove session {:?}", bind.session_id))?; @@ -499,27 +465,16 @@ impl Manager { ) -> Result<()> { let manager = MANAGER.read().await; - let state_guard = match manager + let stream = manager .streams .get(&bind.producer_id) - .context(format!("Producer {:?} not found", bind.producer_id))? - .state - .read() - { - Ok(guard) => { - if guard.is_none() { - return Err(anyhow!("Stream without State")); - } - guard - } - Err(error) => { - return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); - } - }; + .context(format!("Producer {:?} not found", bind.producer_id))?; - let sink = state_guard - .as_ref() - .unwrap() + let state_guard = stream.state.read().await; + + let state_ref = state_guard.as_ref().context("Stream without State")?; + + let sink = state_ref .pipeline .inner_state_as_ref() .sinks @@ -556,27 +511,16 @@ impl Manager { ) -> Result<()> { let manager = MANAGER.read().await; - let state_guard = match manager + let stream = manager .streams .get(&bind.producer_id) - .context(format!("Producer {:?} not found", bind.producer_id))? - .state - .read() - { - Ok(guard) => { - if guard.is_none() { - return Err(anyhow!("Stream without State")); - } - guard - } - Err(error) => { - return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); - } - }; + .context(format!("Producer {:?} not found", bind.producer_id))?; - let sink = state_guard - .as_ref() - .unwrap() + let state_guard = stream.state.read().await; + + let state_ref = state_guard.as_ref().context("Stream without State")?; + + let sink = state_ref .pipeline .inner_state_as_ref() .sinks @@ -598,22 +542,18 @@ impl Manager { pub async fn add_stream(stream: Stream) -> Result<()> { let mut manager = MANAGER.write().await; - let stream_id = match stream.state.read() { - Ok(guard) => { - if guard.is_none() { - return Err(anyhow!("Stream without State")); - } - guard.as_ref().unwrap().pipeline_id - } - Err(error) => { - return Err(anyhow!("Failed locking a Mutex. Reason: {error}")); - } + let stream_id = { + let state_guard = stream.state.read().await; + + let state_ref = state_guard.as_ref().context("Stream without State")?; + + state_ref.pipeline_id }; if manager.streams.insert(stream_id, stream).is_some() { return Err(anyhow!("Failed adding stream {stream_id:?}")); } - manager.update_settings(); + manager.update_settings().await; info!("Stream {stream_id} successfully added!"); @@ -632,8 +572,7 @@ impl Manager { .streams .remove(stream_id) .context(format!("Stream {stream_id:?} not found"))?; - manager.update_settings(); - drop(manager); + manager.update_settings().await; info!("Stream {stream_id} successfully removed!"); @@ -644,25 +583,11 @@ impl Manager { pub async fn streams_information() -> Result> { let manager = MANAGER.read().await; - Ok(manager - .streams - .values() - .filter_map(|stream| { - let state_guard = match stream.state.read() { - Ok(guard) => { - if guard.is_none() { - warn!("Stream without State, skipping..."); - return None; - } - guard - } - Err(error) => { - error!("Failed locking a Mutex. Reason: {error}"); - return None; - } - }; + let status = futures::stream::iter(manager.streams.values()) + .filter_map(|stream| async move { + let state_guard = stream.state.read().await; - let state_ref = state_guard.as_ref().unwrap(); + let state_ref = state_guard.as_ref()?; Some(StreamStatus { id: state_ref.pipeline_id, @@ -670,7 +595,10 @@ impl Manager { video_and_stream: state_ref.video_and_stream_information.clone(), }) }) - .collect()) + .collect() + .await; + + Ok(status) } #[instrument(level = "debug")]