Skip to content

Commit

Permalink
Add context manager to queries, to properly use channels with queryables
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Feb 28, 2025
1 parent b97d137 commit 52deec4
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
29 changes: 12 additions & 17 deletions examples/z_queryable.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,27 @@


def main(conf: zenoh.Config, key: str, payload: str, complete: bool):
def queryable_callback(query):
print(
f">> [Queryable ] Received Query '{query.selector}'"
+ (
f" with payload: {query.payload.to_string()}"
if query.payload is not None
else ""
)
)
query.reply(key, payload)

# initiate logging
zenoh.init_log_from_env_or("error")

print("Opening session...")
with zenoh.open(conf) as session:
print(f"Declaring Queryable on '{key}'...")
session.declare_queryable(key, queryable_callback, complete=complete)
queryable = session.declare_queryable(key, complete=complete)

print("Press CTRL-C to quit...")
while True:
try:
time.sleep(1)
except Exception as err:
print(err, flush=True)
raise
with queryable.recv() as query:
if query.payload is not None:
print(
f">> [Queryable ] Received Query '{query.selector}'"
f" with payload: '{query.payload.to_string()}'"
)
else:
print(f">> [Queryable ] Received Query '{query.selector}'")
query.reply(key, payload)
# it's possible to call `query.drop()` after handling it
# instead of using a context manager


# --- Command line argument parsing --- --- --- --- --- ---
Expand Down
60 changes: 40 additions & 20 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,38 +76,53 @@ impl QueryConsolidation {
}
}

wrapper!(zenoh::query::Query: Clone);
option_wrapper!(zenoh::query::Query, "Dropped query");

#[pymethods]
impl Query {
fn __enter__<'a, 'py>(this: &'a Bound<'py, Self>) -> &'a Bound<'py, Self> {
this
}

#[pyo3(signature = (*_args, **_kwargs))]
fn __exit__(
&mut self,
py: Python,
_args: &Bound<PyTuple>,
_kwargs: Option<&Bound<PyDict>>,
) -> PyResult<PyObject> {
self.drop();
Ok(py.None())
}

#[getter]
fn selector(&self) -> Selector {
self.0.selector().into_owned().into()
fn selector(&self) -> PyResult<Selector> {
Ok(self.get_ref()?.selector().into_owned().into())
}

#[getter]
fn key_expr(&self) -> KeyExpr {
self.0.key_expr().clone().into_owned().into()
fn key_expr(&self) -> PyResult<KeyExpr> {
Ok(self.get_ref()?.key_expr().clone().into_owned().into())
}

#[getter]
fn parameters(&self) -> Parameters {
self.0.parameters().clone().into_owned().into()
fn parameters(&self) -> PyResult<Parameters> {
Ok(self.get_ref()?.parameters().clone().into_owned().into())
}

#[getter]
fn payload(&self) -> Option<ZBytes> {
self.0.payload().cloned().map_into()
fn payload(&self) -> PyResult<Option<ZBytes>> {
Ok(self.get_ref()?.payload().cloned().map_into())
}

#[getter]
fn encoding(&self) -> Option<Encoding> {
self.0.encoding().cloned().map_into()
fn encoding(&self) -> PyResult<Option<Encoding>> {
Ok(self.get_ref()?.encoding().cloned().map_into())
}

#[getter]
fn attachment(&self) -> Option<ZBytes> {
self.0.attachment().cloned().map_into()
fn attachment(&self) -> PyResult<Option<ZBytes>> {
Ok(self.get_ref()?.attachment().cloned().map_into())
}

// TODO timestamp
Expand All @@ -125,7 +140,7 @@ impl Query {
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
) -> PyResult<()> {
let build = build!(
self.0.reply(key_expr, payload),
self.get_ref()?.reply(key_expr, payload),
encoding,
congestion_control,
priority,
Expand All @@ -134,14 +149,15 @@ impl Query {
);
wait(py, build)
}

#[pyo3(signature = (payload, *, encoding = None))]
fn reply_err(
&self,
py: Python,
#[pyo3(from_py_with = "ZBytes::from_py")] payload: ZBytes,
#[pyo3(from_py_with = "Encoding::from_py_opt")] encoding: Option<Encoding>,
) -> PyResult<()> {
let build = build!(self.0.reply_err(payload), encoding);
let build = build!(self.get_ref()?.reply_err(payload), encoding);
wait(py, build)
}

Expand All @@ -156,7 +172,7 @@ impl Query {
#[pyo3(from_py_with = "ZBytes::from_py_opt")] attachment: Option<ZBytes>,
) -> PyResult<()> {
let build = build!(
self.0.reply_del(key_expr),
self.get_ref()?.reply_del(key_expr),
congestion_control,
priority,
express,
Expand All @@ -165,12 +181,16 @@ impl Query {
wait(py, build)
}

fn __repr__(&self) -> String {
format!("{:?}", self.0)
fn drop(&mut self) {
Python::with_gil(|gil| gil.allow_threads(|| drop(self.0.take())));
}

fn __str__(&self) -> String {
format!("{}", self.0)
fn __repr__(&self) -> PyResult<String> {
Ok(format!("{:?}", self.get_ref()?))
}

fn __str__(&self) -> PyResult<String> {
Ok(format!("{}", self.get_ref()?))
}
}

Expand Down
3 changes: 3 additions & 0 deletions zenoh/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ class Publisher:
class Query:
"""Structs received by a Queryable."""

def __enter__(self) -> Self: ...
def __exit__(self, *_args, **_kwargs): ...
@property
def selector(self) -> Selector: ...
@property
Expand Down Expand Up @@ -552,6 +554,7 @@ class Query:
By default, queries only accept replies whose key expression intersects with the query's. Unless the query has enabled disjoint replies (you can check this through Query::accepts_replies), replying on a disjoint key expression will result in an error when resolving the reply.
"""

def drop(self): ...
def __str__(self) -> str: ...

@final
Expand Down

0 comments on commit 52deec4

Please sign in to comment.