Skip to content

Commit 2743d53

Browse files
authored
✨ Handle metadata refresh in real time mode (#109)
1 parent 7f3a88f commit 2743d53

5 files changed

Lines changed: 502 additions & 189 deletions

File tree

robot_log_visualizer/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
import os
22

3-
# Prefer the PySide6 backend when QtPy resolves the Qt binding.
4-
os.environ.setdefault("QT_API", "pyside2")
3+
os.environ.setdefault("QT_API", "pyqt5")

robot_log_visualizer/signal_provider/realtime_signal_provider.py

Lines changed: 206 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@
1010
import numpy as np
1111

1212
from robot_log_visualizer.signal_provider.signal_provider import (
13-
ProviderType,
14-
SignalProvider,
15-
)
13+
ProviderType, SignalProvider)
1614
from robot_log_visualizer.utils.utils import PeriodicThreadState
1715

16+
ROBOT_REALTIME_KEY = "robot_realtime"
1817

1918
def are_deps_installed():
2019
try:
@@ -115,16 +114,16 @@ def __init__(self, period: float, signal_root_name: str):
115114
# Track signals to buffer
116115
self.buffered_signals = set()
117116
# Always include joints_state
118-
self.buffered_signals.add("robot_realtime::joints_state::positions")
119-
120-
# TODO: implement a logic to remove signals that are not needed anymore
117+
self.buffered_signals.add(
118+
f"{ROBOT_REALTIME_KEY}::joints_state::positions"
119+
) # TODO: implement a logic to remove signals that are not needed anymore
121120
def add_signals_to_buffer(self, signals: Union[str, Iterable[str]]):
122121
"""Add signals to the buffer set."""
123122
if isinstance(signals, str):
124123
signals = {signals}
125124
self.buffered_signals.update(signals)
126125
# Always include joints_state
127-
self.buffered_signals.add("robot_realtime::joints_state::positions")
126+
self.buffered_signals.add(f"{ROBOT_REALTIME_KEY}::joints_state::positions")
128127

129128
def __len__(self):
130129
"""
@@ -145,6 +144,9 @@ def _update_data_buffer(
145144
Any sample older than the fixed time window is removed.
146145
"""
147146

147+
if not keys:
148+
return
149+
148150
if keys[0] not in raw_data:
149151
raw_data[keys[0]] = DequeToNumpyLeaf()
150152

