Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better tasking less locks #318

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<(), std::io::Error> {

stream::webrtc::signalling_server::SignallingServer::default();

if let Err(error) = stream::manager::start_default() {
if let Err(error) = stream::manager::start_default().await {
error!("Failed to start default streams. Reason: {error:?}")
}

Expand Down
30 changes: 8 additions & 22 deletions src/mavlink/mavlink_camera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use super::utils::*;
#[derive(Debug)]
pub struct MavlinkCameraHandle {
inner: Arc<MavlinkCamera>,
_runtime: tokio::runtime::Runtime,
heartbeat_handle: tokio::task::JoinHandle<()>,
messages_handle: tokio::task::JoinHandle<()>,
}
Expand All @@ -34,33 +33,18 @@ struct MavlinkCamera {

impl MavlinkCameraHandle {
#[instrument(level = "debug")]
pub fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
pub async fn try_new(video_and_stream_information: &VideoAndStreamInformation) -> Result<Self> {
let inner = Arc::new(MavlinkCamera::try_new(video_and_stream_information)?);

let sender = crate::mavlink::manager::Manager::get_sender();

let runtime = tokio::runtime::Builder::new_multi_thread()
.on_thread_start(|| debug!("Thread started"))
.on_thread_stop(|| debug!("Thread stopped"))
.thread_name_fn(|| {
static ATOMIC_ID: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
format!("MavlinkCamera-{id}")
})
.worker_threads(2)
.enable_all()
.build()
.expect("Failed building a new tokio runtime");

let heartbeat_handle =
runtime.spawn(MavlinkCamera::heartbeat_loop(inner.clone(), sender.clone()));
tokio::spawn(MavlinkCamera::heartbeat_loop(inner.clone(), sender.clone()));
let messages_handle =
runtime.spawn(MavlinkCamera::messages_loop(inner.clone(), sender.clone()));
tokio::spawn(MavlinkCamera::messages_loop(inner.clone(), sender.clone()));

Ok(Self {
inner,
_runtime: runtime,
heartbeat_handle,
messages_handle,
})
Expand Down Expand Up @@ -165,14 +149,16 @@ impl MavlinkCamera {
let mut receiver = sender.subscribe();

loop {
let (header, message) = match receiver.recv().await {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// Note: we can't block this task awaiting for recv, otherwise it might not die. This is why we use try_recv here
let (header, message) = match receiver.try_recv() {
Ok(Message::Received(message)) => message,
Err(broadcast::error::RecvError::Closed) => {
Err(broadcast::error::TryRecvError::Closed) => {
unreachable!(
"Closed channel: This should never happen, this channel is static!"
);
}
Ok(Message::ToBeSent(_)) | Err(broadcast::error::RecvError::Lagged(_)) => continue,
Ok(Message::ToBeSent(_)) | Err(_) => continue,
};

trace!("Message received: {header:?}, {message:?}");
Expand Down
36 changes: 18 additions & 18 deletions src/server/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::pages;

use actix_extensible_rate_limit::{
backend::{memory::InMemoryBackend, SimpleInputFunctionBuilder},

Check warning on line 4 in src/server/manager.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-22.04, x86_64-unknown-linux-gnu, linux-desktop)

unused imports: `RateLimiter`, `SimpleInputFunctionBuilder`, `memory::InMemoryBackend`

Check warning on line 4 in src/server/manager.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-22.04, armv7-unknown-linux-gnueabihf, armv7)

unused imports: `RateLimiter`, `SimpleInputFunctionBuilder`, `memory::InMemoryBackend`
RateLimiter,
};
use actix_service::Service;
Expand All @@ -27,9 +27,8 @@
App::new()
// Add debug call for API access
.wrap_fn(|req, srv| {
trace!("{:#?}", &req);
let fut = srv.call(req);
async { fut.await }
trace!("{req:#?}");
srv.call(req)
})
.wrap(TracingLogger::default())
.wrap(actix_web::middleware::Logger::default())
Expand Down Expand Up @@ -68,25 +67,26 @@
)
.route("/xml", web::get().to(pages::xml))
.route("/sdp", web::get().to(pages::sdp))
.service(
web::scope("/thumbnail")
// Add a rate limitter to prevent flood
.wrap(
RateLimiter::builder(
InMemoryBackend::builder().build(),
SimpleInputFunctionBuilder::new(std::time::Duration::from_secs(1), 4)
.real_ip_key()
.build(),
)
.add_headers()
.build(),
)
.route("", web::get().to(pages::thumbnail)),
)
// .service(
// web::scope("/thumbnail")
// // Add a rate limitter to prevent flood
// .wrap(
// RateLimiter::builder(
// InMemoryBackend::builder().build(),
// SimpleInputFunctionBuilder::new(std::time::Duration::from_secs(1), 4)
// .real_ip_key()
// .build(),
// )
// .add_headers()
// .build(),
// )
// .route("", web::get().to(pages::thumbnail)),
// )
Comment on lines +70 to +84
Copy link
Collaborator Author

@joaoantoniocardoso joaoantoniocardoso Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be reworked with care to work with only one worker...

.build()
})
.bind(server_address)
.expect("Failed starting web API")
.workers(1)
.run()
.await
}
84 changes: 43 additions & 41 deletions src/server/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
xml,
};
use crate::video_stream::types::VideoAndStreamInformation;
use actix_web::http::header;

Check warning on line 11 in src/server/pages.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-22.04, x86_64-unknown-linux-gnu, linux-desktop)

unused import: `actix_web::http::header`

Check warning on line 11 in src/server/pages.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-22.04, armv7-unknown-linux-gnueabihf, armv7)

