From 41ac4a7072036d786854f2be12cc3bb58098ec74 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi <70609372+oteffahi@users.noreply.github.com> Date: Fri, 7 Mar 2025 17:24:52 +0100 Subject: [PATCH] Add support for query/reply messages to Downsampling (#1797) * Add messages and flows to downsampling config * Add Downsampling message types logic * Move Downsampling messages config out of rules In the current implementation, rules within the same Downsampling config share the same state if configured on the same keyexpr. Having message types within the rules can lead users to configure different messages types within different rules with same keyexpr, which would produce the same result as having all message types within the same rule, i.e downsampling on the whole set of messages instead of doing so seperately for each type. * Update DEFAULT_CONFIG * Add missing check for empty flows list * Fix tests with json5 config * Fix eagerly evaluated downsampling interceptor * Add downsampling query/reply tests * Fix clippy warning * DownsamplingFilters use default debug format --------- Co-authored-by: OlivierHecart --- DEFAULT_CONFIG.json5 | 14 +- commons/zenoh-config/src/lib.rs | 14 +- .../net/routing/interceptor/authorization.rs | 8 +- .../net/routing/interceptor/downsampling.rs | 99 ++++++++--- zenoh/src/net/routing/interceptor/mod.rs | 24 ++- zenoh/tests/interceptors.rs | 166 +++++++++++++++--- 6 files changed, 273 insertions(+), 52 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index d66f64ebc..1a86e8cac 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -281,8 +281,18 @@ // /// Optional list of network interfaces messages will be processed on, the rest will be passed as is. // /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any. // interfaces: [ "wlan0" ], - // /// Data flow messages will be processed on. ("egress" or "ingress") - // flow: "egress", + // /// Optional list of data flows messages will be processed on ("egress" and/or "ingress"). + // /// If absent, the rules will be applied to both flows. + // flow: ["ingress", "egress"], + // /// List of message type on which downsampling will be applied. Must not be empty. + // messages: [ + // /// Publication (Put and Delete) + // "push", + // /// Get + // "query", + // /// Queryable Reply to a Query + // "reply" + // ], // /// A list of downsampling rules: key_expression and the maximum frequency in Hertz // rules: [ // { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 }, diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 895b6799c..ca5cf6655 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -88,6 +88,14 @@ pub enum InterceptorFlow { Ingress, } +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum DownsamplingMessage { + Push, + Query, + Reply, +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct DownsamplingRuleConf { /// A list of key-expressions to which the downsampling will be applied. @@ -104,10 +112,12 @@ pub struct DownsamplingItemConf { /// A list of interfaces to which the downsampling will be applied /// Downsampling will be applied for all interfaces if the parameter is None pub interfaces: Option>, + // list of message types on which the downsampling will be applied + pub messages: Vec, /// A list of interfaces to which the downsampling will be applied. pub rules: Vec, - /// Downsampling flow direction: egress, ingress - pub flow: InterceptorFlow, + /// Downsampling flow directions: egress and/or ingress + pub flows: Option>, } #[derive(Serialize, Debug, Deserialize, Clone)] diff --git a/zenoh/src/net/routing/interceptor/authorization.rs b/zenoh/src/net/routing/interceptor/authorization.rs index 9959fece0..5ef62ae79 100644 --- a/zenoh/src/net/routing/interceptor/authorization.rs +++ b/zenoh/src/net/routing/interceptor/authorization.rs @@ -30,6 +30,8 @@ use zenoh_keyexpr::{ keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, KeBoxTree}, }; use zenoh_result::ZResult; + +use super::InterfaceEnabled; type PolicyForSubject = FlowPolicy; type PolicyMap = HashMap; @@ -243,12 +245,6 @@ impl FlowPolicy { } } -#[derive(Default, Debug)] -pub struct InterfaceEnabled { - pub ingress: bool, - pub egress: bool, -} - pub struct PolicyEnforcer { pub(crate) acl_enabled: bool, pub(crate) default_permission: Permission, diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 60af846bb..fc23a3b9d 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -23,7 +23,9 @@ use std::{ sync::{Arc, Mutex}, }; -use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow}; +use zenoh_config::{ + DownsamplingItemConf, DownsamplingMessage, DownsamplingRuleConf, InterceptorFlow, +}; use zenoh_core::zlock; use zenoh_keyexpr::keyexpr_tree::{ impls::KeyedSetProvider, support::UnknownWildness, IKeyExprTree, IKeyExprTreeMut, KeBoxTree, @@ -46,7 +48,19 @@ pub(crate) fn downsampling_interceptor_factories( bail!("Invalid Downsampling config: id '{id}' is repeated"); } } - res.push(Box::new(DownsamplingInterceptorFactory::new(ds.clone()))); + let mut ds = ds.clone(); + // check for undefined flows and initialize them + let flows = ds + .flows + .get_or_insert(vec![InterceptorFlow::Ingress, InterceptorFlow::Egress]); + if flows.is_empty() { + bail!("Invalid Downsampling config: flows list must not be empty"); + } + // check for empty messages list + if ds.messages.is_empty() { + bail!("Invalid Downsampling config: messages list must not be empty"); + } + res.push(Box::new(DownsamplingInterceptorFactory::new(ds))); } Ok(res) @@ -55,7 +69,8 @@ pub(crate) fn downsampling_interceptor_factories( pub struct DownsamplingInterceptorFactory { interfaces: Option>, rules: Vec, - flow: InterceptorFlow, + flows: InterfaceEnabled, + messages: Arc, } impl DownsamplingInterceptorFactory { @@ -63,7 +78,12 @@ impl DownsamplingInterceptorFactory { Self { interfaces: conf.interfaces, rules: conf.rules, - flow: conf.flow, + flows: conf + .flows + .expect("config flows should be set") + .as_slice() + .into(), + messages: Arc::new(conf.messages.as_slice().into()), } } } @@ -91,21 +111,20 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory { } } }; - - match self.flow { - InterceptorFlow::Ingress => ( - Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + ( + self.flows.ingress.then(|| { + Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + self.messages.clone(), self.rules.clone(), - )))), - None, - ), - InterceptorFlow::Egress => ( - None, - Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + ))) as IngressInterceptor + }), + self.flows.egress.then(|| { + Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + self.messages.clone(), self.rules.clone(), - )))), - ), - } + ))) as EgressInterceptor + }), + ) } fn new_transport_multicast( @@ -120,16 +139,52 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory { } } +#[derive(Debug, Default, Clone)] +pub(crate) struct DownsamplingFilters { + push: bool, + query: bool, + reply: bool, +} + +impl From<&[DownsamplingMessage]> for DownsamplingFilters { + fn from(value: &[DownsamplingMessage]) -> Self { + let mut res = Self::default(); + for v in value { + match v { + DownsamplingMessage::Push => res.push = true, + DownsamplingMessage::Query => res.query = true, + DownsamplingMessage::Reply => res.reply = true, + } + } + res + } +} + struct Timestate { pub threshold: tokio::time::Duration, pub latest_message_timestamp: tokio::time::Instant, } pub(crate) struct DownsamplingInterceptor { + filtered_messages: Arc, ke_id: Arc>>, ke_state: Arc>>, } +impl DownsamplingInterceptor { + fn is_msg_filtered(&self, ctx: &RoutingContext) -> bool { + match ctx.msg.body { + NetworkBody::Push(_) => self.filtered_messages.push, + NetworkBody::Request(_) => self.filtered_messages.query, + NetworkBody::Response(_) => self.filtered_messages.reply, + NetworkBody::ResponseFinal(_) => false, + NetworkBody::Interest(_) => false, + NetworkBody::Declare(_) => false, + NetworkBody::OAM(_) => false, + } + } +} + impl InterceptorTrait for DownsamplingInterceptor { fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { let ke_id = zlock!(self.ke_id); @@ -146,7 +201,7 @@ impl InterceptorTrait for DownsamplingInterceptor { ctx: RoutingContext, cache: Option<&Box>, ) -> Option> { - if matches!(ctx.msg.body, NetworkBody::Push(_)) { + if self.is_msg_filtered(&ctx) { if let Some(cache) = cache { if let Some(id) = cache.downcast_ref::>() { if let Some(id) = id { @@ -177,7 +232,7 @@ impl InterceptorTrait for DownsamplingInterceptor { const NANOS_PER_SEC: f64 = 1_000_000_000.0; impl DownsamplingInterceptor { - pub fn new(rules: Vec) -> Self { + pub fn new(messages: Arc, rules: Vec) -> Self { let mut ke_id = KeBoxTree::default(); let mut ke_state = HashMap::default(); for (id, rule) in rules.into_iter().enumerate() { @@ -197,12 +252,14 @@ impl DownsamplingInterceptor { }, ); tracing::debug!( - "New downsampler rule enabled: key_expr={:?}, threshold={:?}", + "New downsampler rule enabled: key_expr={:?}, threshold={:?}, messages={:?}", rule.key_expr, - threshold + threshold, + messages, ); } Self { + filtered_messages: messages, ke_id: Arc::new(Mutex::new(ke_id)), ke_state: Arc::new(Mutex::new(ke_state)), } diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 1e8ee6e61..69b8dd60c 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -24,7 +24,7 @@ use access_control::acl_interceptor_factories; mod authorization; use std::any::Any; -use zenoh_config::Config; +use zenoh_config::{Config, InterceptorFlow}; use zenoh_protocol::network::NetworkMessage; use zenoh_result::ZResult; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; @@ -35,6 +35,28 @@ use crate::api::key_expr::KeyExpr; pub mod downsampling; use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories; +#[derive(Default, Debug)] +pub struct InterfaceEnabled { + pub ingress: bool, + pub egress: bool, +} + +impl From<&[InterceptorFlow]> for InterfaceEnabled { + fn from(value: &[InterceptorFlow]) -> Self { + let mut res = Self { + ingress: false, + egress: false, + }; + for v in value { + match v { + InterceptorFlow::Egress => res.egress = true, + InterceptorFlow::Ingress => res.ingress = true, + } + } + res + } +} + pub(crate) trait InterceptorTrait { fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option>; diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 0c0d5d298..fd863a456 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -23,8 +23,10 @@ use std::{ }, }; -use zenoh::{key_expr::KeyExpr, Config, Wait}; -use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow}; +use zenoh::{key_expr::KeyExpr, query::ConsolidationMode, Config, Wait}; +use zenoh_config::{ + DownsamplingItemConf, DownsamplingMessage, DownsamplingRuleConf, InterceptorFlow, +}; // Tokio's time granularity on different platforms #[cfg(target_os = "windows")] @@ -40,40 +42,40 @@ fn build_config( ds_config: Vec, flow: InterceptorFlow, ) -> (Config, Config) { - let mut pub_config = Config::default(); - pub_config + let mut sender_config = Config::default(); + sender_config .scouting .multicast .set_enabled(Some(false)) .unwrap(); - let mut sub_config = Config::default(); - sub_config + let mut receiver_config = Config::default(); + receiver_config .scouting .multicast .set_enabled(Some(false)) .unwrap(); - sub_config + receiver_config .listen .endpoints .set(vec![locator.parse().unwrap()]) .unwrap(); - pub_config + sender_config .connect .endpoints .set(vec![locator.parse().unwrap()]) .unwrap(); match flow { - InterceptorFlow::Egress => pub_config.set_downsampling(ds_config).unwrap(), - InterceptorFlow::Ingress => sub_config.set_downsampling(ds_config).unwrap(), + InterceptorFlow::Egress => sender_config.set_downsampling(ds_config).unwrap(), + InterceptorFlow::Ingress => receiver_config.set_downsampling(ds_config).unwrap(), }; - (pub_config, sub_config) + (sender_config, receiver_config) } -fn downsampling_test( +fn downsampling_pub_sub_test( pub_config: Config, sub_config: Config, ke_prefix: &str, @@ -152,8 +154,9 @@ fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) { let ds_config = DownsamplingItemConf { id: None, - flow, + flows: Some(vec![flow]), interfaces: None, + messages: vec![DownsamplingMessage::Push], rules: vec![ DownsamplingRuleConf { key_expr: ke_10hz.clone().into(), @@ -186,7 +189,7 @@ fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) { let (pub_config, sub_config) = build_config(locator, vec![ds_config], flow); - downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); + downsampling_pub_sub_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); } #[test] @@ -208,8 +211,9 @@ fn downsampling_by_interface_impl(flow: InterceptorFlow) { let ds_config = vec![ DownsamplingItemConf { id: Some("someid".to_string()), - flow, + flows: Some(vec![flow]), interfaces: Some(vec!["lo".to_string(), "lo0".to_string()]), + messages: vec![DownsamplingMessage::Push], rules: vec![DownsamplingRuleConf { key_expr: ke_10hz.clone().into(), freq: 10.0, @@ -217,8 +221,9 @@ fn downsampling_by_interface_impl(flow: InterceptorFlow) { }, DownsamplingItemConf { id: None, - flow, + flows: Some(vec![flow]), interfaces: Some(vec!["some_unknown_interface".to_string()]), + messages: vec![DownsamplingMessage::Push], rules: vec![DownsamplingRuleConf { key_expr: ke_no_effect.clone().into(), freq: 10.0, @@ -240,7 +245,7 @@ fn downsampling_by_interface_impl(flow: InterceptorFlow) { let (pub_config, sub_config) = build_config(locator, ds_config, flow); - downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); + downsampling_pub_sub_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); } #[cfg(unix)] @@ -263,7 +268,8 @@ fn downsampling_config_error_wrong_strategy() { r#" [ { - flow: "down", + flows: ["down"], + messages: ["push"], rules: [ { key_expr: "test/downsamples_by_keyexp/r100", freq: 10, }, { key_expr: "test/downsamples_by_keyexp/r50", freq: 20, } @@ -290,14 +296,16 @@ fn downsampling_config_error_repeated_id() { [ { id: "REPEATED", - flow: "egress", + flows: ["egress"], + messages: ["push"], rules: [ { key_expr: "test/downsamples_by_keyexp/r100", freq: 10, }, ], }, { id: "REPEATED", - flow: "ingress", + flows: ["ingress"], + messages: ["push"], rules: [ { key_expr: "test/downsamples_by_keyexp/r50", freq: 20, } ], @@ -309,3 +317,121 @@ fn downsampling_config_error_repeated_id() { zenoh::open(config).wait().unwrap(); } + +fn downsampling_query_reply_test( + query_config: Config, + queryable_config: Config, + queryable_ke: &str, + reply_counter: Arc, + nb_queries: usize, +) { + let query_session = zenoh::open(query_config).wait().unwrap(); + let queryable_session = zenoh::open(queryable_config).wait().unwrap(); + + let response_ke = queryable_ke.to_owned(); + queryable_session + .declare_queryable(queryable_ke) + .callback(move |query| { + query.reply(&response_ke, "".as_bytes()).wait().unwrap(); + }) + .background() + .wait() + .unwrap(); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + let mut handles = Vec::new(); + for _ in 0..nb_queries { + let queryable_ke = queryable_ke.to_owned(); + let query_session = query_session.clone(); + let reply_counter = reply_counter.clone(); + let handle = std::thread::spawn(move || { + let query = query_session + .get(&queryable_ke) + .consolidation(ConsolidationMode::None) + .timeout(std::time::Duration::from_secs(5)) + .wait() + .unwrap(); + while let Ok(reply) = query.recv() { + if reply.into_result().is_ok() { + reply_counter.fetch_add(1, Ordering::SeqCst); + } + } + }); + handles.push(handle); + } + for handle in handles { + handle.join().unwrap(); + } + query_session.close().wait().unwrap(); + queryable_session.close().wait().unwrap(); +} + +fn downsampling_query_rate_test(flow: InterceptorFlow) { + let queryable_ke = "test/downsamples_query"; + let locator = "tcp/127.0.0.1:31448"; + + let ds_config = DownsamplingItemConf { + id: None, + flows: Some(vec![flow]), + interfaces: None, + messages: vec![DownsamplingMessage::Query], + rules: vec![DownsamplingRuleConf { + key_expr: queryable_ke.try_into().unwrap(), + freq: 0.01, + }], + }; + + let (query_config, queryable_config) = build_config(locator, vec![ds_config], flow); + let reply_counter = Arc::new(AtomicUsize::new(0)); + downsampling_query_reply_test( + query_config, + queryable_config, + queryable_ke, + reply_counter.clone(), + 2, + ); + assert!(reply_counter.load(Ordering::SeqCst) == 1); +} + +fn downsampling_reply_rate_test(flow: InterceptorFlow) { + let queryable_ke = "test/downsamples_reply"; + let locator = "tcp/127.0.0.1:31449"; + + let ds_config = DownsamplingItemConf { + id: None, + flows: Some(vec![flow]), + interfaces: None, + messages: vec![DownsamplingMessage::Reply], + rules: vec![DownsamplingRuleConf { + key_expr: queryable_ke.try_into().unwrap(), + freq: 0.01, + }], + }; + + let (queryable_config, query_config) = build_config(locator, vec![ds_config], flow); + let reply_counter = Arc::new(AtomicUsize::new(0)); + downsampling_query_reply_test( + query_config, + queryable_config, + queryable_ke, + reply_counter.clone(), + 2, + ); + + assert!(reply_counter.load(Ordering::SeqCst) == 1); +} + +#[test] +fn downsampling_query_test() { + zenoh::init_log_from_env_or("error"); + downsampling_query_rate_test(InterceptorFlow::Ingress); + downsampling_query_rate_test(InterceptorFlow::Egress); +} + +#[test] +fn downsampling_reply_test() { + zenoh::init_log_from_env_or("error"); + downsampling_reply_rate_test(InterceptorFlow::Ingress); + downsampling_reply_rate_test(InterceptorFlow::Egress); +}