diff --git a/examples/examples/wolf.rs b/examples/examples/wolf.rs new file mode 100644 index 0000000000..f4cfdb4dd9 --- /dev/null +++ b/examples/examples/wolf.rs @@ -0,0 +1,94 @@ +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::{Duration, Instant}; +use zenoh::prelude::sync::*; +use zenoh::subscriber::Subscriber; + +#[derive(Clone, Copy)] +struct Chrono(Instant); +impl Chrono { + fn now() -> Self { + Self(Instant::now()) + } +} +impl std::fmt::Display for Chrono { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let t = self.0.elapsed(); + write!(f, "{:.3}", t.as_secs_f32()) + } +} + +#[derive(Clone)] +struct Node { + id: String, +} + +struct NodeWrapper<'a> { + node: Arc>, + session: Arc, + subscription: Option>, +} + +impl NodeWrapper<'_> { + fn new(id: String) -> Self { + let node = Node::new(id); + let node = Arc::new(Mutex::new(node)); + let session = zenoh::open(config::default()).res().unwrap().into_arc(); + Self { + node, + session, + subscription: None, + } + } + + fn run(&mut self, chrono: Chrono) { + let node_clone = Arc::clone(&self.node); + + let subscription = self + .session + .declare_subscriber("key/expression") + .reliable() + .callback(move |sample| { + let value = sample.value.to_string(); + println!("{chrono} RECV {value}"); + let mut node = node_clone.lock().unwrap(); + println!("{chrono} LOCK {value}"); + node.id = value.clone(); + thread::sleep(Duration::from_millis(200)); + println!("{chrono} UNLOCK {value}"); + }) + .res() + .unwrap(); + self.subscription = Some(subscription); + } + + fn get_id(&self) -> String { + self.node.lock().unwrap().id.clone() + } +} + +impl Node { + fn new(id: String) -> Self { + Self { id } + } +} + +fn main() { + let chrono = Chrono::now(); + let mut node = NodeWrapper::new("node1".to_string()); + node.run(chrono); + let mut sessions = Vec::new(); + for i in 1..=16 { + //sleep(Duration::from_secs(1)).await; + println!("{chrono} Starting Session {i}"); + println!("{chrono} N is {}", node.get_id()); + let session = SyncResolve::res(zenoh::open(config::default())).unwrap(); + + for j in 0..100 { + session.put("key/expression", 100 * i + j).res().unwrap(); + } + sessions.push(session); + } + println!("Main thread sleeping"); + std::thread::sleep(Duration::from_secs(60)); +} diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 68d59817c0..7d5756bfd8 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -227,7 +227,7 @@ async fn tx_task( let (batch_size, _) = tx_compressed( is_compressed, link.is_streamed(), - &bytes, + bytes, &mut compression_aux_buff, )?; bytes = &compression_aux_buff[..batch_size]; @@ -471,7 +471,7 @@ fn rx_decompress( end_pos: &mut usize, ) -> ZResult<()> { let is_compressed: bool = buffer[COMPRESSION_BYTE_INDEX] == COMPRESSION_ENABLED; - Ok(if is_compressed { + if is_compressed { let mut aux_buff = pool.try_take().unwrap_or_else(|| pool.alloc()); *end_pos = lz4_flex::block::decompress_into( &buffer[BATCH_PAYLOAD_START_INDEX..read_bytes], @@ -482,7 +482,8 @@ fn rx_decompress( } else { *start_pos = BATCH_PAYLOAD_START_INDEX; *end_pos = read_bytes; - }) + }; + Ok(()) } #[cfg(all(feature = "unstable", feature = "transport_compression"))] @@ -589,14 +590,11 @@ fn set_uncompressed_batch_header( if is_streamed { let mut header = [0_u8, 0_u8]; header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]); - let mut batch_size = u16::from_le_bytes(header); - batch_size += 1; - let batch_size: u16 = batch_size.try_into().map_err(|e| { - zerror!( - "Compression error: unable to convert compression size into u16: {}", - e - ) - })?; + let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) { + size + } else { + bail!("Compression error: unable to convert compression size into u16",) + }; buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes()); buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED; let batch_size: usize = batch_size.into(); @@ -612,7 +610,7 @@ fn set_uncompressed_batch_header( // May happen when the payload size is itself the MTU and adding the header exceeds it. Err(zerror!("Failed to send uncompressed batch, batch size ({}) exceeds the maximum batch size of {}.", final_batch_size, MAX_BATCH_SIZE))?; } - return Ok(final_batch_size); + Ok(final_batch_size) } #[cfg(all(feature = "transport_compression", feature = "unstable"))] @@ -626,18 +624,17 @@ fn tx_compression_test() { // Compression done for the sake of comparing the result. let payload_compression_size = lz4_flex::block::compress_into(&payload, &mut buff).unwrap(); - fn get_header_value(buff: &Box<[u8]>) -> u16 { + fn get_header_value(buff: &[u8]) -> u16 { let mut header = [0_u8, 0_u8]; header[..HEADER_BYTES_SIZE].copy_from_slice(&buff[..HEADER_BYTES_SIZE]); - let batch_size = u16::from_le_bytes(header); - batch_size + u16::from_le_bytes(header) } // Streamed with compression enabled let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]; let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap(); let header = get_header_value(&buff); - assert_eq!(was_compressed, true); + assert!(was_compressed); assert_eq!(header as usize, payload_compression_size + COMPRESSION_BYTE); assert!(batch_size < batch.len() + COMPRESSION_BYTE); assert_eq!(batch_size, payload_compression_size + 3); @@ -645,7 +642,7 @@ fn tx_compression_test() { // Not streamed with compression enabled let batch = payload; let (batch_size, was_compressed) = tx_compressed(true, false, &batch, &mut buff).unwrap(); - assert_eq!(was_compressed, true); + assert!(was_compressed); assert!(batch_size < batch.len() + COMPRESSION_BYTE); assert_eq!(batch_size, payload_compression_size + COMPRESSION_BYTE); @@ -653,20 +650,20 @@ fn tx_compression_test() { let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]; let (batch_size, was_compressed) = tx_compressed(false, true, &batch, &mut buff).unwrap(); let header = get_header_value(&buff); - assert_eq!(was_compressed, false); + assert!(!was_compressed); assert_eq!(header as usize, payload.len() + COMPRESSION_BYTE); assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE); // Not streamed and compression disabled let batch = payload; let (batch_size, was_compressed) = tx_compressed(false, false, &batch, &mut buff).unwrap(); - assert_eq!(was_compressed, false); + assert!(!was_compressed); assert_eq!(batch_size, payload.len() + COMPRESSION_BYTE); // Verify that if the compression result is bigger than the original payload size, then the non compressed payload is returned. let batch = [16, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; // a non compressable payload with no repetitions let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap(); - assert_eq!(was_compressed, false); + assert!(!was_compressed); assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE); } diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 6d6e5956eb..d9165d0b1d 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -198,7 +198,11 @@ impl Plugin for RestPlugin { let conf: Config = serde_json::from_value(plugin_conf.clone()) .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?; - async_std::task::spawn(run(runtime.clone(), conf.clone())); + let task = async_std::task::spawn(run(runtime.clone(), conf.clone())); + let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1))); + if let Ok(Err(e)) = task { + bail!("REST server failed within 1ms: {e}") + } Ok(Box::new(RunningPlugin(conf))) } } @@ -435,7 +439,7 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result ZResult<()> { // Try to initiate login. // Required in case of dynamic lib, otherwise no logs. // But cannot be done twice in case of static link. @@ -461,7 +465,9 @@ pub async fn run(runtime: Runtime, conf: Config) { if let Err(e) = app.listen(conf.http_port).await { log::error!("Unable to start http server for REST: {:?}", e); + return Err(e.into()); } + Ok(()) } fn path_to_key_expr<'a>(path: &'a str, zid: &str) -> ZResult> { diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index ba61d8fa29..a582e6c7e3 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + // // Copyright (c) 2023 ZettaScale Technology // @@ -78,15 +80,20 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na let mut plugins = PluginsManager::dynamic(config.libloader()); // Static plugins are to be added here, with `.add_static::()` + let mut required_plugins = HashSet::new(); for plugin_load in config.plugins().load_requests() { let PluginLoad { name, paths, required, } = plugin_load; + log::info!( + "Loading {req} plugin \"{name}\"", + req = if required { "required" } else { "" } + ); if let Err(e) = match paths { - None => plugins.load_plugin_by_name(name), - Some(paths) => plugins.load_plugin_by_paths(name, &paths), + None => plugins.load_plugin_by_name(name.clone()), + Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths), } { if required { panic!("Plugin load failure: {}", e) @@ -94,6 +101,9 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na log::error!("Plugin load failure: {}", e) } } + if required { + required_plugins.insert(name); + } } let runtime = match Runtime::new(config).await { @@ -105,6 +115,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na }; for (name, path, start_result) in plugins.start_all(&runtime) { + let required = required_plugins.contains(name); + log::info!( + "Starting {req} plugin \"{name}\"", + req = if required { "required" } else { "" } + ); match start_result { Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path), Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as an other plugin by the same name is already running", name, path), @@ -113,7 +128,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na Ok(s) => s, Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path), }; - log::error!("Plugin start failure: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + if required { + panic!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + }else { + log::error!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + } } } } @@ -157,6 +176,9 @@ fn config_from_args(args: &ArgMatches) -> Config { config .insert_json5("plugins/rest/http_port", &format!(r#""{value}""#)) .unwrap(); + config + .insert_json5("plugins/rest/__required__", "true") + .unwrap(); } } if let Some(plugins_search_dirs) = args.values_of("plugin-search-dir") {