unused import: `actix_web::http::header`
use actix_web::{
web::{self, Json},
HttpRequest, HttpResponse,
Expand Down Expand Up @@ -70,7 +70,7 @@

#[derive(Apiv2Schema, Debug, Deserialize, Validate)]
pub struct ThumbnailFileRequest {
source: String,

Check warning on line 73 in src/server/pages.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-22.04, x86_64-unknown-linux-gnu, linux-desktop)

field `source` is never read

Check warning on line 73 in src/server/pages.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-22.04, armv7-unknown-linux-gnueabihf, armv7)

field `source` is never read
/// The Quality level (a percentage value as an integer between 1 and 100) is inversely proportional to JPEG compression level, which means the higher, the best.
#[validate(range(min = 1, max = 100))]
quality: Option<u8>,
Expand Down Expand Up @@ -232,7 +232,7 @@
pub async fn reset_settings(query: web::Query<ResetSettings>) -> HttpResponse {
if query.all.unwrap_or_default() {
settings::manager::reset();
if let Err(error) = stream_manager::start_default() {
if let Err(error) = stream_manager::start_default().await {
return HttpResponse::InternalServerError()
.content_type("text/plain")
.body(format!("{error:#?}"));
Expand Down Expand Up @@ -269,7 +269,7 @@

#[api_v2_operation]
/// Create a video stream
pub fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
pub async fn streams_post(json: web::Json<PostStream>) -> HttpResponse {
let json = json.into_inner();

let video_source = match video_source::get_video_source(&json.source) {
Expand All @@ -285,7 +285,9 @@
name: json.name,
stream_information: json.stream_information,
video_source,
}) {
})
.await
{
return HttpResponse::NotAcceptable()
.content_type("text/plain")
.body(format!("{error:#?}"));
Expand Down Expand Up @@ -424,41 +426,41 @@
}
}

#[api_v2_operation]
/// Provides a thumbnail file of the given source
pub async fn thumbnail(thumbnail_file_request: web::Query<ThumbnailFileRequest>) -> HttpResponse {
// Ideally, we should be using `actix_web_validator::Query` instead of `web::Query`,
// but because paperclip (at least until 0.8) is using `actix-web-validator 3.x`,
// and `validator 0.14`, the newest api needed to use it along #[api_v2_operation]
// wasn't implemented yet, it doesn't compile.
// To workaround this, we are manually calling the validator here, using actix to
// automatically handle the validation error for us as it normally would.
// TODO: update this function to use `actix_web_validator::Query` directly and get
// rid of this workaround.
if let Err(errors) = thumbnail_file_request.validate() {
warn!("Failed validating ThumbnailFileRequest. Reason: {errors:?}");
return actix_web::ResponseError::error_response(&actix_web_validator::Error::from(errors));
}

let source = thumbnail_file_request.source.clone();
let quality = thumbnail_file_request.quality.unwrap_or(70u8);
let target_height = thumbnail_file_request.target_height.map(|v| v as u32);

match stream_manager::get_jpeg_thumbnail_from_source(source, quality, target_height).await {
Some(Ok(image)) => HttpResponse::Ok().content_type("image/jpeg").body(image),
None => HttpResponse::NotFound()
.content_type("text/plain")
.body(format!(
"Thumbnail not found for source {:?}.",
thumbnail_file_request.source
)),
Some(Err(error)) => HttpResponse::ServiceUnavailable()
.reason("Thumbnail temporarily unavailable")
.insert_header((header::RETRY_AFTER, 10))
.content_type("text/plain")
.body(format!(
"Thumbnail for source {:?} is temporarily unavailable. Try again later. Details: {error:?}",
thumbnail_file_request.source
)),
}
}
// #[api_v2_operation]
// /// Provides a thumbnail file of the given source
// pub async fn thumbnail(thumbnail_file_request: web::Query<ThumbnailFileRequest>) -> HttpResponse {
// Ideally, we should be using `actix_web_validator::Query` instead of `web::Query`,
// but because paperclip (at least until 0.8) is using `actix-web-validator 3.x`,
// and `validator 0.14`, the newest api needed to use it along #[api_v2_operation]
// wasn't implemented yet, it doesn't compile.
// To workaround this, we are manually calling the validator here, using actix to
// automatically handle the validation error for us as it normally would.
// TODO: update this function to use `actix_web_validator::Query` directly and get
// rid of this workaround.
// if let Err(errors) = thumbnail_file_request.validate() {
// warn!("Failed validating ThumbnailFileRequest. Reason: {errors:?}");
// return actix_web::ResponseError::error_response(&actix_web_validator::Error::from(errors));
// }

// let source = thumbnail_file_request.source.clone();
// let quality = thumbnail_file_request.quality.unwrap_or(70u8);
// let target_height = thumbnail_file_request.target_height.map(|v| v as u32);

// match stream_manager::get_jpeg_thumbnail_from_source(source, quality, target_height).await {
// Some(Ok(image)) => HttpResponse::Ok().content_type("image/jpeg").body(image),
// None => HttpResponse::NotFound()
// .content_type("text/plain")
// .body(format!(
// "Thumbnail not found for source {:?}.",
// thumbnail_file_request.source
// )),
// Some(Err(error)) => HttpResponse::ServiceUnavailable()
// .reason("Thumbnail temporarily unavailable")
// .insert_header((header::RETRY_AFTER, 10))
// .content_type("text/plain")
// .body(format!(
// "Thumbnail for source {:?} is temporarily unavailable. Try again later. Details: {error:?}",
// thumbnail_file_request.source
// )),
// }
// }
Comment on lines +429 to +466
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be reworked with care to work with only one worker...

26 changes: 13 additions & 13 deletions src/settings/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use directories::ProjectDirs;
use serde::{Deserialize, Serialize};
use std::io::prelude::*;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};
use tracing::*;

use crate::cli;
Expand Down Expand Up @@ -34,7 +34,7 @@ struct Manager {
}

lazy_static! {
static ref MANAGER: Arc<Mutex<Manager>> = Arc::new(Mutex::new(Manager { content: None }));
static ref MANAGER: Arc<RwLock<Manager>> = Arc::new(RwLock::new(Manager { content: None }));
}

impl Default for SettingsStruct {
Expand Down Expand Up @@ -101,7 +101,7 @@ impl Manager {
// Init settings manager with the desired settings file,
// will be created if does not exist
pub fn init(file_name: Option<&str>) {
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
let file_name = file_name.unwrap_or("settings.json");
manager.content = Some(Manager::with(file_name));
}
Expand Down Expand Up @@ -153,7 +153,7 @@ fn save_settings_to_file(file_name: &str, content: &SettingsStruct) -> Result<()

// Save the latest state of the settings
pub fn save() {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
//TODO: deal com save problems here
if let Some(content) = &manager.content {
if let Err(error) = save_settings_to_file(&content.file_name, &content.config) {
Expand All @@ -171,12 +171,12 @@ pub fn save() {

#[allow(dead_code)]
pub fn header() -> HeaderSettingsFile {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
manager.content.as_ref().unwrap().config.header.clone()
}

pub fn mavlink_endpoint() -> Option<String> {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
return manager
.content
.as_ref()
Expand All @@ -189,23 +189,23 @@ pub fn mavlink_endpoint() -> Option<String> {
pub fn set_mavlink_endpoint(endpoint: &str) {
//TODO: make content more easy to access
{
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
let mut content = manager.content.as_mut();
content.as_mut().unwrap().config.mavlink_endpoint = Some(endpoint.into());
}
save();
}

pub fn streams() -> Vec<VideoAndStreamInformation> {
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
let content = manager.content.as_ref();
content.unwrap().config.streams.clone()
}

pub fn set_streams(streams: &[VideoAndStreamInformation]) {
// Take care of scope mutex
// Take care of scope RwLock
{
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
let mut content = manager.content.as_mut();
content.as_mut().unwrap().config.streams.clear();
content
Expand All @@ -219,9 +219,9 @@ pub fn set_streams(streams: &[VideoAndStreamInformation]) {
}

pub fn reset() {
// Take care of scope mutex
// Take care of scope RwLock
{
let mut manager = MANAGER.lock().unwrap();
let mut manager = MANAGER.write().unwrap();
manager.content.as_mut().unwrap().config = SettingsStruct::default();
}
save();
Expand Down Expand Up @@ -254,7 +254,7 @@ mod tests {
#[test]
fn test_no_aboslute_path() {
init(None);
let manager = MANAGER.lock().unwrap();
let manager = MANAGER.read().unwrap();
let file_name = &manager.content.as_ref().unwrap().file_name;
assert!(
std::path::Path::new(&file_name).exists(),
Expand Down
Loading
Loading