Skip to content

Commit

Permalink
Allow plugins to fail at startup, and Zenohd to react to that failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierre Avital committed Jul 3, 2023
1 parent 02e5f70 commit 0575fbe
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 25 deletions.
37 changes: 17 additions & 20 deletions io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async fn tx_task(
let (batch_size, _) = tx_compressed(
is_compressed,
link.is_streamed(),
&bytes,
bytes,
&mut compression_aux_buff,
)?;
bytes = &compression_aux_buff[..batch_size];
Expand Down Expand Up @@ -471,7 +471,7 @@ fn rx_decompress(
end_pos: &mut usize,
) -> ZResult<()> {
let is_compressed: bool = buffer[COMPRESSION_BYTE_INDEX] == COMPRESSION_ENABLED;
Ok(if is_compressed {
if is_compressed {
let mut aux_buff = pool.try_take().unwrap_or_else(|| pool.alloc());
*end_pos = lz4_flex::block::decompress_into(
&buffer[BATCH_PAYLOAD_START_INDEX..read_bytes],
Expand All @@ -482,7 +482,8 @@ fn rx_decompress(
} else {
*start_pos = BATCH_PAYLOAD_START_INDEX;
*end_pos = read_bytes;
})
};
Ok(())
}

#[cfg(all(feature = "unstable", feature = "transport_compression"))]
Expand Down Expand Up @@ -589,14 +590,11 @@ fn set_uncompressed_batch_header(
if is_streamed {
let mut header = [0_u8, 0_u8];
header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]);
let mut batch_size = u16::from_le_bytes(header);
batch_size += 1;
let batch_size: u16 = batch_size.try_into().map_err(|e| {
zerror!(
"Compression error: unable to convert compression size into u16: {}",
e
)
})?;
let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) {
size
} else {
bail!("Compression error: unable to convert compression size into u16",)
};
buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes());
buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED;
let batch_size: usize = batch_size.into();
Expand All @@ -612,7 +610,7 @@ fn set_uncompressed_batch_header(
// May happen when the payload size is itself the MTU and adding the header exceeds it.
Err(zerror!("Failed to send uncompressed batch, batch size ({}) exceeds the maximum batch size of {}.", final_batch_size, MAX_BATCH_SIZE))?;
}
return Ok(final_batch_size);
Ok(final_batch_size)
}

#[cfg(all(feature = "transport_compression", feature = "unstable"))]
Expand All @@ -626,47 +624,46 @@ fn tx_compression_test() {
// Compression done for the sake of comparing the result.
let payload_compression_size = lz4_flex::block::compress_into(&payload, &mut buff).unwrap();

fn get_header_value(buff: &Box<[u8]>) -> u16 {
fn get_header_value(buff: &[u8]) -> u16 {
let mut header = [0_u8, 0_u8];
header[..HEADER_BYTES_SIZE].copy_from_slice(&buff[..HEADER_BYTES_SIZE]);
let batch_size = u16::from_le_bytes(header);
batch_size
u16::from_le_bytes(header)
}

// Streamed with compression enabled
let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
let header = get_header_value(&buff);
assert_eq!(was_compressed, true);
assert!(was_compressed);
assert_eq!(header as usize, payload_compression_size + COMPRESSION_BYTE);
assert!(batch_size < batch.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, payload_compression_size + 3);

// Not streamed with compression enabled
let batch = payload;
let (batch_size, was_compressed) = tx_compressed(true, false, &batch, &mut buff).unwrap();
assert_eq!(was_compressed, true);
assert!(was_compressed);
assert!(batch_size < batch.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, payload_compression_size + COMPRESSION_BYTE);

// Streamed with compression disabled
let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
let (batch_size, was_compressed) = tx_compressed(false, true, &batch, &mut buff).unwrap();
let header = get_header_value(&buff);
assert_eq!(was_compressed, false);
assert!(!was_compressed);
assert_eq!(header as usize, payload.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);

// Not streamed and compression disabled
let batch = payload;
let (batch_size, was_compressed) = tx_compressed(false, false, &batch, &mut buff).unwrap();
assert_eq!(was_compressed, false);
assert!(!was_compressed);
assert_eq!(batch_size, payload.len() + COMPRESSION_BYTE);

// Verify that if the compression result is bigger than the original payload size, then the non compressed payload is returned.
let batch = [16, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; // a non compressable payload with no repetitions
let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
assert_eq!(was_compressed, false);
assert!(!was_compressed);
assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);
}

Expand Down
10 changes: 8 additions & 2 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ impl Plugin for RestPlugin {

let conf: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
async_std::task::spawn(run(runtime.clone(), conf.clone()));
let task = async_std::task::spawn(run(runtime.clone(), conf.clone()));
let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1)));
if let Ok(Err(e)) = task {
bail!("REST server failed within 1ms: {e}")
}
Ok(Box::new(RunningPlugin(conf)))
}
}
Expand Down Expand Up @@ -435,7 +439,7 @@ async fn write(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
}
}