@@ -171,26 +173,78 @@ def _update_data_buffer(
171173
def _populate_realtime_logger_metadata(self, raw_data: dict, keys: list, value):
172174
"""
173175
Recursively populate metadata into raw_data.
174-
Here we simply store metadata (e.g. elements names) into a list.
176+
177+
- Creates only missing nested nodes.
178+
- At a leaf: initialize buffers if missing and merge elements_names
179+
(do not overwrite existing elements_names).
180+
- Returns True if the call created or extended metadata for the given path,
181+
False otherwise.
175182
"""
183+
184+
if not isinstance(keys, list) or not keys:
185+
raise ValueError(
186+
f"Invalid keys parameter: {keys}. Expected a non-empty list."
187+
)
188+
if not all(isinstance(k, str) for k in keys):
189+
raise ValueError(
190+
f"Invalid keys elements: {keys}. All elements must be strings."
191+
)
192+
193+
if not isinstance(raw_data, (dict, DequeToNumpyLeaf)):
194+
raise ValueError(
195+
f"Invalid raw_data parameter: {raw_data}. Expected a dictionary-like object."
196+
)
197+
198+
if not isinstance(value, (list, tuple, str, int, float)):
199+
raise ValueError(
200+
f"Invalid value parameter: {value}. Expected a list, tuple, or scalar."
201+
)
202+
176203
if keys[0] == "timestamps":
177-
return
204+
return False
205+
206+
# ensure node exists
178207
if keys[0] not in raw_data:
179208
raw_data[keys[0]] = DequeToNumpyLeaf()
209+
created = True
210+
else:
211+
created = False
212+
180213
if len(keys) == 1:
214+
# leaf
181215
if not value:
182-
if keys[0] in raw_data:
183-
del raw_data[keys[0]]
184-
return
185-
if "elements_names" not in raw_data[keys[0]]:
186-
raw_data[keys[0]]["elements_names"] = []
187-
# Also create empty buffers (which will later be updated in run())
188-
raw_data[keys[0]]["data"] = deque()
189-
raw_data[keys[0]]["timestamps"] = deque()
216+
# do not delete existing node on empty value; just no-op
217+
return created
218+
219+
node = raw_data[keys[0]]
220+
221+
# initialize leaf buffers if missing
222+
if "elements_names" not in node:
223+
node["elements_names"] = (
224+
list(value) if isinstance(value, (list, tuple)) else value
225+
)
226+
node["data"] = deque()
227+
node["timestamps"] = deque()
228+
return True
190229

191-
raw_data[keys[0]]["elements_names"] = value
230+
# merge element names (append only new entries)
231+
if isinstance(node["elements_names"], list) and isinstance(
232+
value, (list, tuple)
233+
):
234+
added = False
235+
for v in value:
236+
if v not in node["elements_names"]:
237+
node["elements_names"].append(v)
238+
added = True
239+
return added or created
240+
241+
# fallback: if elements_names is not a list, don't overwrite
242+
return created
192243
else:
193-
self._populate_realtime_logger_metadata(raw_data[keys[0]], keys[1:], value)
244+
# recurse into the subtree
245+
return self._populate_realtime_logger_metadata(
246+
raw_data[keys[0]], keys[1:], value
247+
)
194248

195249
def open(self, source: str) -> bool:
196250
"""
@@ -224,10 +278,12 @@ def open(self, source: str) -> bool:
224278
return False
225279

226280
self.realtime_network_init = True
227-
self.joints_name = self.rt_metadata_dict["robot_realtime::description_list"]
228-
self.robot_name = self.rt_metadata_dict["robot_realtime::yarp_robot_name"][
229-
0
281+
self.joints_name = self.rt_metadata_dict[
282+
f"{ROBOT_REALTIME_KEY}::description_list"
230283
]
284+
self.robot_name = self.rt_metadata_dict[
285+
f"{ROBOT_REALTIME_KEY}::yarp_robot_name"
286+
][0]
231287

232288
# Populate metadata into self.data recursively.
233289
for key_string, value in self.rt_metadata_dict.items():
@@ -254,6 +310,75 @@ def index(self):
254310
finally:
255311
self.index_lock.unlock()
256312

313+
def check_for_new_metadata(self) -> bool:
314+
"""
315+
Check if new metadata is available using the client's streaming data flag.
316+
This avoids expensive RPC calls.
317+
318+
Returns:
319+
bool: True if new metadata is available, False otherwise.
320+
"""
321+
client = self.vector_collections_client
322+
if client is None:
323+
return False
324+
325+
try:
326+
return client.is_new_metadata_available()
327+
except Exception as exc:
328+
print(f"Error checking for new metadata: {exc}")
329+
return False
330+
331+
def update_metadata(self):
332+
"""
333+
Refresh the metadata from the remote realtime logger.
334+
New metadata items are added to self.rt_metadata_dict and
335+
the corresponding data buffers are created in self.data.
336+
337+
Returns:
338+
dict: New metadata items added, or None if no new items.
339+
"""
340+
341+
client = self.vector_collections_client
342+
if client is None:
343+
print("Refresh metadata: realtime client unavailable.")
344+
return
345+
346+
try:
347+
updated_md = client.get_metadata().vectors
348+
except Exception as exc:
349+
print(f"Error fetching metadata: {exc}")
350+
return
351+
352+
existing_md = self.rt_metadata_dict or {}
353+
new_items = {k: v for k, v in updated_md.items() if k not in existing_md}
354+
355+
if not new_items:
356+
return
357+
358+
existing_md.update(new_items)
359+
self.rt_metadata_dict = existing_md
360+
361+
desc_key = f"{ROBOT_REALTIME_KEY}::description_list"
362+
yarp_name_key = f"{ROBOT_REALTIME_KEY}::yarp_robot_name"
363+
if desc_key in new_items:
364+
self.joints_name = existing_md[desc_key]
365+
if yarp_name_key in new_items:
366+
names = existing_md.get(yarp_name_key, [])
367+
if names:
368+
self.robot_name = names[0]
369+
370+
# Populate metadata into self.data recursively.
371+
for key_string, value in self.rt_metadata_dict.items():
372+
keys = key_string.split("::")
373+
self._populate_realtime_logger_metadata(self.data, keys, value)
374+
375+
# Remove keys that are not needed for the realtime plotting.
376+
if self.root_name in self.data:
377+
self.data[self.root_name].pop("description_list", None)
378+
self.data[self.root_name].pop("yarp_robot_name", None)
379+
380+
return new_items
381+
257382
def get_item_from_path_at_index(self, path, index, default_path=None, neighbor=0):
258383
"""
259384
Get the latest data item from the given path at the latest index.
@@ -288,49 +413,68 @@ def run(self):
288413
"""
289414
while True:
290415
start = time.time()
416+
if self.state == PeriodicThreadState.closed:
417+
return
418+
291419
if self.state == PeriodicThreadState.running:
292-
# Read the latest data from the realtime logger.
293-
vc_input = self.vector_collections_client.read_data(True).vectors
420+
new_samples_read = False
421+
422+
while True:
423+
try:
424+
packet = self.vector_collections_client.read_data(False)
425+
except Exception as exc: # noqa: BLE001 - surface runtime issues
426+
print(f"Error reading realtime data: {exc}")
427+
break
428+
429+
if packet is None:
430+
break
431+
432+
vc_input = getattr(packet, "vectors", None)
433+
if not vc_input:
434+
break
435+
436+
timestamps_key = f"{ROBOT_REALTIME_KEY}::timestamps"
437+
if timestamps_key not in vc_input or not vc_input[timestamps_key]:
438+
continue
439+
440+
recent_timestamp = vc_input[timestamps_key][0]
294441

295-
if vc_input:
296442
self.index_lock.lock()
297-
# Retrieve the most recent timestamp from the input.
298-
recent_timestamp = vc_input["robot_realtime::timestamps"][0]
299-
self._timestamps.append(recent_timestamp)
300-
# Keep the global timestamps within the fixed plot window.
301-
while self._timestamps and (
302-
recent_timestamp - self._timestamps[0]
303-
> self.realtime_fixed_plot_window
304-
):
305-
self._timestamps.popleft()
306-
307-
# Update initial and end times.
308-
if self._timestamps:
309-
self.initial_time = self._timestamps[0]
310-
self.end_time = self._timestamps[-1]
311-
312-
# For signal selected from the user that is in the received data (except timestamps),
313-
# update the appropriate buffer.
314-
for key_string, value in vc_input.items():
315-
if key_string == "robot_realtime::timestamps":
316-
continue
317-
318-
# Check if any selected signal starts with this path
319-
match = any(
320-
sel.startswith(key_string) for sel in self.buffered_signals
321-
)
322-
if not match:
323-
continue
324-
325-
keys = key_string.split("::")
326-
self._update_data_buffer(
327-
self.data, keys, value, recent_timestamp
328-
)
329-
330-
self.index_lock.unlock()
331-
332-
# Signal that new data are available.
333-
self.update_index_signal.emit()
443+
try:
444+
self._timestamps.append(recent_timestamp)
445+
446+
while self._timestamps and (
447+
recent_timestamp - self._timestamps[0]
448+
> self.realtime_fixed_plot_window
449+
):
450+
self._timestamps.popleft()
451+
452+
if self._timestamps:
453+
self.initial_time = self._timestamps[0]
454+
self.end_time = self._timestamps[-1]
455+
456+
for key_string, value in vc_input.items():
457+
if key_string == timestamps_key:
458+
continue
459+
460+
match = any(
461+
sel.startswith(key_string)
462+
for sel in self.buffered_signals
463+
)
464+
if not match:
465+
continue
466+
467+
keys = key_string.split("::")
468+
self._update_data_buffer(
469+
self.data, keys, value, recent_timestamp
470+
)
471+
finally:
472+
self.index_lock.unlock()
473+
474+
new_samples_read = True
475+
476+
if new_samples_read:
477+
self.update_index_signal.emit()
334478

335479
# Sleep until the next period.
336480
sleep_time = self.period - (time.time() - start)

0 commit comments

Comments
 (0)