Skip to content

Commit 6c237c1

Browse files
add querier (#399)
* add querier * add docs * address review comments * switch zenoh branch to main
1 parent d07ce42 commit 6c237c1

File tree

9 files changed

+409
-5
lines changed

9 files changed

+409
-5
lines changed

examples/README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,24 @@ or
106106
python3 z_get.py -s 'demo/**'
107107
```
108108

109+
### z_querier
110+
111+
Continuously sends query messages for a selector.
112+
The queryables with a matching path or selector (for instance [z_queryable](#z_queryable) and [z_storage](#z_storage))
113+
will receive these queries and reply with paths/payloads that will be received by the querier's query callback.
114+
115+
Typical usage:
116+
117+
```bash
118+
python3 z_querier.py
119+
```
120+
121+
or
122+
123+
```bash
124+
python3 z_get.py -s 'demo/**'
125+
```
126+
109127
### z_queryable
110128

111129
Creates a queryable function with a key expression.

examples/z_pub.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
# ZettaScale Zenoh Team, <[email protected]>
1313
#
1414
import time
15+
from typing import Optional
1516

1617
import zenoh
1718

1819

19-
def main(conf: zenoh.Config, key: str, payload: str, iter: int, interval: int):
20+
def main(
21+
conf: zenoh.Config, key: str, payload: str, iter: Optional[int], interval: int
22+
):
2023
# initiate logging
2124
zenoh.init_log_from_env_or("error")
2225

examples/z_querier.py

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
#
2+
# Copyright (c) 2024 ZettaScale Technology
3+
#
4+
# This program and the accompanying materials are made available under the
5+
# terms of the Eclipse Public License 2.0 which is available at
6+
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
#
9+
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
#
11+
# Contributors:
12+
# ZettaScale Zenoh Team, <[email protected]>
13+
#
14+
import itertools
15+
import time
16+
from typing import Optional, Tuple
17+
18+
import zenoh
19+
20+
21+
def main(
22+
conf: zenoh.Config,
23+
selector: str,
24+
target: zenoh.QueryTarget,
25+
payload: str,
26+
timeout: float,
27+
iter: Optional[int],
28+
):
29+
# initiate logging
30+
zenoh.init_log_from_env_or("error")
31+
print("Opening session...")
32+
with zenoh.open(conf) as session:
33+
query_selector = zenoh.Selector(selector)
34+
35+
print(f"Declaring Querier on '{query_selector.key_expr}'...")
36+
querier = session.declare_querier(
37+
query_selector.key_expr, target=target, timeout=timeout
38+
)
39+
40+
print("Press CTRL-C to quit...")
41+
for idx in itertools.count() if iter is None else range(iter):
42+
time.sleep(1.0)
43+
buf = f"[{idx:4d}] {payload if payload else ''}"
44+
print(f"Querying '{selector}' with payload '{buf}')...")
45+
46+
replies = querier.get(parameters=query_selector.parameters, payload=buf)
47+
for reply in replies:
48+
try:
49+
print(
50+
f">> Received ('{reply.ok.key_expr}': '{reply.ok.payload.to_string()}')"
51+
)
52+
except:
53+
print(f">> Received (ERROR: '{reply.err.payload.to_string()}')")
54+
55+
56+
if __name__ == "__main__":
57+
# --- Command line argument parsing --- --- --- --- --- ---
58+
import argparse
59+
import json
60+
61+
parser = argparse.ArgumentParser(
62+
prog="z_querier", description="zenoh querier example"
63+
)
64+
parser.add_argument(
65+
"--mode",
66+
"-m",
67+
dest="mode",
68+
choices=["peer", "client"],
69+
type=str,
70+
help="The zenoh session mode.",
71+
)
72+
parser.add_argument(
73+
"--connect",
74+
"-e",
75+
dest="connect",
76+
metavar="ENDPOINT",
77+
action="append",
78+
type=str,
79+
help="Endpoints to connect to.",
80+
)
81+
parser.add_argument(
82+
"--listen",
83+
"-l",
84+
dest="listen",
85+
metavar="ENDPOINT",
86+
action="append",
87+
type=str,
88+
help="Endpoints to listen on.",
89+
)
90+
parser.add_argument(
91+
"--selector",
92+
"-s",
93+
dest="selector",
94+
default="demo/example/**",
95+
type=str,
96+
help="The selection of resources to query.",
97+
)
98+
parser.add_argument(
99+
"--target",
100+
"-t",
101+
dest="target",
102+
choices=["ALL", "BEST_MATCHING", "ALL_COMPLETE", "NONE"],
103+
default="BEST_MATCHING",
104+
type=str,
105+
help="The target queryables of the query.",
106+
)
107+
parser.add_argument(
108+
"--payload",
109+
"-p",
110+
dest="payload",
111+
type=str,
112+
help="An optional payload to send in the query.",
113+
)
114+
parser.add_argument(
115+
"--timeout",
116+
"-o",
117+
dest="timeout",
118+
default=10.0,
119+
type=float,
120+
help="The query timeout",
121+
)
122+
parser.add_argument(
123+
"--config",
124+
"-c",
125+
dest="config",
126+
metavar="FILE",
127+
type=str,
128+
help="A configuration file.",
129+
)
130+
parser.add_argument(
131+
"--iter", dest="iter", type=int, help="How many gets to perform"
132+
)
133+
134+
args = parser.parse_args()
135+
conf = (
136+
zenoh.Config.from_file(args.config)
137+
if args.config is not None
138+
else zenoh.Config()
139+
)
140+
if args.mode is not None:
141+
conf.insert_json5("mode", json.dumps(args.mode))
142+
if args.connect is not None:
143+
conf.insert_json5("connect/endpoints", json.dumps(args.connect))
144+
if args.listen is not None:
145+
conf.insert_json5("listen/endpoints", json.dumps(args.listen))
146+
target = {
147+
"ALL": zenoh.QueryTarget.ALL,
148+
"BEST_MATCHING": zenoh.QueryTarget.BEST_MATCHING,
149+
"ALL_COMPLETE": zenoh.QueryTarget.ALL_COMPLETE,
150+
}.get(args.target)
151+
152+
main(conf, args.selector, target, args.payload, args.timeout, args.iter)

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ pub(crate) mod zenoh {
6060
pubsub::{Publisher, Subscriber},
6161
qos::{CongestionControl, Priority, Reliability},
6262
query::{
63-
ConsolidationMode, Parameters, Query, QueryConsolidation, QueryTarget, Queryable,
64-
Reply, ReplyError, Selector,
63+
ConsolidationMode, Parameters, Querier, Query, QueryConsolidation, QueryTarget,
64+
Queryable, Reply, ReplyError, Selector,
6565
},
6666
sample::{Sample, SampleKind},
6767
scouting::{scout, Hello, Scout},

src/query.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use pyo3::{
2121
use crate::{
2222
bytes::{Encoding, ZBytes},
2323
config::ZenohId,
24-
handlers::HandlerImpl,
24+
handlers::{into_handler, HandlerImpl},
2525
key_expr::KeyExpr,
2626
macros::{build, downcast_or_new, enum_mapper, option_wrapper, wrapper},
2727
qos::{CongestionControl, Priority},
@@ -284,6 +284,61 @@ impl Queryable {
284284
}
285285
}
286286

287+
option_wrapper!(zenoh::query::Querier<'static>, "Undeclared querier");
288+
289+
#[pymethods]
290+
impl Querier {
291+
#[classmethod]
292+
fn __class_getitem__(cls: &Bound<PyType>, args: &Bound<PyAny>) -> PyObject {
293+
generic(cls, args)
294+
}
295+
296+
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> PyResult<&'a Bound<'py, Self>> {
297+
Self::check(this)
298+
}
299+
300+
#[pyo3(signature = (*_args, **_kwargs))]
301+
fn __exit__(
302+
&mut self,
303+
py: Python,
304+
_args: &Bound<PyTuple>,
305+
_kwargs: Option<&Bound<PyDict>>,
306+
) -> PyResult<PyObject> {
307+
self.undeclare(py)?;
308+
Ok(py.None())
309+
}
310+
311+
#[getter]
312+
fn key_expr(&self) -> PyResult<KeyExpr> {
313+
Ok(self.get_ref()?.key_expr().clone().into())
314+
}
315+
316+
#[allow(clippy::too_many_arguments)]
317+
#[pyo3(signature = (handler = None, *, parameters = None, payload = None, encoding = None, attachment = None))]
318+
fn get(
319+
&self,
320+
py: Python,
321+
handler: Option<&Bound<PyAny>>,
322+
#[pyo3(from_py_with = "Parameters::from_py_opt")] parameters: Option<Parameters>,
323+
#[pyo3(from_py_with = "ZBytes::from_py_opt")] payload: Option<ZBytes>,
324+
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
325+
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
326+
) -> PyResult<HandlerImpl<Reply>> {
327+
let this = self.get_ref()?;
328+
let (handler, _) = into_handler(py, handler)?;
329+
let builder = build!(this.get(), parameters, payload, encoding, attachment);
330+
wait(py, builder.with(handler)).map_into()
331+
}
332+
333+
fn undeclare(&mut self, py: Python) -> PyResult<()> {
334+
wait(py, self.take()?.undeclare())
335+
}
336+
337+
fn __repr__(&self) -> PyResult<String> {
338+
Ok(format!("{:?}", self.get_ref()?))
339+
}
340+
}
341+
287342
wrapper!(zenoh::query::Selector<'static>: Clone);
288343
downcast_or_new!(Selector, None);
289344

src/session.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::{
2828
macros::{build, wrapper},
2929
pubsub::{Publisher, Subscriber},
3030
qos::{CongestionControl, Priority, Reliability},
31-
query::{QueryConsolidation, QueryTarget, Queryable, Reply, Selector},
31+
query::{Querier, QueryConsolidation, QueryTarget, Queryable, Reply, Selector},
3232
time::Timestamp,
3333
utils::{timeout, wait, IntoPython, MapInto},
3434
};
@@ -225,6 +225,33 @@ impl Session {
225225
wait(py, builder).map_into()
226226
}
227227

228+
#[allow(clippy::too_many_arguments)]
229+
#[pyo3(signature = (key_expr, *, target = None, consolidation = None, timeout = None, congestion_control = None, priority = None, express = None))]
230+
fn declare_querier(
231+
&self,
232+
py: Python,
233+
#[pyo3(from_py_with = "KeyExpr::from_py")] key_expr: KeyExpr,
234+
target: Option<QueryTarget>,
235+
#[pyo3(from_py_with = "QueryConsolidation::from_py_opt")] consolidation: Option<
236+
QueryConsolidation,
237+
>,
238+
#[pyo3(from_py_with = "timeout")] timeout: Option<Duration>,
239+
congestion_control: Option<CongestionControl>,
240+
priority: Option<Priority>,
241+
express: Option<bool>,
242+
) -> PyResult<Querier> {
243+
let builder = build!(
244+
self.0.declare_querier(key_expr),
245+
target,
246+
consolidation,
247+
timeout,
248+
congestion_control,
249+
priority,
250+
express,
251+
);
252+
wait(py, builder).map_into()
253+
}
254+
228255
fn liveliness(&self) -> Liveliness {
229256
Liveliness(self.0.clone())
230257
}

tests/examples_check.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,43 @@ def test_z_get_z_queryable():
149149
assert not z_queryable.errors
150150

151151

152+
def test_z_querier_z_queryable():
153+
"""Test z_querier & z_queryable"""
154+
z_queryable = Pyrun("z_queryable.py", ["-k=demo/example/zenoh-python-queryable"])
155+
time.sleep(3)
156+
## z_querier: Able to get reply from queryable
157+
z_querier = Pyrun(
158+
"z_querier.py", ["-s=demo/example/zenoh-python-queryable", "-p=value"]
159+
)
160+
time.sleep(5)
161+
z_queryable.interrupt()
162+
z_querier.interrupt()
163+
164+
if not (
165+
"Received ('demo/example/zenoh-python-queryable': 'Queryable from Python!')"
166+
in "".join(z_querier.stdout)
167+
):
168+
z_querier.dbg()
169+
z_queryable.dbg()
170+
z_querier.errors.append("z_querier didn't get a response from z_queryable")
171+
queryableout = "".join(z_queryable.stdout)
172+
if not (
173+
"Received Query 'demo/example/zenoh-python-queryable' with payload: [ 0] value"
174+
in queryableout
175+
):
176+
z_queryable.errors.append("z_queryable didn't catch query [0]")
177+
elif not (
178+
"Received Query 'demo/example/zenoh-python-queryable' with payload: [ 2] value"
179+
in queryableout
180+
):
181+
z_queryable.errors.append("z_queryable didn't catch query [2]")
182+
if any(("z_queryable" in error) for error in z_queryable.errors):
183+
z_queryable.dbg()
184+
185+
assert not z_querier.errors
186+
assert not z_queryable.errors
187+
188+
152189
def test_z_storage_z_sub():
153190
"""Test z_storage & z_sub."""
154191
z_storage = Pyrun("z_storage.py")

0 commit comments

Comments
 (0)