Skip to content

Commit 625e32e

Browse files
authored
Query context manager (#461)
* remove useless `IntoRust` bound in handlers system Only the `IntoPython` bound is important, the rest was there to reinforce the typing, but it's not needed at all, and it prevented to use objects not implementing `IntoRust` (which will be the case when `Query` will use `option_wrapper`) * Add context manager to queries, to properly use channels with queryables * fix tests with new aligned queryable print * add documentation for `Query.drop`
1 parent 6008dd3 commit 625e32e

File tree

6 files changed

+86
-70
lines changed

6 files changed

+86
-70
lines changed

examples/z_queryable.py

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,27 @@
1717

1818

1919
def main(conf: zenoh.Config, key: str, payload: str, complete: bool):
20-
def queryable_callback(query):
21-
print(
22-
f">> [Queryable ] Received Query '{query.selector}'"
23-
+ (
24-
f" with payload: {query.payload.to_string()}"
25-
if query.payload is not None
26-
else ""
27-
)
28-
)
29-
query.reply(key, payload)
30-
3120
# initiate logging
3221
zenoh.init_log_from_env_or("error")
3322

3423
print("Opening session...")
3524
with zenoh.open(conf) as session:
3625
print(f"Declaring Queryable on '{key}'...")
37-
session.declare_queryable(key, queryable_callback, complete=complete)
26+
queryable = session.declare_queryable(key, complete=complete)
3827

3928
print("Press CTRL-C to quit...")
4029
while True:
41-
try:
42-
time.sleep(1)
43-
except Exception as err:
44-
print(err, flush=True)
45-
raise
30+
with queryable.recv() as query:
31+
if query.payload is not None:
32+
print(
33+
f">> [Queryable ] Received Query '{query.selector}'"
34+
f" with payload: '{query.payload.to_string()}'"
35+
)
36+
else:
37+
print(f">> [Queryable ] Received Query '{query.selector}'")
38+
query.reply(key, payload)
39+
# it's possible to call `query.drop()` after handling it
40+
# instead of using a context manager
4641

4742

4843
# --- Command line argument parsing --- --- --- --- --- ---

src/handlers.rs

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ impl Drop for PythonCallback {
207207
}
208208
}
209209

210+
// the generic type is not useful per se, it just there to make typing
211+
// prettier, e.g. to have `get` returning a `PyResult<HandlerImpl<Reply>>`
210212
pub(crate) enum HandlerImpl<T> {
211213
Rust(Py<Handler>, PhantomData<T>),
212214
Python(PyObject),
@@ -239,10 +241,7 @@ impl<T> ToPyObject for HandlerImpl<T> {
239241
}
240242
}
241243

242-
impl<T: IntoRust> HandlerImpl<T>
243-
where
244-
T::Into: IntoPython,
245-
{
244+
impl<T> HandlerImpl<T> {
246245
pub(crate) fn try_recv(&self, py: Python) -> PyResult<PyObject> {
247246
match self {
248247
Self::Rust(handler, _) => handler.borrow(py).try_recv(py),
@@ -258,11 +257,11 @@ where
258257
}
259258
}
260259

261-
struct RustHandler<H: IntoRust, T: IntoRust>
260+
struct RustHandler<H: IntoRust, T: IntoPython>
262261
where
263-
H::Into: IntoHandler<T::Into>,
262+
H::Into: IntoHandler<T>,
264263
{
265-
handler: <H::Into as IntoHandler<T::Into>>::Handler,
264+
handler: <H::Into as IntoHandler<T>>::Handler,
266265
_phantom: PhantomData<T>,
267266
}
268267

@@ -300,10 +299,7 @@ impl<E: fmt::Display> fmt::Display for DeadlineError<E> {
300299
}
301300
}
302301

303-
impl<T: IntoRust> Receiver for RustHandler<DefaultHandler, T>
304-
where
305-
T::Into: IntoPython,
306-
{
302+
impl<T: IntoPython> Receiver for RustHandler<DefaultHandler, T> {
307303
fn type_name(&self) -> &'static str {
308304
short_type_name::<T>()
309305
}
@@ -325,10 +321,7 @@ where
325321
}
326322
}
327323

328-
impl<T: IntoRust> Receiver for RustHandler<FifoChannel, T>
329-
where
330-
T::Into: IntoPython,
331-
{
324+
impl<T: IntoPython> Receiver for RustHandler<FifoChannel, T> {
332325
fn type_name(&self) -> &'static str {
333326
short_type_name::<T>()
334327
}
@@ -350,10 +343,7 @@ where
350343
}
351344
}
352345

