Skip to content

Commit

Permalink
Add support for query/reply messages to Downsampling (#1797)
Browse files Browse the repository at this point in the history
* Add messages and flows to downsampling config

* Add Downsampling message types logic

* Move Downsampling messages config out of rules

In the current implementation, rules within the same Downsampling
config share the same state if configured on the same keyexpr.
Having message types within the rules can lead users to configure
different messages types within different rules with same keyexpr,
which would produce the same result as having all message types
within the same rule, i.e downsampling on the whole set of messages
instead of doing so seperately for each type.

* Update DEFAULT_CONFIG

* Add missing check for empty flows list

* Fix tests with json5 config

* Fix eagerly evaluated downsampling interceptor

* Add downsampling query/reply tests

* Fix clippy warning

* DownsamplingFilters use default debug format

---------

Co-authored-by: OlivierHecart <[email protected]>
  • Loading branch information
oteffahi and OlivierHecart authored Mar 7, 2025
1 parent 112d9cd commit 41ac4a7
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 52 deletions.
14 changes: 12 additions & 2 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,18 @@
// /// Optional list of network interfaces messages will be processed on, the rest will be passed as is.
// /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any.
// interfaces: [ "wlan0" ],
// /// Data flow messages will be processed on. ("egress" or "ingress")
// flow: "egress",
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
// /// If absent, the rules will be applied to both flows.
// flow: ["ingress", "egress"],
// /// List of message type on which downsampling will be applied. Must not be empty.
// messages: [
// /// Publication (Put and Delete)
// "push",
// /// Get
// "query",
// /// Queryable Reply to a Query
// "reply"
// ],
// /// A list of downsampling rules: key_expression and the maximum frequency in Hertz
// rules: [
// { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 },
Expand Down
14 changes: 12 additions & 2 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ pub enum InterceptorFlow {
Ingress,
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum DownsamplingMessage {
Push,
Query,
Reply,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingRuleConf {
/// A list of key-expressions to which the downsampling will be applied.
Expand All @@ -104,10 +112,12 @@ pub struct DownsamplingItemConf {
/// A list of interfaces to which the downsampling will be applied
/// Downsampling will be applied for all interfaces if the parameter is None
pub interfaces: Option<Vec<String>>,
// list of message types on which the downsampling will be applied
pub messages: Vec<DownsamplingMessage>,
/// A list of interfaces to which the downsampling will be applied.
pub rules: Vec<DownsamplingRuleConf>,
/// Downsampling flow direction: egress, ingress
pub flow: InterceptorFlow,
/// Downsampling flow directions: egress and/or ingress
pub flows: Option<Vec<InterceptorFlow>>,
}

#[derive(Serialize, Debug, Deserialize, Clone)]
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/net/routing/interceptor/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use zenoh_keyexpr::{
keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, KeBoxTree},
};
use zenoh_result::ZResult;

use super::InterfaceEnabled;
type PolicyForSubject = FlowPolicy;

type PolicyMap = HashMap<usize, PolicyForSubject, RandomState>;
Expand Down Expand Up @@ -243,12 +245,6 @@ impl FlowPolicy {
}
}

#[derive(Default, Debug)]
pub struct InterfaceEnabled {
pub ingress: bool,
pub egress: bool,
}

pub struct PolicyEnforcer {
pub(crate) acl_enabled: bool,
pub(crate) default_permission: Permission,
Expand Down
99 changes: 78 additions & 21 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::{
sync::{Arc, Mutex},
};

use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow};
use zenoh_config::{
DownsamplingItemConf, DownsamplingMessage, DownsamplingRuleConf, InterceptorFlow,
};
use zenoh_core::zlock;
use zenoh_keyexpr::keyexpr_tree::{
impls::KeyedSetProvider, support::UnknownWildness, IKeyExprTree, IKeyExprTreeMut, KeBoxTree,
Expand All @@ -46,7 +48,19 @@ pub(crate) fn downsampling_interceptor_factories(
bail!("Invalid Downsampling config: id '{id}' is repeated");
}
}
res.push(Box::new(DownsamplingInterceptorFactory::new(ds.clone())));
let mut ds = ds.clone();
// check for undefined flows and initialize them
let flows = ds
.flows
.get_or_insert(vec![InterceptorFlow::Ingress, InterceptorFlow::Egress]);
if flows.is_empty() {
bail!("Invalid Downsampling config: flows list must not be empty");
}
// check for empty messages list
if ds.messages.is_empty() {
bail!("Invalid Downsampling config: messages list must not be empty");
}
res.push(Box::new(DownsamplingInterceptorFactory::new(ds)));
}

Ok(res)
Expand All @@ -55,15 +69,21 @@ pub(crate) fn downsampling_interceptor_factories(
pub struct DownsamplingInterceptorFactory {
interfaces: Option<Vec<String>>,
rules: Vec<DownsamplingRuleConf>,
flow: InterceptorFlow,
flows: InterfaceEnabled,
messages: Arc<DownsamplingFilters>,
}

impl DownsamplingInterceptorFactory {
pub fn new(conf: DownsamplingItemConf) -> Self {
Self {
interfaces: conf.interfaces,
rules: conf.rules,
flow: conf.flow,
flows: conf
.flows
.expect("config flows should be set")
.as_slice()
.into(),
messages: Arc::new(conf.messages.as_slice().into()),
}
}
}
Expand Down Expand Up @@ -91,21 +111,20 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory {
}
}
};

