diff --git a/README.md b/README.md
index 9c33a7d..6723934 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,25 @@ Once you have installed the `robot-log-visualizer` you can run it from the termi
You can navigate the dataset thanks to the slider or by pressing `Ctrl-f` and `Ctrl-b` to move
forward and backward.
+To pre-load a dataset on startup, pass it as an argument:
+
+```bash
+robot-log-visualizer /path/to/dataset.mat
+```
+
+If you saved a snapshot of your favourite layout, you can restore it right away:
+
+```bash
+robot-log-visualizer --snapshot /path/to/view.json
+```
+
+You can also combine both parameters; in that case the dataset argument is loaded first and then
+the snapshot is applied:
+
+```bash
+robot-log-visualizer /path/to/dataset.mat --snapshot /path/to/view.json
+```
+
> [!IMPORTANT]
> `robot-log-visualizer` only supports reading `.mat` file [version 7.3 or newer](https://www.mathworks.com/help/matlab/import_export/mat-file-versions.html).
diff --git a/robot_log_visualizer/__main__.py b/robot_log_visualizer/__main__.py
index 6e7196f..fcf7aa1 100755
--- a/robot_log_visualizer/__main__.py
+++ b/robot_log_visualizer/__main__.py
@@ -4,7 +4,10 @@
# This software may be modified and distributed under the terms of the
# Released under the terms of the BSD 3-Clause License
+import argparse
+import pathlib
import sys
+from typing import Optional, Sequence
# GUI
from qtpy.QtWidgets import QApplication
@@ -14,7 +17,28 @@
from robot_log_visualizer.robot_visualizer.meshcat_provider import MeshcatProvider
-def main():
+def _parse_arguments(argv: Sequence[str]) -> argparse.Namespace:
+ parser = argparse.ArgumentParser(
+ description="Robot Log Visualizer",
+ )
+ parser.add_argument(
+ "dataset",
+ nargs="?",
+ help="Path to a MAT dataset to load on startup.",
+ )
+ parser.add_argument(
+ "-s",
+ "--snapshot",
+ help="Path to a view snapshot (.json) to restore on startup.",
+ )
+
+ return parser.parse_args(argv)
+
+
+def main(argv: Optional[Sequence[str]] = None):
+ parsed_argv = list(argv) if argv is not None else sys.argv[1:]
+ args = _parse_arguments(parsed_argv)
+
thread_periods = {
"meshcat_provider": 0.03,
"signal_provider": 0.03,
@@ -36,6 +60,16 @@ def main():
# show the main window
gui.show()
+ if args.dataset:
+ dataset_path = pathlib.Path(args.dataset).expanduser()
+ if not gui._load_mat_file(str(dataset_path), quiet=False): # noqa: SLF001
+ print(f"Failed to load dataset '{dataset_path}'.", file=sys.stderr)
+
+ if args.snapshot:
+ snapshot_path = pathlib.Path(args.snapshot).expanduser()
+ if not gui.load_view_snapshot_from_path(snapshot_path):
+ print(f"Failed to load snapshot '{snapshot_path}'.", file=sys.stderr)
+
exec_method = getattr(app, "exec", None)
if exec_method is None:
exec_method = app.exec_
diff --git a/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py b/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py
index c8acaac..cba5d1b 100644
--- a/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py
+++ b/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py
@@ -4,7 +4,7 @@
from __future__ import annotations
-from typing import Dict, Iterable, Sequence, Tuple
+from typing import Any, Dict, Iterable, List, Sequence, Tuple
import numpy as np
import pyqtgraph as pg # type: ignore
@@ -57,6 +57,7 @@ def __init__(
self._curves: Dict[str, pg.PlotDataItem] = {}
self._annotations: Dict[Point, pg.TextItem] = {}
self._markers: Dict[Point, pg.ScatterPlotItem] = {}
+ self._annotation_sources: Dict[Point, str] = {}
self._palette: Iterable = ColorPalette()
# UI set‑up
@@ -85,21 +86,23 @@ def set_signal_provider(self, signal_provider) -> None:
self._update_realtime_curves
)
- def update_plots(self, paths: Sequence[Path], legends: Sequence[Legend]) -> None:
+ def update_plots(
+ self, paths: Sequence[Path], legends: Sequence[Legend]
+ ) -> List[str]:
"""Synchronise plots with the *paths* list.
New items are added, disappeared items removed. Existing ones are
left untouched to avoid flicker.
"""
if self._signal_provider is None:
- return
+ return []
# For real-time provider, update the set of selected signals to buffer
if self._signal_provider.provider_type == ProviderType.REALTIME:
selected_keys = ["::".join(path) for path in paths]
self._signal_provider.add_signals_to_buffer(selected_keys)
- self._add_missing_curves(paths, legends)
+ missing_paths = self._add_missing_curves(paths, legends)
self._remove_obsolete_curves(paths)
# Set the X axis range based on the provider type
@@ -117,6 +120,8 @@ def update_plots(self, paths: Sequence[Path], legends: Sequence[Legend]) -> None
self._signal_provider.end_time - self._signal_provider.initial_time,
)
+ return missing_paths
+
# The following trio is wired to whoever controls the replay/stream
def pause_animation(self) -> None: # noqa: D401
"""Pause the vertical‑line animation."""
@@ -173,23 +178,46 @@ def _connect_signals(self) -> None:
def _add_missing_curves(
self, paths: Sequence[Path], legends: Sequence[Legend]
- ) -> None:
+ ) -> List[str]:
"""Plot curves that are present in *paths* but not yet on screen."""
+
+ missing: List[str] = []
for path, legend in zip(paths, legends):
key = "/".join(path)
if key in self._curves:
continue
# Drill down to the data array
- data = self._signal_provider.data
- for subkey in path[:-1]:
- data = data[subkey]
try:
- y = data["data"][:, int(path[-1])]
- except (IndexError, ValueError): # scalar variable
- y = data["data"][:]
+ data = self._signal_provider.data
+ for subkey in path[:-1]:
+ data = data[subkey]
+
+ data_array = data["data"]
+ timestamps = data["timestamps"]
+ except (KeyError, TypeError):
+ missing.append(key)
+ continue
+
+ try:
+ y = data_array[:, int(path[-1])]
+ except (
+ IndexError,
+ ValueError,
+ TypeError,
+ ): # scalar variable or invalid index
+ try:
+ y = data_array[:]
+ except Exception:
+ missing.append(key)
+ continue
+
+ try:
+ x = timestamps - self._signal_provider.initial_time
+ except Exception:
+ missing.append(key)
+ continue
- x = data["timestamps"] - self._signal_provider.initial_time
palette_color = next(self._palette)
pen = pg.mkPen(palette_color.as_hex(), width=2)
@@ -201,12 +229,21 @@ def _add_missing_curves(
symbol=None,
)
+ return missing
+
def _remove_obsolete_curves(self, paths: Sequence[Path]) -> None:
"""Delete curves that disappeared from *paths*."""
valid = {"/".join(p) for p in paths}
for key in [k for k in self._curves if k not in valid]:
self._plot.removeItem(self._curves.pop(key))
+ # Remove annotations associated to the deleted curve
+ orphan_points = [
+ pt for pt, src in self._annotation_sources.items() if src == key
+ ]
+ for point in orphan_points:
+ self._deselect(point)
+
def _update_vline(self) -> None:
"""Move the vertical line to ``current_time``."""
if self._signal_provider is None:
@@ -279,10 +316,11 @@ def _on_mouse_click(self, event) -> None: # noqa: N802
self._deselect(candidate)
else:
assert nearest_curve is not None # mypy‑friendly
- self._select(candidate, nearest_curve.opts["pen"])
+ self._select(candidate, nearest_curve)
- def _select(self, pt: Point, pen: pg.QtGui.QPen) -> None:
+ def _select(self, pt: Point, curve: pg.PlotDataItem) -> None:
"""Add label + circle marker on *pt*."""
+ pen = curve.opts["pen"]
x_span = np.diff(self._plot.viewRange()[0])[0]
y_span = np.diff(self._plot.viewRange()[1])[0]
x_prec = max(0, int(-np.log10(max(x_span, 1e-12))) + 2)
@@ -313,10 +351,15 @@ def _select(self, pt: Point, pen: pg.QtGui.QPen) -> None:
self._plot.addItem(marker)
self._markers[pt] = marker
+ curve_key = self._curve_key(curve)
+ if curve_key is not None:
+ self._annotation_sources[pt] = curve_key
+
def _deselect(self, pt: Point) -> None:
"""Remove annotation + marker on *pt*."""
self._plot.removeItem(self._annotations.pop(pt))
self._plot.removeItem(self._markers.pop(pt))
+ self._annotation_sources.pop(pt, None)
def clear_selections(self) -> None: # noqa: D401
"""Remove **all** annotations and markers."""
@@ -324,3 +367,137 @@ def clear_selections(self) -> None: # noqa: D401
self._plot.removeItem(item)
self._annotations.clear()
self._markers.clear()
+ self._annotation_sources.clear()
+
+ def clear_curves(self) -> None:
+ """Remove every plotted curve and related markers."""
+ for key in list(self._curves.keys()):
+ self._plot.removeItem(self._curves.pop(key))
+ self.clear_selections()
+
+ def capture_state(self) -> Dict[str, Any]:
+ """Return a snapshot of the current canvas configuration."""
+
+ annotations: List[Dict[str, Any]] = []
+ for pt, label in self._annotations.items():
+ source = self._annotation_sources.get(pt)
+ if source is None:
+ continue
+ annotations.append(
+ {
+ "path": source,
+ "point": [float(pt[0]), float(pt[1])],
+ "label": label.toPlainText(),
+ }
+ )
+
+ curve_meta: Dict[str, Dict[str, Any]] = {}
+ for key, curve in self._curves.items():
+ pen_info: Dict[str, Any] = {}
+ pen = curve.opts.get("pen")
+ if pen is not None:
+ qcol = pen.color()
+ pen_info["color"] = (
+ f"#{qcol.red():02x}{qcol.green():02x}{qcol.blue():02x}"
+ )
+ pen_info["width"] = pen.width()
+ curve_meta[key] = {
+ "label": curve.opts.get("name", ""),
+ "pen": pen_info,
+ }
+
+ return {
+ "curves": curve_meta,
+ "view_range": self._plot.viewRange(),
+ "legend_visible": bool(self._plot.plotItem.legend.isVisible()),
+ "annotations": annotations,
+ }
+
+ def apply_view_range(self, view_range: Sequence[Sequence[float]]) -> None:
+ """Restore the axes limits from a snapshot."""
+
+ if len(view_range) != 2:
+ return
+
+ x_range, y_range = view_range
+ if len(x_range) == 2:
+ self._plot.setXRange(float(x_range[0]), float(x_range[1]), padding=0)
+ if len(y_range) == 2:
+ self._plot.setYRange(float(y_range[0]), float(y_range[1]), padding=0)
+
+ def set_legend_visible(self, visible: bool) -> None:
+ """Toggle legend visibility."""
+
+ legend = getattr(self._plot.plotItem, "legend", None)
+ if legend is not None:
+ legend.setVisible(bool(visible))
+
+ def restore_annotations(self, annotations: Sequence[Dict[str, Any]]) -> List[str]:
+ """Re-create selection markers from saved data.
+
+ Returns:
+ List of curve identifiers that could not be restored.
+ """
+
+ missing: List[str] = []
+ for ann in annotations:
+ key = ann.get("path")
+ point = ann.get("point")
+ if key is None or point is None:
+ continue
+ curve = self._curves.get(str(key))
+ if curve is None:
+ missing.append(str(key))
+ continue
+ try:
+ pt_tuple: Point = (float(point[0]), float(point[1]))
+ except (TypeError, ValueError, IndexError):
+ missing.append(str(key))
+ continue
+ self._select(pt_tuple, curve)
+ return missing
+
+ def _curve_key(self, curve: pg.PlotDataItem) -> str | None:
+ for key, item in self._curves.items():
+ if item is curve:
+ return key
+ return None
+
+ def apply_curve_metadata(self, metadata: Dict[str, Dict[str, Any]]) -> None:
+ for key, info in metadata.items():
+ curve = self._curves.get(key)
+ if curve is None:
+ continue
+
+ label = info.get("label")
+ if label is not None:
+ label_text = str(label)
+ if hasattr(curve, "setName"):
+ curve.setName(label_text)
+ else:
+ curve.opts["name"] = label_text
+ legend = getattr(self._plot.plotItem, "legend", None)
+ if legend is not None:
+ if hasattr(legend, "itemChanged"):
+ legend.itemChanged(curve)
+ else: # compatibility fallback for older pyqtgraph releases
+ try:
+ legend.removeItem(curve)
+ except Exception:
+ pass
+ legend.addItem(curve, label_text)
+
+ pen_info = info.get("pen", {})
+ color = pen_info.get("color")
+ width = pen_info.get("width")
+ if color is not None or width is not None:
+ kwargs: Dict[str, Any] = {}
+ if color is not None:
+ kwargs["color"] = color
+ if width is not None:
+ try:
+ kwargs["width"] = float(width)
+ except (TypeError, ValueError):
+ pass
+ if kwargs:
+ curve.setPen(pg.mkPen(**kwargs))
diff --git a/robot_log_visualizer/signal_provider/signal_provider.py b/robot_log_visualizer/signal_provider/signal_provider.py
index 285fb49..d1eb544 100644
--- a/robot_log_visualizer/signal_provider/signal_provider.py
+++ b/robot_log_visualizer/signal_provider/signal_provider.py
@@ -421,6 +421,18 @@ def get_3d_trajectory_at_index(self, index):
return trajectories
+ def export_registered_3d_points(self):
+ locker = QMutexLocker(self._3d_points_path_lock)
+ return {key: list(value) for key, value in self._3d_points_path.items()}
+
+ def export_registered_3d_trajectories(self):
+ locker = QMutexLocker(self._3d_trajectories_path_lock)
+ return {key: list(value) for key, value in self._3d_trajectories_path.items()}
+
+ def export_registered_3d_arrows(self):
+ locker = QMutexLocker(self._3d_arrows_path_lock)
+ return {key: list(value) for key, value in self._3d_arrows.items()}
+
def register_3d_point(self, key, points_path):
self._3d_points_path_lock.lock()
self._3d_points_path[key] = points_path
diff --git a/robot_log_visualizer/ui/gui.py b/robot_log_visualizer/ui/gui.py
index 02ec82a..0883237 100644
--- a/robot_log_visualizer/ui/gui.py
+++ b/robot_log_visualizer/ui/gui.py
@@ -3,13 +3,15 @@
# Released under the terms of the BSD 3-Clause License
# QtPy abstraction
+from __future__ import annotations
from qtpy import QtWidgets, QtGui, QtCore
from qtpy import QtWebEngineWidgets # noqa: F401 # Ensure WebEngine is initialised
-from qtpy.QtCore import QMutex, QMutexLocker, QUrl, Qt, Slot
+from qtpy.QtCore import QMutex, QMutexLocker, QSaveFile, QUrl, Qt, Slot, QIODevice
from qtpy.QtWidgets import (
QDialog,
QDialogButtonBox,
QFileDialog,
+ QMessageBox,
QLineEdit,
QToolButton,
QTreeWidgetItem,
@@ -33,17 +35,32 @@
from robot_log_visualizer.ui.plot_item import PlotItem
from robot_log_visualizer.ui.video_item import VideoItem
from robot_log_visualizer.ui.text_logging import TextLoggingItem
+from robot_log_visualizer.ui.view_snapshot import (
+ SNAPSHOT_VERSION,
+ ViewSnapshot,
+ PlotSnapshot,
+ VisualizationSnapshot,
+ TimelineSnapshot,
+ DatasetSnapshot,
+ WindowSnapshot,
+ TreeSnapshot,
+)
from robot_log_visualizer.utils.utils import (
PeriodicThreadState,
RobotStatePath,
ColorPalette,
+ Color,
)
import sys
import os
+import json
+import base64
import pathlib
import re
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Sequence
import numpy as np
@@ -222,6 +239,9 @@ def __init__(self, signal_provider_period, meshcat_provider, animation_period):
self.visualized_3d_points = set()
self.visualized_3d_trajectories = set()
self.visualized_3d_arrows = set()
+ self.visualized_3d_points_colors = {}
+ self.visualized_3d_trajectories_colors = {}
+ self.visualized_3d_arrows_colors = {}
self.visualized_3d_points_colors_palette = ColorPalette()
self.toolButton_on_click()
@@ -238,6 +258,17 @@ def __init__(self, signal_provider_period, meshcat_provider, animation_period):
self.dataset_loaded = False
+ self.loaded_mat_path = None
+ self._settings = QtCore.QSettings("ami-iit", "robot-log-visualizer")
+ self._snapshot_dir_key = "snapshot/lastDirectory"
+ self._snapshot_last_dir = pathlib.Path(
+ self._settings.value(self._snapshot_dir_key, pathlib.Path.home())
+ )
+
+ self.ui.actionSave_View.setEnabled(False)
+ self.ui.actionSave_View.triggered.connect(self.save_view_snapshot)
+ self.ui.actionLoad_View.triggered.connect(self.load_view_snapshot)
+
# connect action
self.ui.actionQuit.triggered.connect(self.close)
self.ui.actionOpen.triggered.connect(self.open_mat_file)
@@ -485,9 +516,15 @@ def variableTreeWidget_on_click(self):
"legends": legends,
}
- self.plot_items[self.ui.tabPlotWidget.currentIndex()].canvas.update_plots(
- paths, legends
- )
+ missing_curves = self.plot_items[
+ self.ui.tabPlotWidget.currentIndex()
+ ].canvas.update_plots(paths, legends)
+
+ if missing_curves:
+ self.logger.write_to_log(
+ "Some signals could not be plotted: "
+ + ", ".join(sorted(set(missing_curves)))
+ )
def find_text_log_index(self, path):
current_time = self.signal_provider.current_time
@@ -688,82 +725,958 @@ def __populate_text_logging_tree_widget(self, obj, parent) -> QTreeWidgetItem:
parent.addChild(item)
return parent
- def __load_mat_file(self, file_name):
- self.signal_provider = MatfileSignalProvider(
+ def _clear_plot_tabs(self) -> None:
+ while self.ui.tabPlotWidget.count() > 0:
+ widget = self.ui.tabPlotWidget.widget(0)
+ if isinstance(widget, PlotItem):
+ widget.canvas.quit_animation()
+ widget.deleteLater()
+ self.ui.tabPlotWidget.removeTab(0)
+ self.plot_items.clear()
+ self.plotData.clear()
+
+ def _ensure_plot_tab_count(self, count: int) -> None:
+ count = max(1, count)
+ while self.ui.tabPlotWidget.count() > count:
+ self.plotTabCloseButton_on_click(self.ui.tabPlotWidget.count() - 1)
+ while self.ui.tabPlotWidget.count() < count:
+ self.toolButton_on_click()
+
+ def _clear_visualization_items(self) -> None:
+ """Remove all registered 3D primitives from Meshcat and reset trackers."""
+
+ if self.meshcat_provider is not None:
+ for point in list(self.visualized_3d_points):
+ try:
+ self.meshcat_provider.unregister_3d_point(point)
+ except KeyError:
+ pass
+ for trajectory in list(self.visualized_3d_trajectories):
+ try:
+ self.meshcat_provider.unregister_3d_trajectory(trajectory)
+ except KeyError:
+ pass
+ for arrow in list(self.visualized_3d_arrows):
+ try:
+ self.meshcat_provider.unregister_3d_arrow(arrow)
+ except KeyError:
+ pass
+
+ self.visualized_3d_points.clear()
+ self.visualized_3d_trajectories.clear()
+ self.visualized_3d_arrows.clear()
+ self.visualized_3d_points_colors.clear()
+ self.visualized_3d_trajectories_colors.clear()
+ self.visualized_3d_arrows_colors.clear()
+ self.visualized_3d_points_colors_palette.reset()
+
+ def _prepare_for_new_dataset(self) -> None:
+ if self.signal_provider is not None:
+ try:
+ self.signal_provider.update_index_signal.disconnect(self.update_index)
+ except (TypeError, RuntimeError):
+ pass
+ self.signal_provider.state = PeriodicThreadState.closed
+ self.signal_provider.wait()
+ self.signal_provider = None
+
+ self.dataset_loaded = False
+ self.loaded_mat_path = None
+
+ self.ui.variableTreeWidget.clear()
+ self.ui.yarpTextLogTreeWidget.clear()
+ self.robot_state_path = RobotStatePath()
+
+ # Clear registered 3D items in Meshcat
+ self._clear_visualization_items()
+
+ # Remove video tabs (keep meshcat tab at index 0)
+ while self.ui.meshcatAndVideoTab.count() > 1:
+ widget = self.ui.meshcatAndVideoTab.widget(1)
+ widget.deleteLater()
+ self.ui.meshcatAndVideoTab.removeTab(1)
+ for video_item in self.video_items:
+ video_item.deleteLater()
+ self.video_items.clear()
+
+ self._clear_plot_tabs()
+ self.toolButton_on_click()
+ self.ui.tabPlotWidget.setCurrentIndex(0)
+ self.ui.tabPlotWidget.setTabsClosable(False)
+
+ self.ui.timeSlider.setEnabled(False)
+ self.ui.timeSlider.setValue(0)
+ self.ui.startButton.setEnabled(False)
+ self.ui.pauseButton.setEnabled(False)
+
+ self.realtime_connection_enabled = False
+ self.ui.actionRealtime_Connect.setEnabled(True)
+ self.ui.actionSave_View.setEnabled(False)
+
+ def _snapshot_start_directory(self) -> pathlib.Path:
+ if self._snapshot_last_dir.exists():
+ return self._snapshot_last_dir
+ return pathlib.Path.home()
+
+ def _remember_snapshot_directory(self, path: pathlib.Path) -> None:
+ self._snapshot_last_dir = path
+ self._settings.setValue(self._snapshot_dir_key, str(path))
+
+ @staticmethod
+ def _encode_qbytearray(data: QtCore.QByteArray | None) -> Optional[str]:
+ if data is None or data.isEmpty():
+ return None
+ return bytes(data.toBase64()).decode("ascii")
+
+ @staticmethod
+ def _decode_qbytearray(encoded: Optional[str]) -> Optional[QtCore.QByteArray]:
+ if not encoded:
+ return None
+ return QtCore.QByteArray.fromBase64(encoded.encode("ascii"))
+
+ def _selected_variable_paths(self) -> List[List[str]]:
+ paths: List[List[str]] = []
+ for index in self.ui.variableTreeWidget.selectedIndexes():
+ if index.column() != 0:
+ continue
+ path: List[str] = []
+ is_leaf = True
+ current = index
+ while current.data() is not None:
+ if is_leaf:
+ path.append(str(current.row()))
+ is_leaf = False
+ else:
+ path.append(str(current.data()))
+ current = current.parent()
+ path.reverse()
+ if path:
+ paths.append(path)
+ return paths
+
+ def _selected_text_log_paths(self) -> List[List[str]]:
+ paths: List[List[str]] = []
+ for index in self.ui.yarpTextLogTreeWidget.selectedIndexes():
+ if index.column() != 0:
+ continue
+ path: List[str] = []
+ current = index
+ while current.data() is not None:
+ path.append(str(current.data()))
+ current = current.parent()
+ path.reverse()
+ if path:
+ paths.append(path)
+ return paths
+
+ def _find_variable_tree_item(
+ self, path: Sequence[str]
+ ) -> Optional[QTreeWidgetItem]:
+ if not path:
+ return None
+
+ matches = self.ui.variableTreeWidget.findItems(
+ path[0], Qt.MatchFixedString | Qt.MatchExactly
+ )
+ if not matches:
+ return None
+ item = matches[0]
+
+ for part in path[1:]:
+ try:
+ child_index = int(part)
+ except (TypeError, ValueError):
+ child_index = None
+
+ if child_index is not None:
+ if 0 <= child_index < item.childCount():
+ item = item.child(child_index)
+ else:
+ return None
+ else:
+ found = None
+ for i in range(item.childCount()):
+ if item.child(i).text(0) == part:
+ found = item.child(i)
+ break
+ if found is None:
+ return None
+ item = found
+
+ return item
+
+ def _find_text_log_tree_item(
+ self, path: Sequence[str]
+ ) -> Optional[QTreeWidgetItem]:
+ if not path:
+ return None
+
+ matches = self.ui.yarpTextLogTreeWidget.findItems(
+ path[0], Qt.MatchFixedString | Qt.MatchExactly
+ )
+ if not matches:
+ return None
+ item = matches[0]
+
+ for part in path[1:]:
+ found = None
+ for i in range(item.childCount()):
+ if item.child(i).text(0) == part:
+ found = item.child(i)
+ break
+ if found is None:
+ return None
+ item = found
+
+ return item
+
+ def _collect_plot_snapshots(self) -> List[PlotSnapshot]:
+ snapshots: List[PlotSnapshot] = []
+ for index, plot_item in enumerate(self.plot_items):
+ plot_info = self.plotData.get(index, {"paths": [], "legends": []})
+ paths = [
+ [str(component) for component in path]
+ for path in plot_info.get("paths", [])
+ ]
+ legends = [
+ [str(component) for component in legend]
+ for legend in plot_info.get("legends", [])
+ ]
+ snapshots.append(
+ PlotSnapshot(
+ title=self.ui.tabPlotWidget.tabText(index) or "Plot",
+ paths=paths,
+ legends=legends,
+ canvas=plot_item.capture_canvas_state(),
+ )
+ )
+ return snapshots
+
+ def _collect_visualization_snapshot(self) -> VisualizationSnapshot:
+ visualization = VisualizationSnapshot()
+
+ if self.signal_provider is None:
+ return visualization
+
+ for key, path in self.signal_provider.export_registered_3d_points().items():
+ visualization.points.append(
+ {
+ "key": key,
+ "path": list(path),
+ "color": self.visualized_3d_points_colors.get(key),
+ }
+ )
+
+ for (
+ key,
+ path,
+ ) in self.signal_provider.export_registered_3d_trajectories().items():
+ visualization.trajectories.append(
+ {
+ "key": key,
+ "path": list(path),
+ "color": self.visualized_3d_trajectories_colors.get(key),
+ }
+ )
+
+ for key, path in self.signal_provider.export_registered_3d_arrows().items():
+ visualization.arrows.append(
+ {
+ "key": key,
+ "path": list(path),
+ "color": self.visualized_3d_arrows_colors.get(key),
+ }
+ )
+
+ visualization.robot_state = {
+ "base_position_path": list(self.robot_state_path.base_position_path),
+ "base_orientation_path": list(self.robot_state_path.base_orientation_path),
+ "joints_state_path": list(self.robot_state_path.joints_state_path),
+ }
+ if getattr(self.meshcat_provider, "base_frame", ""):
+ visualization.robot_state["base_frame"] = self.meshcat_provider.base_frame
+
+ return visualization
+
+ def _collect_timeline_snapshot(self) -> TimelineSnapshot:
+ if self.signal_provider is None:
+ return TimelineSnapshot(
+ slider_value=self.ui.timeSlider.value(),
+ slider_max=self.ui.timeSlider.maximum(),
+ )
+
+ return TimelineSnapshot(
+ index=self.signal_provider.index,
+ slider_value=self.ui.timeSlider.value(),
+ slider_max=self.ui.timeSlider.maximum(),
+ is_running=self.signal_provider.state == PeriodicThreadState.running,
+ current_time=self.signal_provider.current_time,
+ )
+
+ def _apply_base_highlights(
+ self, base_position_path: Sequence[str], base_orientation_path: Sequence[str]
+ ) -> None:
+ selected_color = QtGui.QColor(255, 0, 0, 127)
+
+ if base_position_path:
+ item = self._find_variable_tree_item(base_position_path)
+ if item is not None:
+ item.setBackground(0, QtGui.QBrush(selected_color))
+
+ if base_orientation_path:
+ item = self._find_variable_tree_item(base_orientation_path)
+ if item is not None:
+ item.setBackground(0, QtGui.QBrush(selected_color))
+
+ def _register_snapshot_point(
+ self, key: str, path: Sequence[str], color_hex: Optional[str]
+ ) -> bool:
+ if self.signal_provider is None:
+ return False
+
+ item = self._find_variable_tree_item(path)
+ if item is None:
+ return False
+
+ color_obj = (
+ Color(color_hex)
+ if color_hex
+ else next(self.visualized_3d_points_colors_palette)
+ )
+ color_hex_value = color_obj.as_hex()
+
+ item.setForeground(0, QtGui.QBrush(QtGui.QColor(color_hex_value)))
+ self.meshcat_provider.register_3d_point(
+ key, list(color_obj.as_normalized_rgb())
+ )
+ self.signal_provider.register_3d_point(key, list(path))
+
+ self.visualized_3d_points.add(key)
+ self.visualized_3d_points_colors[key] = color_hex_value
+ return True
+
+ def _register_snapshot_trajectory(
+ self, key: str, path: Sequence[str], color_hex: Optional[str]
+ ) -> bool:
+ if self.signal_provider is None:
+ return False
+
+ item = self._find_variable_tree_item(path)
+ if item is None:
+ return False
+
+ color_obj = (
+ Color(color_hex)
+ if color_hex
+ else next(self.visualized_3d_points_colors_palette)
+ )
+ color_hex_value = color_obj.as_hex()
+
+ item.setForeground(0, QtGui.QBrush(QtGui.QColor(color_hex_value)))
+ self.meshcat_provider.register_3d_trajectory(
+ key, list(color_obj.as_normalized_rgb())
+ )
+ self.signal_provider.register_3d_trajectory(key, list(path))
+
+ self.visualized_3d_trajectories.add(key)
+ self.visualized_3d_trajectories_colors[key] = color_hex_value
+ return True
+
+ def _register_snapshot_arrow(
+ self, key: str, path: Sequence[str], color_hex: Optional[str]
+ ) -> bool:
+ if self.signal_provider is None:
+ return False
+
+ item = self._find_variable_tree_item(path)
+ if item is None:
+ return False
+
+ color_obj = (
+ Color(color_hex)
+ if color_hex
+ else next(self.visualized_3d_points_colors_palette)
+ )
+ color_hex_value = color_obj.as_hex()
+
+ item.setForeground(0, QtGui.QBrush(QtGui.QColor(color_hex_value)))
+ self.meshcat_provider.register_3d_arrow(
+ key, list(color_obj.as_normalized_rgb())
+ )
+ self.signal_provider.register_3d_arrow(key, list(path))
+
+ self.visualized_3d_arrows.add(key)
+ self.visualized_3d_arrows_colors[key] = color_hex_value
+ return True
+
+ def _set_playback_running(self, running: bool) -> None:
+ if self.signal_provider is None:
+ return
+
+ if running:
+ if self.signal_provider.state != PeriodicThreadState.running:
+ self.startButton_on_click()
+ else:
+ if self.signal_provider.state == PeriodicThreadState.running:
+ self.pauseButton_on_click()
+
+ def _build_view_snapshot(self) -> ViewSnapshot:
+ dataset_provider = "offline"
+ dataset_path: Optional[str] = None
+ robot_name: Optional[str] = None
+ if self.signal_provider is not None:
+ robot_name = getattr(self.signal_provider, "robot_name", None)
+ if self.signal_provider.provider_type == ProviderType.REALTIME:
+ dataset_provider = "realtime"
+ else:
+ dataset_path = self.loaded_mat_path
+
+ window_snapshot = WindowSnapshot(
+ geometry=self._encode_qbytearray(self.saveGeometry()),
+ state=self._encode_qbytearray(self.saveState()),
+ plot_tab_index=self.ui.tabPlotWidget.currentIndex(),
+ main_tab_index=self.ui.tabWidget.currentIndex(),
+ meshcat_tab_index=self.ui.meshcatAndVideoTab.currentIndex(),
+ )
+
+ metadata = {
+ "created_at": datetime.utcnow().isoformat(timespec="seconds"),
+ "application": "robot-log-visualizer",
+ }
+
+ return ViewSnapshot(
+ metadata=metadata,
+ dataset=DatasetSnapshot(
+ path=dataset_path,
+ robot_name=robot_name,
+ provider=dataset_provider,
+ ),
+ window=window_snapshot,
+ timeline=self._collect_timeline_snapshot(),
+ plots=self._collect_plot_snapshots(),
+ tree=TreeSnapshot(
+ selected_variables=self._selected_variable_paths(),
+ selected_text_logs=self._selected_text_log_paths(),
+ ),
+ visualization=self._collect_visualization_snapshot(),
+ )
+
+ def save_view_snapshot(self) -> None:
+ if self.signal_provider is None:
+ QMessageBox.information(
+ self,
+ "Save view snapshot",
+ "No dataset is currently loaded. Load a dataset before saving a view.",
+ )
+ return
+
+ start_dir = str(self._snapshot_start_directory())
+ file_name, _ = QFileDialog.getSaveFileName(
+ self,
+ "Save view snapshot",
+ start_dir,
+ "View snapshots (*.json)",
+ )
+
+ if not file_name:
+ return
+
+ snapshot_path = pathlib.Path(file_name)
+ if snapshot_path.suffix.lower() != ".json":
+ snapshot_path = snapshot_path.with_suffix(".json")
+
+ if snapshot_path.exists():
+ answer = QMessageBox.question(
+ self,
+ "Overwrite snapshot",
+ f"The file '{snapshot_path}' already exists. Overwrite it?",
+ QMessageBox.Yes | QMessageBox.No,
+ QMessageBox.No,
+ )
+ if answer != QMessageBox.Yes:
+ return
+
+ snapshot = self._build_view_snapshot()
+
+ try:
+ payload = json.dumps(snapshot.to_dict(), indent=2, sort_keys=True)
+ except TypeError as exc: # pragma: no cover - defensive branch
+ message = f"Unable to serialise snapshot: {exc}"
+ QMessageBox.critical(self, "Save view snapshot", message)
+ self.logger.write_to_log(message)
+ return
+
+ save_file = QSaveFile(str(snapshot_path))
+ if not save_file.open(QIODevice.WriteOnly | QIODevice.Text):
+ message = f"Cannot open '{snapshot_path}' for writing."
+ QMessageBox.critical(self, "Save view snapshot", message)
+ self.logger.write_to_log(message)
+ return
+
+ save_file.write(payload.encode("utf-8"))
+ if not save_file.commit(): # pragma: no cover - defensive branch
+ message = f"Failed to write snapshot to '{snapshot_path}'."
+ QMessageBox.critical(self, "Save view snapshot", message)
+ self.logger.write_to_log(message)
+ return
+
+ self._remember_snapshot_directory(snapshot_path.parent)
+ self.logger.write_to_log(f"Snapshot saved to '{snapshot_path}'.")
+
+ def _ensure_dataset_for_snapshot(
+ self, snapshot: ViewSnapshot, *, quiet: bool = False
+ ) -> bool:
+ provider_mode = (snapshot.dataset.provider or "offline").lower()
+
+ if provider_mode == "realtime":
+ QMessageBox.warning(
+ self,
+ "Load view snapshot",
+ "Snapshots captured from real-time sessions cannot be restored yet.",
+ )
+ self.logger.write_to_log(
+ "Realtime snapshot restoration is not supported; aborting load."
+ )
+ return False
+
+ expected_path = snapshot.dataset.path
+
+ # If no dataset is currently loaded and no path is provided, ask the user.
+ if not expected_path:
+ if self.dataset_loaded:
+ return True
+ replacement, _ = QFileDialog.getOpenFileName(
+ self,
+ "Select dataset for snapshot",
+ str(self._snapshot_start_directory()),
+ "MAT files (*.mat)",
+ )
+ if not replacement:
+ self.logger.write_to_log(
+ "Snapshot restoration cancelled: dataset selection skipped."
+ )
+ return False
+ return self._load_mat_file(replacement)
+
+ expected_path_obj = pathlib.Path(expected_path).expanduser()
+ expected_exists = expected_path_obj.exists()
+
+ if not expected_exists:
+ warning = f"Dataset '{expected_path_obj}' referenced in the snapshot could not be found."
+ if not quiet:
+ QMessageBox.warning(self, "Load view snapshot", warning)
+ self.logger.write_to_log(warning)
+
+ user_choice = None
+ if not quiet:
+ msg_box = QMessageBox(self)
+ msg_box.setIcon(QMessageBox.Question)
+ msg_box.setWindowTitle("Load dataset")
+ if expected_exists:
+ msg_box.setText(
+ "The snapshot references the dataset:\n" f"{expected_path_obj}"
+ )
+ else:
+ msg_box.setText("The dataset referenced in the snapshot was not found.")
+ msg_box.setInformativeText(
+ "Yes: load the referenced dataset.\n"
+ "No: choose a different dataset.\n"
+ "Cancel: keep the current dataset."
+ )
+ msg_box.setStandardButtons(
+ QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel
+ )
+ msg_box.setDefaultButton(QMessageBox.Yes)
+ user_choice = msg_box.exec_()
+ else:
+ user_choice = QMessageBox.Yes if expected_exists else QMessageBox.No
+
+ if user_choice == QMessageBox.Cancel:
+ if self.dataset_loaded:
+ self.logger.write_to_log(
+ "Snapshot dataset load skipped by user; keeping current dataset."
+ )
+ return True
+ self.logger.write_to_log(
+ "Snapshot restoration cancelled: no dataset loaded."
+ )
+ return False
+
+ if user_choice == QMessageBox.Yes and expected_exists:
+ if self.dataset_loaded and self.loaded_mat_path == str(expected_path_obj):
+ return True
+ if self._load_mat_file(str(expected_path_obj)):
+ return True
+ self.logger.write_to_log(
+ f"Snapshot restoration cancelled: failed to load '{expected_path_obj}'."
+ )
+ return False
+
+ # Either user selected "No" or the referenced dataset is missing -> prompt for replacement.
+ replacement_dir = (
+ expected_path_obj.parent
+ if expected_path_obj.parent.exists()
+ else self._snapshot_start_directory()
+ )
+ replacement, _ = QFileDialog.getOpenFileName(
+ self,
+ "Select dataset",
+ str(replacement_dir),
+ "MAT files (*.mat)",
+ )
+ if not replacement:
+ self.logger.write_to_log(
+ "Snapshot restoration cancelled: replacement dataset not selected."
+ )
+ return False
+
+ if self._load_mat_file(replacement):
+ return True
+
+ self.logger.write_to_log(
+ f"Snapshot restoration cancelled: failed to load '{replacement}'."
+ )
+ return False
+
+ def _apply_view_snapshot(self, snapshot: ViewSnapshot) -> None:
+ window_snapshot = snapshot.window
+ geometry = self._decode_qbytearray(window_snapshot.geometry)
+ if geometry is not None:
+ self.restoreGeometry(geometry)
+ state = self._decode_qbytearray(window_snapshot.state)
+ if state is not None:
+ self.restoreState(state)
+
+ # Ensure the requested number of plot tabs and clear existing data.
+ plot_snapshots = snapshot.plots or []
+ self._ensure_plot_tab_count(len(plot_snapshots))
+ self.plotData.clear()
+ for plot_item in self.plot_items:
+ plot_item.clear_canvas()
+
+ missing_curves: set[str] = set()
+ missing_annotations: set[str] = set()
+
+ for index, plot_snapshot in enumerate(plot_snapshots):
+ plot_item = self.plot_items[index]
+
+ paths = [
+ [str(component) for component in path] for path in plot_snapshot.paths
+ ]
+ legends = [
+ [str(component) for component in legend]
+ for legend in plot_snapshot.legends
+ ]
+
+ self.plotData[index] = {"paths": paths, "legends": legends}
+
+ missing = plot_item.canvas.update_plots(paths, legends)
+ if missing:
+ missing_curves.update(missing)
+
+ missing_ann = plot_item.apply_canvas_state(plot_snapshot.canvas)
+ if missing_ann:
+ missing_annotations.update(missing_ann)
+
+ self.ui.tabPlotWidget.setTabText(index, plot_snapshot.title)
+
+ # Restore selections in the variable tree (plots).
+ missing_variable_paths: List[str] = []
+ self.ui.variableTreeWidget.clearSelection()
+ for path in snapshot.tree.selected_variables:
+ item = self._find_variable_tree_item(path)
+ if item is None:
+ missing_variable_paths.append("/".join(map(str, path)))
+ continue
+ item.setSelected(True)
+
+ # Restore Meshcat visualisations if a dataset is loaded.
+ missing_points: List[str] = []
+ missing_trajectories: List[str] = []
+ missing_arrows: List[str] = []
+
+ if self.signal_provider is not None:
+ self._clear_visualization_items()
+
+ for entry in snapshot.visualization.points:
+ key = entry.get("key")
+ path = entry.get("path", [])
+ color = entry.get("color")
+ if not key or not path:
+ continue
+ if not self._register_snapshot_point(key, path, color):
+ missing_points.append(str(key))
+
+ for entry in snapshot.visualization.trajectories:
+ key = entry.get("key")
+ path = entry.get("path", [])
+ color = entry.get("color")
+ if not key or not path:
+ continue
+ if not self._register_snapshot_trajectory(key, path, color):
+ missing_trajectories.append(str(key))
+
+ for entry in snapshot.visualization.arrows:
+ key = entry.get("key")
+ path = entry.get("path", [])
+ color = entry.get("color")
+ if not key or not path:
+ continue
+ if not self._register_snapshot_arrow(key, path, color):
+ missing_arrows.append(str(key))
+
+ robot_state_info = snapshot.visualization.robot_state or {}
+ robot_state_path = RobotStatePath()
+ robot_state_path.base_position_path = [
+ str(component)
+ for component in robot_state_info.get("base_position_path", [])
+ ]
+ robot_state_path.base_orientation_path = [
+ str(component)
+ for component in robot_state_info.get("base_orientation_path", [])
+ ]
+ robot_state_path.joints_state_path = [
+ str(component)
+ for component in robot_state_info.get("joints_state_path", [])
+ ]
+ self.robot_state_path = robot_state_path
+
+ base_frame = robot_state_info.get("base_frame")
+ if base_frame:
+ self.meshcat_provider.base_frame = str(base_frame)
+
+ self._apply_base_highlights(
+ self.robot_state_path.base_position_path,
+ self.robot_state_path.base_orientation_path,
+ )
+
+ # Restore playback state only; keep timeline position on current dataset defaults.
+ timeline = snapshot.timeline
+ if self.signal_provider is not None:
+ self._set_playback_running(timeline.is_running)
+
+ # Restore tab selections (clamp indices to available ranges).
+ if self.ui.tabWidget.count() > 0:
+ self.ui.tabWidget.setCurrentIndex(
+ max(
+ 0,
+ min(window_snapshot.main_tab_index, self.ui.tabWidget.count() - 1),
+ )
+ )
+ if self.ui.tabPlotWidget.count() > 0:
+ self.ui.tabPlotWidget.setCurrentIndex(
+ max(
+ 0,
+ min(
+ window_snapshot.plot_tab_index,
+ self.ui.tabPlotWidget.count() - 1,
+ ),
+ )
+ )
+ if self.ui.meshcatAndVideoTab.count() > 0:
+ self.ui.meshcatAndVideoTab.setCurrentIndex(
+ max(
+ 0,
+ min(
+ window_snapshot.meshcat_tab_index,
+ self.ui.meshcatAndVideoTab.count() - 1,
+ ),
+ )
+ )
+
+ # Report missing artefacts, if any.
+ if missing_curves:
+ self.logger.write_to_log(
+ "Missing signals while restoring plots: "
+ + ", ".join(sorted(missing_curves))
+ )
+ if missing_annotations:
+ self.logger.write_to_log(
+ "Annotations skipped because their signals were unavailable: "
+ + ", ".join(sorted(missing_annotations))
+ )
+ if missing_variable_paths:
+ self.logger.write_to_log(
+ "Variables not found for selection: "
+ + ", ".join(sorted(missing_variable_paths))
+ )
+ if missing_points:
+ self.logger.write_to_log(
+ "3D points not restored: " + ", ".join(sorted(missing_points))
+ )
+ if missing_trajectories:
+ self.logger.write_to_log(
+ "3D trajectories not restored: "
+ + ", ".join(sorted(missing_trajectories))
+ )
+ if missing_arrows:
+ self.logger.write_to_log(
+ "3D arrows not restored: " + ", ".join(sorted(missing_arrows))
+ )
+
+ def load_view_snapshot_from_path(
+ self, snapshot_path: pathlib.Path | str, *, quiet: bool = False
+ ) -> bool:
+ path = pathlib.Path(snapshot_path).expanduser()
+
+ try:
+ with path.open("r", encoding="utf-8") as handle:
+ raw_snapshot = json.load(handle)
+ except FileNotFoundError:
+ message = f"Snapshot file '{path}' does not exist."
+ if not quiet:
+ QMessageBox.warning(self, "Load view snapshot", message)
+ self.logger.write_to_log(message)
+ return False
+ except json.JSONDecodeError as exc:
+ message = f"Snapshot file '{path}' is not valid JSON: {exc}."
+ if not quiet:
+ QMessageBox.critical(self, "Load view snapshot", message)
+ self.logger.write_to_log(message)
+ return False
+ except OSError as exc: # pragma: no cover - filesystem errors
+ message = f"Unable to read snapshot '{path}': {exc}."
+ if not quiet:
+ QMessageBox.critical(self, "Load view snapshot", message)
+ self.logger.write_to_log(message)
+ return False
+
+ try:
+ snapshot = ViewSnapshot.from_dict(raw_snapshot)
+ except Exception as exc: # pragma: no cover - defensive branch
+ message = f"Snapshot file '{path}' is not compatible: {exc}."
+ if not quiet:
+ QMessageBox.critical(self, "Load view snapshot", message)
+ self.logger.write_to_log(message)
+ return False
+
+ self._remember_snapshot_directory(path.parent)
+
+ if snapshot.version > SNAPSHOT_VERSION:
+ warning = (
+ f"Snapshot was created with a newer version ({snapshot.version}). "
+ "Attempting to load with current version."
+ )
+ if not quiet:
+ QMessageBox.information(self, "Load view snapshot", warning)
+ self.logger.write_to_log(warning)
+
+ if not self._ensure_dataset_for_snapshot(snapshot, quiet=quiet):
+ return False
+
+ self._apply_view_snapshot(snapshot)
+ self.ui.actionSave_View.setEnabled(self.dataset_loaded)
+ self.logger.write_to_log(f"Snapshot '{path}' loaded.")
+ return True
+
+ def load_view_snapshot(self) -> None:
+ start_dir = str(self._snapshot_start_directory())
+ file_name, _ = QFileDialog.getOpenFileName(
+ self,
+ "Load view snapshot",
+ start_dir,
+ "View snapshots (*.json)",
+ )
+
+ if not file_name:
+ return
+
+ self.load_view_snapshot_from_path(file_name)
+
+ def _load_mat_file(self, file_name: str, *, quiet: bool = False) -> bool:
+ file_path = pathlib.Path(file_name).expanduser()
+
+ if not file_path.exists():
+ message = f"File '{file_path}' not found."
+ if not quiet:
+ self.logger.write_to_log(message)
+ QMessageBox.warning(self, "Load dataset", message)
+ return False
+
+ self._prepare_for_new_dataset()
+
+ provider = MatfileSignalProvider(
self.signal_provider_period, "robot_logger_device"
)
- self.signal_provider.open(file_name)
- self.signal_size = len(self.signal_provider)
- self.signal_provider.start()
- self.signal_provider.register_update_index(self.update_index)
+ try:
+ provider.open(str(file_path))
+ except Exception as exc: # pragma: no cover - defensive
+ message = f"Unable to read '{file_path}': {exc}"
+ if not quiet:
+ self.logger.write_to_log(message)
+ QMessageBox.critical(self, "Load dataset", message)
+ return False
- # add signal provider to the plot items
- self.plot_items[-1].set_signal_provider(self.signal_provider)
+ self.signal_provider = provider
+ self.signal_size = len(provider)
+ provider.start()
+
+ provider.register_update_index(self.update_index)
+
+ for plot_item in self.plot_items:
+ plot_item.set_signal_provider(provider)
# load the model and load the provider
- self.meshcat_provider.set_signal_provider(self.signal_provider)
+ self.meshcat_provider.set_signal_provider(provider)
if not self.meshcat_provider.load_model(
- self.signal_provider.joints_name, self.signal_provider.robot_name
+ provider.joints_name, provider.robot_name
):
- # if not loaded we print an error but we continue
msg = "Unable to load the model: "
if self.meshcat_provider.model_path:
- msg = msg + self.meshcat_provider.model_path
+ msg += self.meshcat_provider.model_path
else:
- msg = msg + self.signal_provider.robot_name
-
+ msg += provider.robot_name
self.logger.write_to_log(msg)
self.meshcat_provider.start()
# populate tree
- root = list(self.signal_provider.data.keys())[0]
+ root = list(provider.data.keys())[0]
root_item = QTreeWidgetItem([root])
root_item.setFlags(root_item.flags() & ~Qt.ItemIsSelectable)
- items = self.__populate_variable_tree_widget(
- self.signal_provider.data[root], root_item
- )
+ items = self.__populate_variable_tree_widget(provider.data[root], root_item)
self.ui.variableTreeWidget.insertTopLevelItems(0, [items])
# populate text logging tree
- if self.signal_provider.text_logging_data:
- root = list(self.signal_provider.text_logging_data.keys())[0]
+ if provider.text_logging_data:
+ root = list(provider.text_logging_data.keys())[0]
root_item = QTreeWidgetItem([root])
root_item.setFlags(root_item.flags() & ~Qt.ItemIsSelectable)
items = self.__populate_text_logging_tree_widget(
- self.signal_provider.text_logging_data[root], root_item
+ provider.text_logging_data[root], root_item
)
self.ui.yarpTextLogTreeWidget.insertTopLevelItems(0, [items])
# spawn the console
- self.pyconsole.push_local_ns("data", self.signal_provider.data)
+ self.pyconsole.push_local_ns("data", provider.data)
self.ui.timeSlider.setMaximum(self.signal_size)
+ self.ui.timeSlider.setValue(0)
self.ui.startButton.setEnabled(True)
+ self.ui.pauseButton.setEnabled(False)
self.ui.timeSlider.setEnabled(True)
- # get all the video associated to the datase
- filename_without_path = pathlib.Path(file_name).name
- (prefix, sep, suffix) = filename_without_path.rpartition(".")
+ # get all the videos associated to the dataset
+ filename_without_path = file_path.name
+ prefix, _, _ = filename_without_path.rpartition(".")
video_filenames = [
- str(pathlib.Path(file_name).parent.absolute() / pathlib.Path(f))
- for f in os.listdir(pathlib.Path(file_name).parent.absolute())
+ file_path.parent / pathlib.Path(f)
+ for f in os.listdir(file_path.parent)
if re.search(prefix + r"_[a-zA-Z0-9_]*\.mp4$", f)
]
- # for every video we create a video item and we append it to the tab
for video_filename in video_filenames:
- video_prefix, _, _ = pathlib.Path(video_filename).name.rpartition(".")
+ video_prefix, _, _ = video_filename.name.rpartition(".")
video_label = str(video_prefix).replace(prefix + "_", "")
- self.video_items.append(VideoItem(video_filename=video_filename))
+ video_item = VideoItem(video_filename=str(video_filename))
+ self.video_items.append(video_item)
self.ui.meshcatAndVideoTab.addTab(
- self.video_items[-1], get_icon("videocam-outline.svg"), video_label
+ video_item, get_icon("videocam-outline.svg"), video_label
)
- self.logger.write_to_log("Video '" + video_filename + "' opened.")
+ self.logger.write_to_log(f"Video '{video_filename}' opened.")
- # pause all the videos
for video_item in self.video_items:
if video_item.media_loaded:
video_item.media_player.pause()
@@ -772,19 +1685,19 @@ def __load_mat_file(self, file_name):
self.ui.actionRealtime_Connect.setEnabled(False)
self.dataset_loaded = True
+ self.loaded_mat_path = str(file_path)
+ self.ui.actionSave_View.setEnabled(True)
- # write something in the log
- self.logger.write_to_log("File '" + file_name + "' opened.")
- self.logger.write_to_log(
- "Robot name: '" + self.signal_provider.robot_name + "'."
- )
+ self.logger.write_to_log(f"File '{file_path}' opened.")
+ self.logger.write_to_log(f"Robot name: '{provider.robot_name}'.")
+ return True
def open_mat_file(self):
file_name, _ = QFileDialog.getOpenFileName(
self, "Open a mat file", ".", filter="*.mat"
)
if file_name:
- self.__load_mat_file(file_name)
+ self._load_mat_file(file_name)
return True
return False
@@ -900,7 +1813,7 @@ def dropEvent(self, event):
url = event.mimeData().urls()[0].toLocalFile()
ext = os.path.splitext(url)[-1].lower()
if ext == ".mat":
- self.__load_mat_file(url)
+ self._load_mat_file(url)
event.accept()
else:
event.ignore()
@@ -1028,18 +1941,21 @@ def variableTreeWidget_on_right_click(self, item_position):
)
self.signal_provider.register_3d_point(item_key, item_path)
self.visualized_3d_points.add(item_key)
+ self.visualized_3d_points_colors[item_key] = color.as_hex()
elif action.text() == add_3d_trajectory_str:
self.meshcat_provider.register_3d_trajectory(
item_key, list(color.as_normalized_rgb())
)
self.signal_provider.register_3d_trajectory(item_key, item_path)
self.visualized_3d_trajectories.add(item_key)
+ self.visualized_3d_trajectories_colors[item_key] = color.as_hex()
elif action.text() == add_3d_arrow_str:
self.meshcat_provider.register_3d_arrow(
item_key, list(color.as_normalized_rgb())
)
self.signal_provider.register_3d_arrow(item_key, item_path)
self.visualized_3d_arrows.add(item_key)
+ self.visualized_3d_arrows_colors[item_key] = color.as_hex()
else:
raise ValueError("Unknown action")
@@ -1047,18 +1963,21 @@ def variableTreeWidget_on_right_click(self, item_position):
self.meshcat_provider.unregister_3d_point(item_key)
self.signal_provider.unregister_3d_point(item_key)
self.visualized_3d_points.remove(item_key)
+ self.visualized_3d_points_colors.pop(item_key, None)
item.setForeground(0, QtGui.QBrush(QtGui.QColor(0, 0, 0)))
if action.text() == remove_3d_trajectory_str:
self.meshcat_provider.unregister_3d_trajectory(item_key)
self.signal_provider.unregister_3d_trajectory(item_key)
self.visualized_3d_trajectories.remove(item_key)
+ self.visualized_3d_trajectories_colors.pop(item_key, None)
item.setForeground(0, QtGui.QBrush(QtGui.QColor(0, 0, 0)))
if action.text() == remove_3d_arrow_str:
self.meshcat_provider.unregister_3d_arrow(item_key)
self.signal_provider.unregister_3d_arrow(item_key)
self.visualized_3d_arrows.remove(item_key)
+ self.visualized_3d_arrows_colors.pop(item_key, None)
item.setForeground(0, QtGui.QBrush(QtGui.QColor(0, 0, 0)))
if (
diff --git a/robot_log_visualizer/ui/misc/visualizer.ui b/robot_log_visualizer/ui/misc/visualizer.ui
index 32e531f..097f1c1 100644
--- a/robot_log_visualizer/ui/misc/visualizer.ui
+++ b/robot_log_visualizer/ui/misc/visualizer.ui
@@ -503,6 +503,8 @@
&File
+
+
@@ -565,6 +567,22 @@
Ctrl+R
+
+
+ Save View…
+
+
+ Ctrl+Shift+S
+
+
+
+
+ Load View…
+
+
+ Ctrl+Shift+O
+
+
diff --git a/robot_log_visualizer/ui/plot_item.py b/robot_log_visualizer/ui/plot_item.py
index c0889c0..5e2baaf 100644
--- a/robot_log_visualizer/ui/plot_item.py
+++ b/robot_log_visualizer/ui/plot_item.py
@@ -2,6 +2,8 @@
# This software may be modified and distributed under the terms of the
# Released under the terms of the BSD 3-Clause License
+from typing import Any, Dict, List
+
from qtpy.QtWidgets import QFrame
from robot_log_visualizer.plotter.pyqtgraph_viewer_canvas import PyQtGraphViewerCanvas
@@ -17,3 +19,18 @@ def __init__(self, period):
def set_signal_provider(self, signal_provider):
self.canvas.set_signal_provider(signal_provider)
+
+ def clear_canvas(self) -> None:
+ self.canvas.clear_curves()
+
+ def capture_canvas_state(self) -> Dict[str, Any]:
+ return self.canvas.capture_state()
+
+ def apply_canvas_state(self, state: Dict[str, Any]) -> List[str]:
+ if not state:
+ return []
+
+ self.canvas.apply_view_range(state.get("view_range", []))
+ self.canvas.set_legend_visible(state.get("legend_visible", True))
+ self.canvas.apply_curve_metadata(state.get("curves", {}))
+ return self.canvas.restore_annotations(state.get("annotations", []))
diff --git a/robot_log_visualizer/ui/video_item.py b/robot_log_visualizer/ui/video_item.py
index 3df7fba..177072a 100644
--- a/robot_log_visualizer/ui/video_item.py
+++ b/robot_log_visualizer/ui/video_item.py
@@ -4,6 +4,7 @@
from qtpy.QtCore import QUrl
from qtpy.QtMultimedia import QMediaPlayer
+
try: # Qt 6
from qtpy.QtMultimedia import QAudioOutput # type: ignore
except ImportError: # Qt 5 fallback
diff --git a/robot_log_visualizer/ui/view_snapshot.py b/robot_log_visualizer/ui/view_snapshot.py
new file mode 100644
index 0000000..7db6ca4
--- /dev/null
+++ b/robot_log_visualizer/ui/view_snapshot.py
@@ -0,0 +1,210 @@
+# Copyright (C) 2025 Istituto Italiano di Tecnologia (IIT).
+# This software may be modified and distributed under the terms of the
+# BSD 3-Clause License.
+
+"""Data structures for serialising view snapshots to/from JSON."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+
+SNAPSHOT_VERSION = 1
+
+
+def _copy_path_list(value: Any) -> List[str]:
+ return [str(v) for v in (value or [])]
+
+
+@dataclass
+class DatasetSnapshot:
+ path: Optional[str] = None
+ robot_name: Optional[str] = None
+ provider: str = "offline"
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "path": self.path,
+ "robot_name": self.robot_name,
+ "provider": self.provider,
+ }
+
+ @classmethod
+ def from_dict(cls, raw: Dict[str, Any]) -> "DatasetSnapshot":
+ return cls(
+ path=raw.get("path"),
+ robot_name=raw.get("robot_name"),
+ provider=raw.get("provider", "offline"),
+ )
+
+
+@dataclass
+class WindowSnapshot:
+ geometry: Optional[str] = None
+ state: Optional[str] = None
+ plot_tab_index: int = 0
+ main_tab_index: int = 0
+ meshcat_tab_index: int = 0
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "geometry": self.geometry,
+ "state": self.state,
+ "plot_tab_index": self.plot_tab_index,
+ "main_tab_index": self.main_tab_index,
+ "meshcat_tab_index": self.meshcat_tab_index,
+ }
+
+ @classmethod
+ def from_dict(cls, raw: Dict[str, Any]) -> "WindowSnapshot":
+ return cls(
+ geometry=raw.get("geometry"),
+ state=raw.get("state"),
+ plot_tab_index=int(raw.get("plot_tab_index", 0)),
+ main_tab_index=int(raw.get("main_tab_index", 0)),
+ meshcat_tab_index=int(raw.get("meshcat_tab_index", 0)),
+ )
+
+
+@dataclass
+class TimelineSnapshot:
+ index: int = 0
+ slider_value: int = 0
+ slider_max: int = 0
+ is_running: bool = False
+ current_time: float = 0.0
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "index": self.index,
+ "slider_value": self.slider_value,
+ "slider_max": self.slider_max,
+ "is_running": self.is_running,
+ "current_time": self.current_time,
+ }
+
+ @classmethod
+ def from_dict(cls, raw: Dict[str, Any]) -> "TimelineSnapshot":
+ return cls(
+ index=int(raw.get("index", 0)),
+ slider_value=int(raw.get("slider_value", 0)),
+ slider_max=int(raw.get("slider_max", 0)),
+ is_running=bool(raw.get("is_running", False)),
+ current_time=float(raw.get("current_time", 0.0)),
+ )
+
+
+@dataclass
+class PlotSnapshot:
+ title: str = "Plot"
+ paths: List[List[str]] = field(default_factory=list)
+ legends: List[List[str]] = field(default_factory=list)
+ canvas: Dict[str, Any] = field(default_factory=dict)
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "title": self.title,
+ "paths": self.paths,
+ "legends": self.legends,
+ "canvas": self.canvas,
+ }
+
+ @classmethod
+ def from_dict(cls, raw: Dict[str, Any]) -> "PlotSnapshot":
+ return cls(
+ title=raw.get("title", "Plot"),
+ paths=[_copy_path_list(p) for p in raw.get("paths", [])],
+ legends=[_copy_path_list(l) for l in raw.get("legends", [])],
+ canvas=dict(raw.get("canvas", {})),
+ )
+
+
+@dataclass
+class TreeSnapshot:
+ selected_variables: List[List[str]] = field(default_factory=list)
+ selected_text_logs: List[List[str]] = field(default_factory=list)
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "selected_variables": self.selected_variables,
+ "selected_text_logs": self.selected_text_logs,
+ }
+
+ @classmethod
+ def from_dict(cls, raw: Dict[str, Any]) -> "TreeSnapshot":
+ return cls(
+ selected_variables=[
+ _copy_path_list(path) for path in raw.get("selected_variables", [])
+ ],
+ selected_text_logs=[
+ _copy_path_list(path) for path in raw.get("selected_text_logs", [])
+ ],
+ )
+
+
+@dataclass
+class VisualizationSnapshot:
+ points: List[Dict[str, Any]] = field(default_factory=list)
+ trajectories: List[Dict[str, Any]] = field(default_factory=list)
+ arrows: List[Dict[str, Any]] = field(default_factory=list)
+ robot_state: Dict[str, Any] = field(default_factory=dict)
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "points": self.points,
+ "trajectories": self.trajectories,
+ "arrows": self.arrows,
+ "robot_state": self.robot_state,
+ }
+
+ @classmethod
+ def from_dict(cls, raw: Dict[str, Any]) -> "VisualizationSnapshot":
+ return cls(
+ points=[dict(entry) for entry in raw.get("points", [])],
+ trajectories=[dict(entry) for entry in raw.get("trajectories", [])],
+ arrows=[dict(entry) for entry in raw.get("arrows", [])],
+ robot_state=dict(raw.get("robot_state", {})),
+ )
+
+
+@dataclass
+class ViewSnapshot:
+ """Container for all serialised UI state."""
+
+ version: int = SNAPSHOT_VERSION
+ metadata: Dict[str, Any] = field(default_factory=dict)
+ dataset: DatasetSnapshot = field(default_factory=DatasetSnapshot)
+ window: WindowSnapshot = field(default_factory=WindowSnapshot)
+ timeline: TimelineSnapshot = field(default_factory=TimelineSnapshot)
+ plots: List[PlotSnapshot] = field(default_factory=list)
+ tree: TreeSnapshot = field(default_factory=TreeSnapshot)
+ visualization: VisualizationSnapshot = field(default_factory=VisualizationSnapshot)
+
+ def to_dict(self) -> Dict[str, Any]:
+ return {
+ "version": self.version,
+ "metadata": self.metadata,
+ "dataset": self.dataset.to_dict(),
+ "window": self.window.to_dict(),
+ "timeline": self.timeline.to_dict(),
+ "plots": [plot.to_dict() for plot in self.plots],
+ "tree": self.tree.to_dict(),
+ "visualization": self.visualization.to_dict(),
+ }
+
+ @classmethod
+ def from_dict(cls, raw: Dict[str, Any]) -> "ViewSnapshot":
+ version = int(raw.get("version", SNAPSHOT_VERSION))
+ return cls(
+ version=version,
+ metadata=dict(raw.get("metadata", {})),
+ dataset=DatasetSnapshot.from_dict(dict(raw.get("dataset", {}))),
+ window=WindowSnapshot.from_dict(dict(raw.get("window", {}))),
+ timeline=TimelineSnapshot.from_dict(dict(raw.get("timeline", {}))),
+ plots=[PlotSnapshot.from_dict(entry) for entry in raw.get("plots", [])],
+ tree=TreeSnapshot.from_dict(dict(raw.get("tree", {}))),
+ visualization=VisualizationSnapshot.from_dict(
+ dict(raw.get("visualization", {}))
+ ),
+ )
diff --git a/robot_log_visualizer/utils/utils.py b/robot_log_visualizer/utils/utils.py
index 0c465e8..215b701 100644
--- a/robot_log_visualizer/utils/utils.py
+++ b/robot_log_visualizer/utils/utils.py
@@ -41,8 +41,7 @@ def hex_to_rgb(hex_value: str) -> tuple[int, int, int]:
hex_value = hex_value.lstrip("#")
hlen = len(hex_value)
return tuple(
- int(hex_value[i : i + hlen // 3], 16)
- for i in range(0, hlen, hlen // 3)
+ int(hex_value[i : i + hlen // 3], 16) for i in range(0, hlen, hlen // 3)
)
@staticmethod