353-
impl<T: IntoRust> Receiver for RustHandler<RingChannel, T>
354-
where
355-
T::Into: IntoPython,
356-
{
346+
impl<T: IntoPython> Receiver for RustHandler<RingChannel, T> {
357347
fn type_name(&self) -> &'static str {
358348
short_type_name::<T>()
359349
}
@@ -375,14 +365,13 @@ where
375365
}
376366
}
377367

378-
fn rust_handler<H: IntoRust, T: IntoRust>(
368+
fn rust_handler<H: IntoRust, T: IntoPython>(
379369
py: Python,
380370
into_handler: H,
381-
) -> (RustCallback<T::Into>, HandlerImpl<T>)
371+
) -> (RustCallback<T>, HandlerImpl<T::Into>)
382372
where
383-
H::Into: IntoHandler<T::Into>,
384-
<H::Into as IntoHandler<T::Into>>::Handler: Send + Sync,
385-
T::Into: IntoPython,
373+
H::Into: IntoHandler<T>,
374+
<H::Into as IntoHandler<T>>::Handler: Send + Sync,
386375
RustHandler<H, T>: Receiver,
387376
{
388377
let (callback, handler) = into_handler.into_rust().into_handler();
@@ -418,13 +407,10 @@ fn python_callback<T: IntoPython>(callback: &Bound<PyAny>) -> PyResult<RustCallb
418407
})
419408
}
420409