pub async fn run(runtime: Runtime, conf: Config) {
pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> {
// Try to initiate login.
// Required in case of dynamic lib, otherwise no logs.
// But cannot be done twice in case of static link.
Expand All @@ -461,7 +465,9 @@ pub async fn run(runtime: Runtime, conf: Config) {

if let Err(e) = app.listen(conf.http_port).await {
log::error!("Unable to start http server for REST: {:?}", e);
return Err(e.into());
}
Ok(())
}

fn path_to_key_expr<'a>(path: &'a str, zid: &str) -> ZResult<KeyExpr<'a>> {
Expand Down
28 changes: 25 additions & 3 deletions zenohd/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

//
// Copyright (c) 2023 ZettaScale Technology
//
Expand Down Expand Up @@ -78,22 +80,30 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na

let mut plugins = PluginsManager::dynamic(config.libloader());
// Static plugins are to be added here, with `.add_static::<PluginType>()`
let mut required_plugins = HashSet::new();
for plugin_load in config.plugins().load_requests() {
let PluginLoad {
name,
paths,
required,
} = plugin_load;
log::info!(
"Loading {req} plugin \"{name}\"",
req = if required { "required" } else { "" }
);
if let Err(e) = match paths {
None => plugins.load_plugin_by_name(name),
Some(paths) => plugins.load_plugin_by_paths(name, &paths),
None => plugins.load_plugin_by_name(name.clone()),
Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths),
} {
if required {
panic!("Plugin load failure: {}", e)
} else {
log::error!("Plugin load failure: {}", e)
}
}
if required {
required_plugins.insert(name);
}
}

let runtime = match Runtime::new(config).await {
Expand All @@ -105,6 +115,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
};

for (name, path, start_result) in plugins.start_all(&runtime) {
let required = required_plugins.contains(name);
log::info!(
"Starting {req} plugin \"{name}\"",
req = if required { "required" } else { "" }
);
match start_result {
Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path),
Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as an other plugin by the same name is already running", name, path),
Expand All @@ -113,7 +128,11 @@ clap::Arg::new("adminspace-permissions").long("adminspace-permissions").value_na
Ok(s) => s,
Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path),
};
log::error!("Plugin start failure: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
if required {
panic!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
}else {
log::error!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
}
}
}
}
Expand Down Expand Up @@ -157,6 +176,9 @@ fn config_from_args(args: &ArgMatches) -> Config {
config
.insert_json5("plugins/rest/http_port", &format!(r#""{value}""#))
.unwrap();
config
.insert_json5("plugins/rest/__required__", "true")
.unwrap();
}
}
if let Some(plugins_search_dirs) = args.values_of("plugin-search-dir") {
Expand Down

0 comments on commit 0575fbe

Please sign in to comment.