From 0575fbec21ae1dfb5cfa13a9a19fe9d64ce77953 Mon Sep 17 00:00:00 2001 From: Pierre Avital Date: Mon, 3 Jul 2023 10:28:06 +0200 Subject: [PATCH 1/2] Allow plugins to fail at startup, and Zenohd to react to that failure --- io/zenoh-transport/src/unicast/link.rs | 37 ++++++++++++-------------- plugins/zenoh-plugin-rest/src/lib.rs | 10 +++++-- zenohd/src/main.rs | 28 ++++++++++++++++--- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 68d59817c0..7d5756bfd8 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -227,7 +227,7 @@ async fn tx_task( let (batch_size, _) = tx_compressed( is_compressed, link.is_streamed(), - &bytes, + bytes, &mut compression_aux_buff, )?; bytes = &compression_aux_buff[..batch_size]; @@ -471,7 +471,7 @@ fn rx_decompress( end_pos: &mut usize, ) -> ZResult<()> { let is_compressed: bool = buffer[COMPRESSION_BYTE_INDEX] == COMPRESSION_ENABLED; - Ok(if is_compressed { + if is_compressed { let mut aux_buff = pool.try_take().unwrap_or_else(|| pool.alloc()); *end_pos = lz4_flex::block::decompress_into( &buffer[BATCH_PAYLOAD_START_INDEX..read_bytes], @@ -482,7 +482,8 @@ fn rx_decompress( } else { *start_pos = BATCH_PAYLOAD_START_INDEX; *end_pos = read_bytes; - }) + }; + Ok(()) } #[cfg(all(feature = "unstable", feature = "transport_compression"))] @@ -589,14 +590,11 @@ fn set_uncompressed_batch_header( if is_streamed { let mut header = [0_u8, 0_u8]; header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]); - let mut batch_size = u16::from_le_bytes(header); - batch_size += 1; - let batch_size: u16 = batch_size.try_into().map_err(|e| { - zerror!( - "Compression error: unable to convert compression size into u16: {}", - e - ) - })?; + let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) { + size + } else { + bail!("Compression error: unable to convert compression size 
into u16",) + }; buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes()); buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED; let batch_size: usize = batch_size.into(); @@ -612,7 +610,7 @@ fn set_uncompressed_batch_header( // May happen when the payload size is itself the MTU and adding the header exceeds it. Err(zerror!("Failed to send uncompressed batch, batch size ({}) exceeds the maximum batch size of {}.", final_batch_size, MAX_BATCH_SIZE))?; } - return Ok(final_batch_size); + Ok(final_batch_size) } #[cfg(all(feature = "transport_compression", feature = "unstable"))] @@ -626,18 +624,17 @@ fn tx_compression_test() { // Compression done for the sake of comparing the result. let payload_compression_size = lz4_flex::block::compress_into(&payload, &mut buff).unwrap(); - fn get_header_value(buff: &Box<[u8]>) -> u16 { + fn get_header_value(buff: &[u8]) -> u16 { let mut header = [0_u8, 0_u8]; header[..HEADER_BYTES_SIZE].copy_from_slice(&buff[..HEADER_BYTES_SIZE]); - let batch_size = u16::from_le_bytes(header); - batch_size + u16::from_le_bytes(header) } // Streamed with compression enabled let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]; let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap(); let header = get_header_value(&buff); - assert_eq!(was_compressed, true); + assert!(was_compressed); assert_eq!(header as usize, payload_compression_size + COMPRESSION_BYTE); assert!(batch_size < batch.len() + COMPRESSION_BYTE); assert_eq!(batch_size, payload_compression_size + 3); @@ -645,7 +642,7 @@ fn tx_compression_test() { // Not streamed with compression enabled let batch = payload; let (batch_size, was_compressed) = tx_compressed(true, false, &batch, &mut buff).unwrap(); - assert_eq!(was_compressed, true); + assert!(was_compressed); assert!(batch_size < batch.len() + COMPRESSION_BYTE); assert_eq!(batch_size, payload_compression_size + COMPRESSION_BYTE); @@ -653,20 +650,20 @@ fn 
tx_compression_test() { let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]; let (batch_size, was_compressed) = tx_compressed(false, true, &batch, &mut buff).unwrap(); let header = get_header_value(&buff); - assert_eq!(was_compressed, false); + assert!(!was_compressed); assert_eq!(header as usize, payload.len() + COMPRESSION_BYTE); assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE); // Not streamed and compression disabled let batch = payload; let (batch_size, was_compressed) = tx_compressed(false, false, &batch, &mut buff).unwrap(); - assert_eq!(was_compressed, false); + assert!(!was_compressed); assert_eq!(batch_size, payload.len() + COMPRESSION_BYTE); // Verify that if the compression result is bigger than the original payload size, then the non compressed payload is returned. let batch = [16, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; // a non compressable payload with no repetitions let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap(); - assert_eq!(was_compressed, false); + assert!(!was_compressed); assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE); } diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 6d6e5956eb..d9165d0b1d 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -198,7 +198,11 @@ impl Plugin for RestPlugin { let conf: Config = serde_json::from_value(plugin_conf.clone()) .map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?; - async_std::task::spawn(run(runtime.clone(), conf.clone())); + let task = async_std::task::spawn(run(runtime.clone(), conf.clone())); + let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1))); + if let Ok(Err(e)) = task { + bail!("REST server failed within 1ms: {e}") + } Ok(Box::new(RunningPlugin(conf))) } } @@ -435,7 +439,7 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result ZResult<()> { // Try to 
initiate login. // Required in case of dynamic lib, otherwise no logs. // But cannot be done twice in case of static link. @@ -461,7 +465,9 @@ pub async fn run(runtime: Runtime, conf: Config) { if let Err(e) = app.listen(conf.http_port).await { log::error!("Unable to start http server for REST: {:?}", e); + return Err(e.into()); } + Ok(()) } fn path_to_key_expr<'a>(path: &'a str, zid: &str) -> ZResult> { diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index ba61d8fa29..a582e6c7e3 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + // // Copyright (c) 2023 ZettaScale Technology // @@ -78,15 +80,20 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na let mut plugins = PluginsManager::dynamic(config.libloader()); // Static plugins are to be added here, with `.add_static::()` + let mut required_plugins = HashSet::new(); for plugin_load in config.plugins().load_requests() { let PluginLoad { name, paths, required, } = plugin_load; + log::info!( + "Loading {req} plugin \"{name}\"", + req = if required { "required" } else { "" } + ); if let Err(e) = match paths { - None => plugins.load_plugin_by_name(name), - Some(paths) => plugins.load_plugin_by_paths(name, &paths), + None => plugins.load_plugin_by_name(name.clone()), + Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths), } { if required { panic!("Plugin load failure: {}", e) @@ -94,6 +101,9 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na log::error!("Plugin load failure: {}", e) } } + if required { + required_plugins.insert(name); + } } let runtime = match Runtime::new(config).await { @@ -105,6 +115,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na }; for (name, path, start_result) in plugins.start_all(&runtime) { + let required = required_plugins.contains(name); + log::info!( + "Starting {req} plugin \"{name}\"", + req = if required { "required" 
} else { "" } + ); match start_result { Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path), Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as an other plugin by the same name is already running", name, path), @@ -113,7 +128,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na Ok(s) => s, Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path), }; - log::error!("Plugin start failure: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + if required { + panic!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + } else { + log::error!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()}); + } } } } @@ -157,6 +176,9 @@ fn config_from_args(args: &ArgMatches) -> Config { config .insert_json5("plugins/rest/http_port", &format!(r#""{value}""#)) .unwrap(); + config + .insert_json5("plugins/rest/__required__", "true") + .unwrap(); } } if let Some(plugins_search_dirs) = args.values_of("plugin-search-dir") { From bb8c071c099a261e9f9913e89ec377943025529c Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 25 Jul 2023 13:32:37 +0200 Subject: [PATCH 2/2] Address PR comments --- zenohd/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index a582e6c7e3..3d436cad6d 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -1,5 +1,3 @@ -use std::collections::HashSet; - // // Copyright (c) 2023 ZettaScale Technology // @@ -17,6 +15,7 @@ use async_std::task; use clap::{ArgMatches, Command}; use futures::future; use git_version::git_version; +use std::collections::HashSet; use zenoh::config::{ Config, EndPoint, ModeDependentValue, PermissionsConf, 
PluginLoad, ValidatedMap, };