421-
pub(crate) fn into_handler<T: IntoRust>(
410+
pub(crate) fn into_handler<T: IntoPython>(
422411
py: Python,
423412
obj: Option<&Bound<PyAny>>,
424-
) -> PyResult<(impl IntoHandler<T::Into, Handler = HandlerImpl<T>>, bool)>
425-
where
426-
T::Into: IntoPython,
427-
{
413+
) -> PyResult<(impl IntoHandler<T, Handler = HandlerImpl<T::Into>>, bool)> {
428414
let mut background = false;
429415
let Some(obj) = obj else {
430416
return Ok((rust_handler(py, DefaultHandler), background));

src/query.rs

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -76,38 +76,53 @@ impl QueryConsolidation {
7676
}
7777
}
7878

79-
wrapper!(zenoh::query::Query: Clone);
79+
option_wrapper!(zenoh::query::Query, "Dropped query");
8080

8181
#[pymethods]
8282
impl Query {
83+
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> {
84+
this
85+
}
86+
87+
#[pyo3(signature = (*_args, **_kwargs))]
88+
fn __exit__(
89+
&mut self,
90+
py: Python,
91+
_args: &Bound<PyTuple>,
92+
_kwargs: Option<&Bound<PyDict>>,
93+
) -> PyResult<PyObject> {
94+
self.drop();
95+
Ok(py.None())
96+
}
97+
8398
#[getter]
84-
fn selector(&self) -> Selector {
85-
self.0.selector().into_owned().into()
99+
fn selector(&self) -> PyResult<Selector> {
100+
Ok(self.get_ref()?.selector().into_owned().into())
86101
}
87102

88103
#[getter]
89-
fn key_expr(&self) -> KeyExpr {
90-
self.0.key_expr().clone().into_owned().into()
104+
fn key_expr(&self) -> PyResult<KeyExpr> {
105+
Ok(self.get_ref()?.key_expr().clone().into_owned().into())
91106
}
92107

93108
#[getter]
94-
fn parameters(&self) -> Parameters {
95-
self.0.parameters().clone().into_owned().into()
109+
fn parameters(&self) -> PyResult<Parameters> {
110+
Ok(self.get_ref()?.parameters().clone().into_owned().into())
96111
}
97112

98113
#[getter]
99-
fn payload(&self) -> Option<ZBytes> {
100-
self.0.payload().cloned().map_into()
114+
fn payload(&self) -> PyResult<Option<ZBytes>> {
115+
Ok(self.get_ref()?.payload().cloned().map_into())
101116
}
102117

103118
#[getter]
104-
fn encoding(&self) -> Option<Encoding> {
105-
self.0.encoding().cloned().map_into()
119+
fn encoding(&self) -> PyResult<Option<Encoding>> {
120+
Ok(self.get_ref()?.encoding().cloned().map_into())
106121
}
107122

108123
#[getter]
109-
fn attachment(&self) -> Option<ZBytes> {
110-
self.0.attachment().cloned().map_into()
124+
fn attachment(&self) -> PyResult<Option<ZBytes>> {
125+
Ok(self.get_ref()?.attachment().cloned().map_into())
111126
}
112127

113128
// TODO timestamp
@@ -125,7 +140,7 @@ impl Query {
125140
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
126141
) -> PyResult<()> {
127142
let build = build!(
128-
self.0.reply(key_expr, payload),
143+
self.get_ref()?.reply(key_expr, payload),
129144
encoding,
130145
congestion_control,
131146
priority,
@@ -134,14 +149,15 @@ impl Query {
134149
);
135150
wait(py, build)
136151
}
152+
137153
#[pyo3(signature = (payload, *, encoding = None))]
138154
fn reply_err(
139155
&self,
140156
py: Python,
141157
#[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes,
142158
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
143159
) -> PyResult<()> {
144-
let build = build!(self.0.reply_err(payload), encoding);
160+
let build = build!(self.get_ref()?.reply_err(payload), encoding);
145161
wait(py, build)
146162
}
147163

@@ -156,7 +172,7 @@ impl Query {
156172
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
157173
) -> PyResult<()> {
158174
let build = build!(
159-
self.0.reply_del(key_expr),
175+
self.get_ref()?.reply_del(key_expr),
160176
congestion_control,
161177
priority,
162178
express,
@@ -165,12 +181,16 @@ impl Query {
165181
wait(py, build)
166182
}
167183

168-
fn __repr__(&self) -> String {
169-
format!("{:?}", self.0)
184+
fn drop(&mut self) {
185+
Python::with_gil(|gil| gil.allow_threads(|| drop(self.0.take())));
170186
}
171187

172-
fn __str__(&self) -> String {
173-
format!("{}", self.0)
188+
fn __repr__(&self) -> PyResult<String> {
189+
Ok(format!("{:?}", self.get_ref()?))
190+
}
191+
192+
fn __str__(&self) -> PyResult<String> {
193+
Ok(format!("{}", self.get_ref()?))
174194
}
175195
}
176196

src/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ impl<T, E: IntoPyErr> IntoPyResult<T> for Result<T, E> {
3737
}
3838
}
3939

40-
pub(crate) trait IntoRust: Send + Sync + 'static {
40+
pub(crate) trait IntoRust: 'static {
4141
type Into;
4242
fn into_rust(self) -> Self::Into;
4343
}
4444

4545
into_rust!(bool, Duration);
4646

47-
pub(crate) trait IntoPython: Sized + Send + 'static {
47+
pub(crate) trait IntoPython: Sized + Send + Sync + 'static {
4848
type Into: IntoPy<PyObject>;
4949
fn into_python(self) -> Self::Into;
5050
fn into_pyobject(self, py: Python) -> PyObject {

tests/examples_check.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,12 @@ def test_z_querier_z_queryable():
170170
z_querier.errors.append("z_querier didn't get a response from z_queryable")
171171
queryableout = "".join(z_queryable.stdout)
172172
if not (
173-
"Received Query 'demo/example/zenoh-python-queryable' with payload: [ 0] value"
173+
"Received Query 'demo/example/zenoh-python-queryable' with payload: '[ 0] value'"
174174
in queryableout
175175
):
176176
z_queryable.errors.append("z_queryable didn't catch query [0]")
177177
elif not (
178-
"Received Query 'demo/example/zenoh-python-queryable' with payload: [ 2] value"
178+
"Received Query 'demo/example/zenoh-python-queryable' with payload: '[ 2] value'"
179179
in queryableout
180180
):
181181
z_queryable.errors.append("z_queryable didn't catch query [2]")

zenoh/__init__.pyi

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,8 @@ class Publisher:
509509
class Query:
510510
"""Structs received by a Queryable."""
511511

512+
def __enter__(self) -> Self: ...
513+
def __exit__(self, *_args, **_kwargs): ...
512514
@property
513515
def selector(self) -> Selector: ...
514516
@property
@@ -552,6 +554,19 @@ class Query:
552554
By default, queries only accept replies whose key expression intersects with the query's. Unless the query has enabled disjoint replies (you can check this through Query::accepts_replies), replying on a disjoint key expression will result in an error when resolving the reply.
553555
"""
554556

557+
def drop(self):
558+
"""Drop the instance of a query.
559+
The query will only be finalized when all query instances (one per queryable
560+
matched) are dropped. Finalization is required to not have query hanging
561+
on the querier side.
562+
563+
This method should be called after handling the query, as Python finalizers
564+
are not reliable, especially when it comes to loop variables. It is also
565+
possible, and advised, to use query context manager, which calls `drop` on
566+
exit. Once a query is dropped, it's no more possible to use it, and its
567+
methods will raise an exception.
568+
"""
569+
555570
def __str__(self) -> str: ...
556571

557572
@final

0 commit comments

Comments
 (0)