Allow plugins to fail at startup, and Zenohd to react to that failure
Pierre Avital committed Jul 3, 2023
1 parent 02e5f70 commit b0a97ea
Showing 4 changed files with 144 additions and 25 deletions.
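The mechanism, distilled (a minimal sketch, not code from this commit; `run_plugin` is a hypothetical stand-in for a plugin entry point such as the REST plugin's `run` below, which now returns a Result instead of `()`): spawn the plugin's main task, poll it for one millisecond, and fail plugin startup if the task has already returned an error.

use std::time::Duration;

// Hypothetical plugin entry point that fails immediately, e.g. because
// its port is already bound.
async fn run_plugin() -> Result<(), String> {
    Err("address already in use".into())
}

fn start_plugin() -> Result<(), String> {
    let task = async_std::task::spawn(run_plugin());
    // Poll the task for 1ms: long enough to surface immediate startup
    // failures, short enough not to stall the host's own startup. On
    // timeout the JoinHandle is dropped, which in async-std detaches the
    // task and lets it keep running.
    match async_std::task::block_on(async_std::future::timeout(
        Duration::from_millis(1),
        task,
    )) {
        Ok(Err(e)) => Err(format!("plugin failed at startup: {e}")),
        _ => Ok(()), // finished cleanly, or still running
    }
}

fn main() {
    assert!(start_plugin().is_err());
}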
94 changes: 94 additions & 0 deletions examples/examples/wolf.rs
@@ -0,0 +1,94 @@
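// (Editorial note, assumed reading; the example itself is undocumented.)
// A single subscriber's callback holds the node mutex for 200ms per
// sample, while the main thread repeatedly locks that same mutex and 16
// extra sessions flood "key/expression" with puts, so lock contention
// and callback back-pressure show up in the timestamped output.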
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use zenoh::prelude::sync::*;
use zenoh::subscriber::Subscriber;

#[derive(Clone, Copy)]
struct Chrono(Instant);
impl Chrono {
fn now() -> Self {
Self(Instant::now())
}
}
impl std::fmt::Display for Chrono {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let t = self.0.elapsed();
write!(f, "{:.3}", t.as_secs_f32())
}
}

#[derive(Clone)]
struct Node {
id: String,
}

struct NodeWrapper<'a> {
node: Arc<Mutex<Node>>,
session: Arc<Session>,
subscription: Option<Subscriber<'a, ()>>,
}

impl NodeWrapper<'_> {
fn new(id: String) -> Self {
let node = Node::new(id);
let node = Arc::new(Mutex::new(node));
let session = zenoh::open(config::default()).res().unwrap().into_arc();
Self {
node,
session,
subscription: None,
}
}

fn run(&mut self, chrono: Chrono) {
let node_clone = Arc::clone(&self.node);

let subscription = self
.session
.declare_subscriber("key/expression")
.reliable()
.callback(move |sample| {
let value = sample.value.to_string();
println!("{chrono} RECV {value}");
let mut node = node_clone.lock().unwrap();
println!("{chrono} LOCK {value}");
node.id = value.clone();
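// Keep the node lock held across the sleep: this is what stalls
// get_id() on the main thread.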
thread::sleep(Duration::from_millis(200));
println!("{chrono} UNLOCK {value}");
})
.res()
.unwrap();
self.subscription = Some(subscription);
}

fn get_id(&self) -> String {
self.node.lock().unwrap().id.clone()
}
}

impl Node {
fn new(id: String) -> Self {
Self { id }
}
}

fn main() {
let chrono = Chrono::now();
let mut node = NodeWrapper::new("node1".to_string());
node.run(chrono);
let mut sessions = Vec::new();
for i in 1..=16 {
//sleep(Duration::from_secs(1)).await;
println!("{chrono} Starting Session {i}");
println!("{chrono} N is {}", node.get_id());
let session = SyncResolve::res(zenoh::open(config::default())).unwrap();

for j in 0..100 {
session.put("key/expression", 100 * i + j).res().unwrap();
}
sessions.push(session);
}
println!("Main thread sleeping");
std::thread::sleep(Duration::from_secs(60));
}
37 changes: 17 additions & 20 deletions io/zenoh-transport/src/unicast/link.rs
@@ -227,7 +227,7 @@ async fn tx_task(
let (batch_size, _) = tx_compressed(
is_compressed,
link.is_streamed(),
- &bytes,
+ bytes,
&mut compression_aux_buff,
)?;
bytes = &compression_aux_buff[..batch_size];
@@ -471,7 +471,7 @@ fn rx_decompress(
end_pos: &mut usize,
) -> ZResult<()> {
let is_compressed: bool = buffer[COMPRESSION_BYTE_INDEX] == COMPRESSION_ENABLED;
- Ok(if is_compressed {
+ if is_compressed {
let mut aux_buff = pool.try_take().unwrap_or_else(|| pool.alloc());
*end_pos = lz4_flex::block::decompress_into(
&buffer[BATCH_PAYLOAD_START_INDEX..read_bytes],
@@ -482,7 +482,8 @@
} else {
*start_pos = BATCH_PAYLOAD_START_INDEX;
*end_pos = read_bytes;
- })
+ };
+ Ok(())
}

#[cfg(all(feature = "unstable", feature = "transport_compression"))]
@@ -589,14 +590,11 @@ fn set_uncompressed_batch_header(
if is_streamed {
let mut header = [0_u8, 0_u8];
header[..HEADER_BYTES_SIZE].copy_from_slice(&bytes[..HEADER_BYTES_SIZE]);
- let mut batch_size = u16::from_le_bytes(header);
- batch_size += 1;
- let batch_size: u16 = batch_size.try_into().map_err(|e| {
-     zerror!(
-         "Compression error: unable to convert compression size into u16: {}",
-         e
-     )
- })?;
+ let batch_size = if let Some(size) = u16::from_le_bytes(header).checked_add(1) {
+     size
+ } else {
+     bail!("Compression error: unable to convert compression size into u16")
+ };
buff[0..HEADER_BYTES_SIZE].copy_from_slice(&batch_size.to_le_bytes());
buff[COMPRESSION_BYTE_INDEX_STREAMED] = COMPRESSION_DISABLED;
let batch_size: usize = batch_size.into();
@@ -612,7 +610,7 @@
// May happen when the payload size is itself the MTU and adding the header exceeds it.
Err(zerror!("Failed to send uncompressed batch, batch size ({}) exceeds the maximum batch size of {}.", final_batch_size, MAX_BATCH_SIZE))?;
}
- return Ok(final_batch_size);
+ Ok(final_batch_size)
}
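An editorial aside on the hunk above: converting a u16 into a u16 with `try_into` can never fail, so the old guard protected nothing; the real hazard was the preceding `batch_size += 1`, which panics in debug builds and wraps silently in release. `checked_add` reports the overflow directly, as this standalone snippet shows:

fn main() {
    // The failure mode the old code missed: incrementing a maximal u16.
    assert_eq!(u16::MAX.checked_add(1), None); // overflow reported, no panic
    assert_eq!(512u16.checked_add(1), Some(513)); // normal case unchanged
}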

#[cfg(all(feature = "transport_compression", feature = "unstable"))]
@@ -626,47 +624,46 @@ fn tx_compression_test() {
// Compression done for the sake of comparing the result.
let payload_compression_size = lz4_flex::block::compress_into(&payload, &mut buff).unwrap();

- fn get_header_value(buff: &Box<[u8]>) -> u16 {
+ fn get_header_value(buff: &[u8]) -> u16 {
let mut header = [0_u8, 0_u8];
header[..HEADER_BYTES_SIZE].copy_from_slice(&buff[..HEADER_BYTES_SIZE]);
- let batch_size = u16::from_le_bytes(header);
- batch_size
+ u16::from_le_bytes(header)
}

// Streamed with compression enabled
let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
let header = get_header_value(&buff);
- assert_eq!(was_compressed, true);
+ assert!(was_compressed);
assert_eq!(header as usize, payload_compression_size + COMPRESSION_BYTE);
assert!(batch_size < batch.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, payload_compression_size + 3);

// Not streamed with compression enabled
let batch = payload;
let (batch_size, was_compressed) = tx_compressed(true, false, &batch, &mut buff).unwrap();
- assert_eq!(was_compressed, true);
+ assert!(was_compressed);
assert!(batch_size < batch.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, payload_compression_size + COMPRESSION_BYTE);

// Streamed with compression disabled
let batch = [16, 0, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
let (batch_size, was_compressed) = tx_compressed(false, true, &batch, &mut buff).unwrap();
let header = get_header_value(&buff);
- assert_eq!(was_compressed, false);
+ assert!(!was_compressed);
assert_eq!(header as usize, payload.len() + COMPRESSION_BYTE);
assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);

// Not streamed and compression disabled
let batch = payload;
let (batch_size, was_compressed) = tx_compressed(false, false, &batch, &mut buff).unwrap();
- assert_eq!(was_compressed, false);
+ assert!(!was_compressed);
assert_eq!(batch_size, payload.len() + COMPRESSION_BYTE);

// Verify that if the compression result is bigger than the original payload size, the uncompressed payload is returned.
let batch = [16, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; // a non-compressible payload with no repetitions
let (batch_size, was_compressed) = tx_compressed(true, true, &batch, &mut buff).unwrap();
- assert_eq!(was_compressed, false);
+ assert!(!was_compressed);
assert_eq!(batch_size, batch.len() + COMPRESSION_BYTE);
}

10 changes: 8 additions & 2 deletions plugins/zenoh-plugin-rest/src/lib.rs
@@ -198,7 +198,11 @@ impl Plugin for RestPlugin {

let conf: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
- async_std::task::spawn(run(runtime.clone(), conf.clone()));
+ let task = async_std::task::spawn(run(runtime.clone(), conf.clone()));
+ let task = async_std::task::block_on(task.timeout(std::time::Duration::from_millis(1)));
+ if let Ok(Err(e)) = task {
+     bail!("REST server failed within 1ms: {e}")
+ }
Ok(Box::new(RunningPlugin(conf)))
}
}
@@ -435,7 +439,7 @@ async fn write(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Response>
}
}

- pub async fn run(runtime: Runtime, conf: Config) {
+ pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> {
// Try to initiate login.
// Required in case of dynamic lib, otherwise no logs.
// But cannot be done twice in case of static link.
@@ -461,7 +465,9 @@

if let Err(e) = app.listen(conf.http_port).await {
log::error!("Unable to start http server for REST: {:?}", e);
+ return Err(e.into());
}
+ Ok(())
}

fn path_to_key_expr<'a>(path: &'a str, zid: &str) -> ZResult<KeyExpr<'a>> {
28 changes: 25 additions & 3 deletions zenohd/src/main.rs
@@ -1,3 +1,5 @@
+ use std::collections::HashSet;

//
// Copyright (c) 2023 ZettaScale Technology
//
@@ -78,22 +80,30 @@

let mut plugins = PluginsManager::dynamic(config.libloader());
// Static plugins are to be added here, with `.add_static::<PluginType>()`
+ let mut required_plugins = HashSet::new();
for plugin_load in config.plugins().load_requests() {
let PluginLoad {
name,
paths,
required,
} = plugin_load;
+ log::info!(
+     "Loading {req} plugin \"{name}\"",
+     req = if required { "required" } else { "" }
+ );
if let Err(e) = match paths {
- None => plugins.load_plugin_by_name(name),
- Some(paths) => plugins.load_plugin_by_paths(name, &paths),
+ None => plugins.load_plugin_by_name(name.clone()),
+ Some(paths) => plugins.load_plugin_by_paths(name.clone(), &paths),
} {
if required {
panic!("Plugin load failure: {}", e)
} else {
log::error!("Plugin load failure: {}", e)
}
}
+ if required {
+     required_plugins.insert(name);
+ }
}

let runtime = match Runtime::new(config).await {
@@ -105,6 +115,11 @@
};

for (name, path, start_result) in plugins.start_all(&runtime) {
+ let required = required_plugins.contains(name);
+ log::info!(
+     "Starting {req} plugin \"{name}\"",
+     req = if required { "required" } else { "" }
+ );
match start_result {
Ok(Some(_)) => log::info!("Successfully started plugin {} from {:?}", name, path),
Ok(None) => log::warn!("Plugin {} from {:?} wasn't loaded, as another plugin by the same name is already running", name, path),
@@ -113,7 +128,11 @@
Ok(s) => s,
Err(_) => panic!("Formatting the error from plugin {} ({:?}) failed, this is likely due to ABI unstability.\r\nMake sure your plugin was built with the same version of cargo as zenohd", name, path),
};
log::error!("Plugin start failure: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
if required {
panic!("Plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
}else {
log::error!("Required plugin \"{name}\" failed to start: {}", if report.is_empty() {"no details provided"} else {report.as_str()});
}
}
}
}
@@ -157,6 +176,9 @@ fn config_from_args(args: &ArgMatches) -> Config {
config
.insert_json5("plugins/rest/http_port", &format!(r#""{value}""#))
.unwrap();
+ config
+     .insert_json5("plugins/rest/__required__", "true")
+     .unwrap();
}
}
if let Some(plugins_search_dirs) = args.values_of("plugin-search-dir") {
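For completeness, a sketch of marking an arbitrary plugin as required through the same API (the plugin name `myplugin` is hypothetical; `__required__` is the key whose load request zenohd checks above):

fn main() {
    let mut config = zenoh::config::Config::default();
    // A plugin marked this way now makes zenohd panic if the plugin
    // fails to load or to start, instead of only logging an error.
    config
        .insert_json5("plugins/myplugin/__required__", "true")
        .unwrap();
}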