match self.flow {
InterceptorFlow::Ingress => (
Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
(
self.flows.ingress.then(|| {
Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
self.messages.clone(),
self.rules.clone(),
)))),
None,
),
InterceptorFlow::Egress => (
None,
Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
))) as IngressInterceptor
}),
self.flows.egress.then(|| {
Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
self.messages.clone(),
self.rules.clone(),
)))),
),
}
))) as EgressInterceptor
}),
)
}

fn new_transport_multicast(
Expand All @@ -120,16 +139,52 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory {
}
}

#[derive(Debug, Default, Clone)]
pub(crate) struct DownsamplingFilters {
push: bool,
query: bool,
reply: bool,
}

impl From<&[DownsamplingMessage]> for DownsamplingFilters {
fn from(value: &[DownsamplingMessage]) -> Self {
let mut res = Self::default();
for v in value {
match v {
DownsamplingMessage::Push => res.push = true,
DownsamplingMessage::Query => res.query = true,
DownsamplingMessage::Reply => res.reply = true,
}
}
res
}
}

struct Timestate {
pub threshold: tokio::time::Duration,
pub latest_message_timestamp: tokio::time::Instant,
}

pub(crate) struct DownsamplingInterceptor {
filtered_messages: Arc<DownsamplingFilters>,
ke_id: Arc<Mutex<KeBoxTree<usize, UnknownWildness, KeyedSetProvider>>>,
ke_state: Arc<Mutex<HashMap<usize, Timestate>>>,
}

impl DownsamplingInterceptor {
fn is_msg_filtered(&self, ctx: &RoutingContext<NetworkMessage>) -> bool {
match ctx.msg.body {
NetworkBody::Push(_) => self.filtered_messages.push,
NetworkBody::Request(_) => self.filtered_messages.query,
NetworkBody::Response(_) => self.filtered_messages.reply,
NetworkBody::ResponseFinal(_) => false,
NetworkBody::Interest(_) => false,
NetworkBody::Declare(_) => false,
NetworkBody::OAM(_) => false,
}
}
}

impl InterceptorTrait for DownsamplingInterceptor {
fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option<Box<dyn Any + Send + Sync>> {
let ke_id = zlock!(self.ke_id);
Expand All @@ -146,7 +201,7 @@ impl InterceptorTrait for DownsamplingInterceptor {
ctx: RoutingContext<NetworkMessage>,
cache: Option<&Box<dyn Any + Send + Sync>>,
) -> Option<RoutingContext<NetworkMessage>> {
if matches!(ctx.msg.body, NetworkBody::Push(_)) {
if self.is_msg_filtered(&ctx) {
if let Some(cache) = cache {
if let Some(id) = cache.downcast_ref::<Option<usize>>() {
if let Some(id) = id {
Expand Down Expand Up @@ -177,7 +232,7 @@ impl InterceptorTrait for DownsamplingInterceptor {
const NANOS_PER_SEC: f64 = 1_000_000_000.0;

impl DownsamplingInterceptor {
pub fn new(rules: Vec<DownsamplingRuleConf>) -> Self {
pub fn new(messages: Arc<DownsamplingFilters>, rules: Vec<DownsamplingRuleConf>) -> Self {
let mut ke_id = KeBoxTree::default();
let mut ke_state = HashMap::default();
for (id, rule) in rules.into_iter().enumerate() {
Expand All @@ -197,12 +252,14 @@ impl DownsamplingInterceptor {
},
);
tracing::debug!(
"New downsampler rule enabled: key_expr={:?}, threshold={:?}",
"New downsampler rule enabled: key_expr={:?}, threshold={:?}, messages={:?}",
rule.key_expr,
threshold
threshold,
messages,
);
}
Self {
filtered_messages: messages,
ke_id: Arc::new(Mutex::new(ke_id)),
ke_state: Arc::new(Mutex::new(ke_state)),
}
Expand Down
24 changes: 23 additions & 1 deletion zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use access_control::acl_interceptor_factories;
mod authorization;
use std::any::Any;

use zenoh_config::Config;
use zenoh_config::{Config, InterceptorFlow};
use zenoh_protocol::network::NetworkMessage;
use zenoh_result::ZResult;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};
Expand All @@ -35,6 +35,28 @@ use crate::api::key_expr::KeyExpr;
pub mod downsampling;
use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories;

#[derive(Default, Debug)]
pub struct InterfaceEnabled {
pub ingress: bool,
pub egress: bool,
}

impl From<&[InterceptorFlow]> for InterfaceEnabled {
fn from(value: &[InterceptorFlow]) -> Self {
let mut res = Self {
ingress: false,
egress: false,
};
for v in value {
match v {
InterceptorFlow::Egress => res.egress = true,
InterceptorFlow::Ingress => res.ingress = true,
}
}
res
}
}

pub(crate) trait InterceptorTrait {
fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option<Box<dyn Any + Send + Sync>>;

Expand Down
Loading

0 comments on commit 41ac4a7

Please sign in to comment.