From 8ad2881fbc35d7f3c0a787de86990ff1d4bcf59c Mon Sep 17 00:00:00 2001 From: Giulio Romualdi Date: Wed, 8 Oct 2025 21:59:24 +0200 Subject: [PATCH] Add view saving and loading functionality, enhance plot item management, and implement view snapshot serialization - Updated visualizer UI to include actions for saving and loading views. - Enhanced PlotItem class with methods to clear the canvas, capture, and apply canvas state. - Introduced view_snapshot.py to define data structures for serializing UI state to/from JSON. --- README.md | 19 + robot_log_visualizer/__main__.py | 36 +- .../plotter/pyqtgraph_viewer_canvas.py | 205 +++- .../signal_provider/signal_provider.py | 12 + robot_log_visualizer/ui/gui.py | 1007 ++++++++++++++++- robot_log_visualizer/ui/misc/visualizer.ui | 18 + robot_log_visualizer/ui/plot_item.py | 17 + robot_log_visualizer/ui/video_item.py | 1 + robot_log_visualizer/ui/view_snapshot.py | 210 ++++ robot_log_visualizer/utils/utils.py | 3 +- 10 files changed, 1467 insertions(+), 61 deletions(-) create mode 100644 robot_log_visualizer/ui/view_snapshot.py diff --git a/README.md b/README.md index 9c33a7d..6723934 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,25 @@ Once you have installed the `robot-log-visualizer` you can run it from the termi You can navigate the dataset thanks to the slider or by pressing `Ctrl-f` and `Ctrl-b` to move forward and backward. +To pre-load a dataset on startup, pass it as an argument: + +```bash +robot-log-visualizer /path/to/dataset.mat +``` + +If you saved a snapshot of your favourite layout, you can restore it right away: + +```bash +robot-log-visualizer --snapshot /path/to/view.json +``` + +You can also combine both parameters; in that case the dataset argument is loaded first and then +the snapshot is applied: + +```bash +robot-log-visualizer /path/to/dataset.mat --snapshot /path/to/view.json +``` + > [!IMPORTANT] > `robot-log-visualizer` only supports reading `.mat` file [version 7.3 or newer](https://www.mathworks.com/help/matlab/import_export/mat-file-versions.html). diff --git a/robot_log_visualizer/__main__.py b/robot_log_visualizer/__main__.py index 6e7196f..fcf7aa1 100755 --- a/robot_log_visualizer/__main__.py +++ b/robot_log_visualizer/__main__.py @@ -4,7 +4,10 @@ # This software may be modified and distributed under the terms of the # Released under the terms of the BSD 3-Clause License +import argparse +import pathlib import sys +from typing import Optional, Sequence # GUI from qtpy.QtWidgets import QApplication @@ -14,7 +17,28 @@ from robot_log_visualizer.robot_visualizer.meshcat_provider import MeshcatProvider -def main(): +def _parse_arguments(argv: Sequence[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Robot Log Visualizer", + ) + parser.add_argument( + "dataset", + nargs="?", + help="Path to a MAT dataset to load on startup.", + ) + parser.add_argument( + "-s", + "--snapshot", + help="Path to a view snapshot (.json) to restore on startup.", + ) + + return parser.parse_args(argv) + + +def main(argv: Optional[Sequence[str]] = None): + parsed_argv = list(argv) if argv is not None else sys.argv[1:] + args = _parse_arguments(parsed_argv) + thread_periods = { "meshcat_provider": 0.03, "signal_provider": 0.03, @@ -36,6 +60,16 @@ def main(): # show the main window gui.show() + if args.dataset: + dataset_path = pathlib.Path(args.dataset).expanduser() + if not gui._load_mat_file(str(dataset_path), quiet=False): # noqa: SLF001 + print(f"Failed to load dataset '{dataset_path}'.", file=sys.stderr) + + if args.snapshot: + snapshot_path = pathlib.Path(args.snapshot).expanduser() + if not gui.load_view_snapshot_from_path(snapshot_path): + print(f"Failed to load snapshot '{snapshot_path}'.", file=sys.stderr) + exec_method = getattr(app, "exec", None) if exec_method is None: exec_method = app.exec_ diff --git a/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py b/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py index c8acaac..cba5d1b 100644 --- a/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py +++ b/robot_log_visualizer/plotter/pyqtgraph_viewer_canvas.py @@ -4,7 +4,7 @@ from __future__ import annotations -from typing import Dict, Iterable, Sequence, Tuple +from typing import Any, Dict, Iterable, List, Sequence, Tuple import numpy as np import pyqtgraph as pg # type: ignore @@ -57,6 +57,7 @@ def __init__( self._curves: Dict[str, pg.PlotDataItem] = {} self._annotations: Dict[Point, pg.TextItem] = {} self._markers: Dict[Point, pg.ScatterPlotItem] = {} + self._annotation_sources: Dict[Point, str] = {} self._palette: Iterable = ColorPalette() # UI set‑up @@ -85,21 +86,23 @@ def set_signal_provider(self, signal_provider) -> None: self._update_realtime_curves ) - def update_plots(self, paths: Sequence[Path], legends: Sequence[Legend]) -> None: + def update_plots( + self, paths: Sequence[Path], legends: Sequence[Legend] + ) -> List[str]: """Synchronise plots with the *paths* list. New items are added, disappeared items removed. Existing ones are left untouched to avoid flicker. """ if self._signal_provider is None: - return + return [] # For real-time provider, update the set of selected signals to buffer if self._signal_provider.provider_type == ProviderType.REALTIME: selected_keys = ["::".join(path) for path in paths] self._signal_provider.add_signals_to_buffer(selected_keys) - self._add_missing_curves(paths, legends) + missing_paths = self._add_missing_curves(paths, legends) self._remove_obsolete_curves(paths) # Set the X axis range based on the provider type @@ -117,6 +120,8 @@ def update_plots(self, paths: Sequence[Path], legends: Sequence[Legend]) -> None self._signal_provider.end_time - self._signal_provider.initial_time, ) + return missing_paths + # The following trio is wired to whoever controls the replay/stream def pause_animation(self) -> None: # noqa: D401 """Pause the vertical‑line animation.""" @@ -173,23 +178,46 @@ def _connect_signals(self) -> None: def _add_missing_curves( self, paths: Sequence[Path], legends: Sequence[Legend] - ) -> None: + ) -> List[str]: """Plot curves that are present in *paths* but not yet on screen.""" + + missing: List[str] = [] for path, legend in zip(paths, legends): key = "/".join(path) if key in self._curves: continue # Drill down to the data array - data = self._signal_provider.data - for subkey in path[:-1]: - data = data[subkey] try: - y = data["data"][:, int(path[-1])] - except (IndexError, ValueError): # scalar variable - y = data["data"][:] + data = self._signal_provider.data + for subkey in path[:-1]: + data = data[subkey] + + data_array = data["data"] + timestamps = data["timestamps"] + except (KeyError, TypeError): + missing.append(key) + continue + + try: + y = data_array[:, int(path[-1])] + except ( + IndexError, + ValueError, + TypeError, + ): # scalar variable or invalid index + try: + y = data_array[:] + except Exception: + missing.append(key) + continue + + try: + x = timestamps - self._signal_provider.initial_time + except Exception: + missing.append(key) + continue - x = data["timestamps"] - self._signal_provider.initial_time palette_color = next(self._palette) pen = pg.mkPen(palette_color.as_hex(), width=2) @@ -201,12 +229,21 @@ def _add_missing_curves( symbol=None, ) + return missing + def _remove_obsolete_curves(self, paths: Sequence[Path]) -> None: """Delete curves that disappeared from *paths*.""" valid = {"/".join(p) for p in paths} for key in [k for k in self._curves if k not in valid]: self._plot.removeItem(self._curves.pop(key)) + # Remove annotations associated to the deleted curve + orphan_points = [ + pt for pt, src in self._annotation_sources.items() if src == key + ] + for point in orphan_points: + self._deselect(point) + def _update_vline(self) -> None: """Move the vertical line to ``current_time``.""" if self._signal_provider is None: @@ -279,10 +316,11 @@ def _on_mouse_click(self, event) -> None: # noqa: N802 self._deselect(candidate) else: assert nearest_curve is not None # mypy‑friendly - self._select(candidate, nearest_curve.opts["pen"]) + self._select(candidate, nearest_curve) - def _select(self, pt: Point, pen: pg.QtGui.QPen) -> None: + def _select(self, pt: Point, curve: pg.PlotDataItem) -> None: """Add label + circle marker on *pt*.""" + pen = curve.opts["pen"] x_span = np.diff(self._plot.viewRange()[0])[0] y_span = np.diff(self._plot.viewRange()[1])[0] x_prec = max(0, int(-np.log10(max(x_span, 1e-12))) + 2) @@ -313,10 +351,15 @@ def _select(self, pt: Point, pen: pg.QtGui.QPen) -> None: self._plot.addItem(marker) self._markers[pt] = marker + curve_key = self._curve_key(curve) + if curve_key is not None: + self._annotation_sources[pt] = curve_key + def _deselect(self, pt: Point) -> None: """Remove annotation + marker on *pt*.""" self._plot.removeItem(self._annotations.pop(pt)) self._plot.removeItem(self._markers.pop(pt)) + self._annotation_sources.pop(pt, None) def clear_selections(self) -> None: # noqa: D401 """Remove **all** annotations and markers.""" @@ -324,3 +367,137 @@ def clear_selections(self) -> None: # noqa: D401 self._plot.removeItem(item) self._annotations.clear() self._markers.clear() + self._annotation_sources.clear() + + def clear_curves(self) -> None: + """Remove every plotted curve and related markers.""" + for key in list(self._curves.keys()): + self._plot.removeItem(self._curves.pop(key)) + self.clear_selections() + + def capture_state(self) -> Dict[str, Any]: + """Return a snapshot of the current canvas configuration.""" + + annotations: List[Dict[str, Any]] = [] + for pt, label in self._annotations.items(): + source = self._annotation_sources.get(pt) + if source is None: + continue + annotations.append( + { + "path": source, + "point": [float(pt[0]), float(pt[1])], + "label": label.toPlainText(), + } + ) + + curve_meta: Dict[str, Dict[str, Any]] = {} + for key, curve in self._curves.items(): + pen_info: Dict[str, Any] = {} + pen = curve.opts.get("pen") + if pen is not None: + qcol = pen.color() + pen_info["color"] = ( + f"#{qcol.red():02x}{qcol.green():02x}{qcol.blue():02x}" + ) + pen_info["width"] = pen.width() + curve_meta[key] = { + "label": curve.opts.get("name", ""), + "pen": pen_info, + } + + return { + "curves": curve_meta, + "view_range": self._plot.viewRange(), + "legend_visible": bool(self._plot.plotItem.legend.isVisible()), + "annotations": annotations, + } + + def apply_view_range(self, view_range: Sequence[Sequence[float]]) -> None: + """Restore the axes limits from a snapshot.""" + + if len(view_range) != 2: + return + + x_range, y_range = view_range + if len(x_range) == 2: + self._plot.setXRange(float(x_range[0]), float(x_range[1]), padding=0) + if len(y_range) == 2: + self._plot.setYRange(float(y_range[0]), float(y_range[1]), padding=0) + + def set_legend_visible(self, visible: bool) -> None: + """Toggle legend visibility.""" + + legend = getattr(self._plot.plotItem, "legend", None) + if legend is not None: + legend.setVisible(bool(visible)) + + def restore_annotations(self, annotations: Sequence[Dict[str, Any]]) -> List[str]: + """Re-create selection markers from saved data. + + Returns: + List of curve identifiers that could not be restored. + """ + + missing: List[str] = [] + for ann in annotations: + key = ann.get("path") + point = ann.get("point") + if key is None or point is None: + continue + curve = self._curves.get(str(key)) + if curve is None: + missing.append(str(key)) + continue + try: + pt_tuple: Point = (float(point[0]), float(point[1])) + except (TypeError, ValueError, IndexError): + missing.append(str(key)) + continue + self._select(pt_tuple, curve) + return missing + + def _curve_key(self, curve: pg.PlotDataItem) -> str | None: + for key, item in self._curves.items(): + if item is curve: + return key + return None + + def apply_curve_metadata(self, metadata: Dict[str, Dict[str, Any]]) -> None: + for key, info in metadata.items(): + curve = self._curves.get(key) + if curve is None: + continue + + label = info.get("label") + if label is not None: + label_text = str(label) + if hasattr(curve, "setName"): + curve.setName(label_text) + else: + curve.opts["name"] = label_text + legend = getattr(self._plot.plotItem, "legend", None) + if legend is not None: + if hasattr(legend, "itemChanged"): + legend.itemChanged(curve) + else: # compatibility fallback for older pyqtgraph releases + try: + legend.removeItem(curve) + except Exception: + pass + legend.addItem(curve, label_text) + + pen_info = info.get("pen", {}) + color = pen_info.get("color") + width = pen_info.get("width") + if color is not None or width is not None: + kwargs: Dict[str, Any] = {} + if color is not None: + kwargs["color"] = color + if width is not None: + try: + kwargs["width"] = float(width) + except (TypeError, ValueError): + pass + if kwargs: + curve.setPen(pg.mkPen(**kwargs)) diff --git a/robot_log_visualizer/signal_provider/signal_provider.py b/robot_log_visualizer/signal_provider/signal_provider.py index 285fb49..d1eb544 100644 --- a/robot_log_visualizer/signal_provider/signal_provider.py +++ b/robot_log_visualizer/signal_provider/signal_provider.py @@ -421,6 +421,18 @@ def get_3d_trajectory_at_index(self, index): return trajectories + def export_registered_3d_points(self): + locker = QMutexLocker(self._3d_points_path_lock) + return {key: list(value) for key, value in self._3d_points_path.items()} + + def export_registered_3d_trajectories(self): + locker = QMutexLocker(self._3d_trajectories_path_lock) + return {key: list(value) for key, value in self._3d_trajectories_path.items()} + + def export_registered_3d_arrows(self): + locker = QMutexLocker(self._3d_arrows_path_lock) + return {key: list(value) for key, value in self._3d_arrows.items()} + def register_3d_point(self, key, points_path): self._3d_points_path_lock.lock() self._3d_points_path[key] = points_path diff --git a/robot_log_visualizer/ui/gui.py b/robot_log_visualizer/ui/gui.py index 02ec82a..0883237 100644 --- a/robot_log_visualizer/ui/gui.py +++ b/robot_log_visualizer/ui/gui.py @@ -3,13 +3,15 @@ # Released under the terms of the BSD 3-Clause License # QtPy abstraction +from __future__ import annotations from qtpy import QtWidgets, QtGui, QtCore from qtpy import QtWebEngineWidgets # noqa: F401 # Ensure WebEngine is initialised -from qtpy.QtCore import QMutex, QMutexLocker, QUrl, Qt, Slot +from qtpy.QtCore import QMutex, QMutexLocker, QSaveFile, QUrl, Qt, Slot, QIODevice from qtpy.QtWidgets import ( QDialog, QDialogButtonBox, QFileDialog, + QMessageBox, QLineEdit, QToolButton, QTreeWidgetItem, @@ -33,17 +35,32 @@ from robot_log_visualizer.ui.plot_item import PlotItem from robot_log_visualizer.ui.video_item import VideoItem from robot_log_visualizer.ui.text_logging import TextLoggingItem +from robot_log_visualizer.ui.view_snapshot import ( + SNAPSHOT_VERSION, + ViewSnapshot, + PlotSnapshot, + VisualizationSnapshot, + TimelineSnapshot, + DatasetSnapshot, + WindowSnapshot, + TreeSnapshot, +) from robot_log_visualizer.utils.utils import ( PeriodicThreadState, RobotStatePath, ColorPalette, + Color, ) import sys import os +import json +import base64 import pathlib import re +from datetime import datetime +from typing import Any, Dict, List, Optional, Sequence import numpy as np @@ -222,6 +239,9 @@ def __init__(self, signal_provider_period, meshcat_provider, animation_period): self.visualized_3d_points = set() self.visualized_3d_trajectories = set() self.visualized_3d_arrows = set() + self.visualized_3d_points_colors = {} + self.visualized_3d_trajectories_colors = {} + self.visualized_3d_arrows_colors = {} self.visualized_3d_points_colors_palette = ColorPalette() self.toolButton_on_click() @@ -238,6 +258,17 @@ def __init__(self, signal_provider_period, meshcat_provider, animation_period): self.dataset_loaded = False + self.loaded_mat_path = None + self._settings = QtCore.QSettings("ami-iit", "robot-log-visualizer") + self._snapshot_dir_key = "snapshot/lastDirectory" + self._snapshot_last_dir = pathlib.Path( + self._settings.value(self._snapshot_dir_key, pathlib.Path.home()) + ) + + self.ui.actionSave_View.setEnabled(False) + self.ui.actionSave_View.triggered.connect(self.save_view_snapshot) + self.ui.actionLoad_View.triggered.connect(self.load_view_snapshot) + # connect action self.ui.actionQuit.triggered.connect(self.close) self.ui.actionOpen.triggered.connect(self.open_mat_file) @@ -485,9 +516,15 @@ def variableTreeWidget_on_click(self): "legends": legends, } - self.plot_items[self.ui.tabPlotWidget.currentIndex()].canvas.update_plots( - paths, legends - ) + missing_curves = self.plot_items[ + self.ui.tabPlotWidget.currentIndex() + ].canvas.update_plots(paths, legends) + + if missing_curves: + self.logger.write_to_log( + "Some signals could not be plotted: " + + ", ".join(sorted(set(missing_curves))) + ) def find_text_log_index(self, path): current_time = self.signal_provider.current_time @@ -688,82 +725,958 @@ def __populate_text_logging_tree_widget(self, obj, parent) -> QTreeWidgetItem: parent.addChild(item) return parent - def __load_mat_file(self, file_name): - self.signal_provider = MatfileSignalProvider( + def _clear_plot_tabs(self) -> None: + while self.ui.tabPlotWidget.count() > 0: + widget = self.ui.tabPlotWidget.widget(0) + if isinstance(widget, PlotItem): + widget.canvas.quit_animation() + widget.deleteLater() + self.ui.tabPlotWidget.removeTab(0) + self.plot_items.clear() + self.plotData.clear() + + def _ensure_plot_tab_count(self, count: int) -> None: + count = max(1, count) + while self.ui.tabPlotWidget.count() > count: + self.plotTabCloseButton_on_click(self.ui.tabPlotWidget.count() - 1) + while self.ui.tabPlotWidget.count() < count: + self.toolButton_on_click() + + def _clear_visualization_items(self) -> None: + """Remove all registered 3D primitives from Meshcat and reset trackers.""" + + if self.meshcat_provider is not None: + for point in list(self.visualized_3d_points): + try: + self.meshcat_provider.unregister_3d_point(point) + except KeyError: + pass + for trajectory in list(self.visualized_3d_trajectories): + try: + self.meshcat_provider.unregister_3d_trajectory(trajectory) + except KeyError: + pass + for arrow in list(self.visualized_3d_arrows): + try: + self.meshcat_provider.unregister_3d_arrow(arrow) + except KeyError: + pass + + self.visualized_3d_points.clear() + self.visualized_3d_trajectories.clear() + self.visualized_3d_arrows.clear() + self.visualized_3d_points_colors.clear() + self.visualized_3d_trajectories_colors.clear() + self.visualized_3d_arrows_colors.clear() + self.visualized_3d_points_colors_palette.reset() + + def _prepare_for_new_dataset(self) -> None: + if self.signal_provider is not None: + try: + self.signal_provider.update_index_signal.disconnect(self.update_index) + except (TypeError, RuntimeError): + pass + self.signal_provider.state = PeriodicThreadState.closed + self.signal_provider.wait() + self.signal_provider = None + + self.dataset_loaded = False + self.loaded_mat_path = None + + self.ui.variableTreeWidget.clear() + self.ui.yarpTextLogTreeWidget.clear() + self.robot_state_path = RobotStatePath() + + # Clear registered 3D items in Meshcat + self._clear_visualization_items() + + # Remove video tabs (keep meshcat tab at index 0) + while self.ui.meshcatAndVideoTab.count() > 1: + widget = self.ui.meshcatAndVideoTab.widget(1) + widget.deleteLater() + self.ui.meshcatAndVideoTab.removeTab(1) + for video_item in self.video_items: + video_item.deleteLater() + self.video_items.clear() + + self._clear_plot_tabs() + self.toolButton_on_click() + self.ui.tabPlotWidget.setCurrentIndex(0) + self.ui.tabPlotWidget.setTabsClosable(False) + + self.ui.timeSlider.setEnabled(False) + self.ui.timeSlider.setValue(0) + self.ui.startButton.setEnabled(False) + self.ui.pauseButton.setEnabled(False) + + self.realtime_connection_enabled = False + self.ui.actionRealtime_Connect.setEnabled(True) + self.ui.actionSave_View.setEnabled(False) + + def _snapshot_start_directory(self) -> pathlib.Path: + if self._snapshot_last_dir.exists(): + return self._snapshot_last_dir + return pathlib.Path.home() + + def _remember_snapshot_directory(self, path: pathlib.Path) -> None: + self._snapshot_last_dir = path + self._settings.setValue(self._snapshot_dir_key, str(path)) + + @staticmethod + def _encode_qbytearray(data: QtCore.QByteArray | None) -> Optional[str]: + if data is None or data.isEmpty(): + return None + return bytes(data.toBase64()).decode("ascii") + + @staticmethod + def _decode_qbytearray(encoded: Optional[str]) -> Optional[QtCore.QByteArray]: + if not encoded: + return None + return QtCore.QByteArray.fromBase64(encoded.encode("ascii")) + + def _selected_variable_paths(self) -> List[List[str]]: + paths: List[List[str]] = [] + for index in self.ui.variableTreeWidget.selectedIndexes(): + if index.column() != 0: + continue + path: List[str] = [] + is_leaf = True + current = index + while current.data() is not None: + if is_leaf: + path.append(str(current.row())) + is_leaf = False + else: + path.append(str(current.data())) + current = current.parent() + path.reverse() + if path: + paths.append(path) + return paths + + def _selected_text_log_paths(self) -> List[List[str]]: + paths: List[List[str]] = [] + for index in self.ui.yarpTextLogTreeWidget.selectedIndexes(): + if index.column() != 0: + continue + path: List[str] = [] + current = index + while current.data() is not None: + path.append(str(current.data())) + current = current.parent() + path.reverse() + if path: + paths.append(path) + return paths + + def _find_variable_tree_item( + self, path: Sequence[str] + ) -> Optional[QTreeWidgetItem]: + if not path: + return None + + matches = self.ui.variableTreeWidget.findItems( + path[0], Qt.MatchFixedString | Qt.MatchExactly + ) + if not matches: + return None + item = matches[0] + + for part in path[1:]: + try: + child_index = int(part) + except (TypeError, ValueError): + child_index = None + + if child_index is not None: + if 0 <= child_index < item.childCount(): + item = item.child(child_index) + else: + return None + else: + found = None + for i in range(item.childCount()): + if item.child(i).text(0) == part: + found = item.child(i) + break + if found is None: + return None + item = found + + return item + + def _find_text_log_tree_item( + self, path: Sequence[str] + ) -> Optional[QTreeWidgetItem]: + if not path: + return None + + matches = self.ui.yarpTextLogTreeWidget.findItems( + path[0], Qt.MatchFixedString | Qt.MatchExactly + ) + if not matches: + return None + item = matches[0] + + for part in path[1:]: + found = None + for i in range(item.childCount()): + if item.child(i).text(0) == part: + found = item.child(i) + break + if found is None: + return None + item = found + + return item + + def _collect_plot_snapshots(self) -> List[PlotSnapshot]: + snapshots: List[PlotSnapshot] = [] + for index, plot_item in enumerate(self.plot_items): + plot_info = self.plotData.get(index, {"paths": [], "legends": []}) + paths = [ + [str(component) for component in path] + for path in plot_info.get("paths", []) + ] + legends = [ + [str(component) for component in legend] + for legend in plot_info.get("legends", []) + ] + snapshots.append( + PlotSnapshot( + title=self.ui.tabPlotWidget.tabText(index) or "Plot", + paths=paths, + legends=legends, + canvas=plot_item.capture_canvas_state(), + ) + ) + return snapshots + + def _collect_visualization_snapshot(self) -> VisualizationSnapshot: + visualization = VisualizationSnapshot() + + if self.signal_provider is None: + return visualization + + for key, path in self.signal_provider.export_registered_3d_points().items(): + visualization.points.append( + { + "key": key, + "path": list(path), + "color": self.visualized_3d_points_colors.get(key), + } + ) + + for ( + key, + path, + ) in self.signal_provider.export_registered_3d_trajectories().items(): + visualization.trajectories.append( + { + "key": key, + "path": list(path), + "color": self.visualized_3d_trajectories_colors.get(key), + } + ) + + for key, path in self.signal_provider.export_registered_3d_arrows().items(): + visualization.arrows.append( + { + "key": key, + "path": list(path), + "color": self.visualized_3d_arrows_colors.get(key), + } + ) + + visualization.robot_state = { + "base_position_path": list(self.robot_state_path.base_position_path), + "base_orientation_path": list(self.robot_state_path.base_orientation_path), + "joints_state_path": list(self.robot_state_path.joints_state_path), + } + if getattr(self.meshcat_provider, "base_frame", ""): + visualization.robot_state["base_frame"] = self.meshcat_provider.base_frame + + return visualization + + def _collect_timeline_snapshot(self) -> TimelineSnapshot: + if self.signal_provider is None: + return TimelineSnapshot( + slider_value=self.ui.timeSlider.value(), + slider_max=self.ui.timeSlider.maximum(), + ) + + return TimelineSnapshot( + index=self.signal_provider.index, + slider_value=self.ui.timeSlider.value(), + slider_max=self.ui.timeSlider.maximum(), + is_running=self.signal_provider.state == PeriodicThreadState.running, + current_time=self.signal_provider.current_time, + ) + + def _apply_base_highlights( + self, base_position_path: Sequence[str], base_orientation_path: Sequence[str] + ) -> None: + selected_color = QtGui.QColor(255, 0, 0, 127) + + if base_position_path: + item = self._find_variable_tree_item(base_position_path) + if item is not None: + item.setBackground(0, QtGui.QBrush(selected_color)) + + if base_orientation_path: + item = self._find_variable_tree_item(base_orientation_path) + if item is not None: + item.setBackground(0, QtGui.QBrush(selected_color)) + + def _register_snapshot_point( + self, key: str, path: Sequence[str], color_hex: Optional[str] + ) -> bool: + if self.signal_provider is None: + return False + + item = self._find_variable_tree_item(path) + if item is None: + return False + + color_obj = ( + Color(color_hex) + if color_hex + else next(self.visualized_3d_points_colors_palette) + ) + color_hex_value = color_obj.as_hex() + + item.setForeground(0, QtGui.QBrush(QtGui.QColor(color_hex_value))) + self.meshcat_provider.register_3d_point( + key, list(color_obj.as_normalized_rgb()) + ) + self.signal_provider.register_3d_point(key, list(path)) + + self.visualized_3d_points.add(key) + self.visualized_3d_points_colors[key] = color_hex_value + return True + + def _register_snapshot_trajectory( + self, key: str, path: Sequence[str], color_hex: Optional[str] + ) -> bool: + if self.signal_provider is None: + return False + + item = self._find_variable_tree_item(path) + if item is None: + return False + + color_obj = ( + Color(color_hex) + if color_hex + else next(self.visualized_3d_points_colors_palette) + ) + color_hex_value = color_obj.as_hex() + + item.setForeground(0, QtGui.QBrush(QtGui.QColor(color_hex_value))) + self.meshcat_provider.register_3d_trajectory( + key, list(color_obj.as_normalized_rgb()) + ) + self.signal_provider.register_3d_trajectory(key, list(path)) + + self.visualized_3d_trajectories.add(key) + self.visualized_3d_trajectories_colors[key] = color_hex_value + return True + + def _register_snapshot_arrow( + self, key: str, path: Sequence[str], color_hex: Optional[str] + ) -> bool: + if self.signal_provider is None: + return False + + item = self._find_variable_tree_item(path) + if item is None: + return False + + color_obj = ( + Color(color_hex) + if color_hex + else next(self.visualized_3d_points_colors_palette) + ) + color_hex_value = color_obj.as_hex() + + item.setForeground(0, QtGui.QBrush(QtGui.QColor(color_hex_value))) + self.meshcat_provider.register_3d_arrow( + key, list(color_obj.as_normalized_rgb()) + ) + self.signal_provider.register_3d_arrow(key, list(path)) + + self.visualized_3d_arrows.add(key) + self.visualized_3d_arrows_colors[key] = color_hex_value + return True + + def _set_playback_running(self, running: bool) -> None: + if self.signal_provider is None: + return + + if running: + if self.signal_provider.state != PeriodicThreadState.running: + self.startButton_on_click() + else: + if self.signal_provider.state == PeriodicThreadState.running: + self.pauseButton_on_click() + + def _build_view_snapshot(self) -> ViewSnapshot: + dataset_provider = "offline" + dataset_path: Optional[str] = None + robot_name: Optional[str] = None + if self.signal_provider is not None: + robot_name = getattr(self.signal_provider, "robot_name", None) + if self.signal_provider.provider_type == ProviderType.REALTIME: + dataset_provider = "realtime" + else: + dataset_path = self.loaded_mat_path + + window_snapshot = WindowSnapshot( + geometry=self._encode_qbytearray(self.saveGeometry()), + state=self._encode_qbytearray(self.saveState()), + plot_tab_index=self.ui.tabPlotWidget.currentIndex(), + main_tab_index=self.ui.tabWidget.currentIndex(), + meshcat_tab_index=self.ui.meshcatAndVideoTab.currentIndex(), + ) + + metadata = { + "created_at": datetime.utcnow().isoformat(timespec="seconds"), + "application": "robot-log-visualizer", + } + + return ViewSnapshot( + metadata=metadata, + dataset=DatasetSnapshot( + path=dataset_path, + robot_name=robot_name, + provider=dataset_provider, + ), + window=window_snapshot, + timeline=self._collect_timeline_snapshot(), + plots=self._collect_plot_snapshots(), + tree=TreeSnapshot( + selected_variables=self._selected_variable_paths(), + selected_text_logs=self._selected_text_log_paths(), + ), + visualization=self._collect_visualization_snapshot(), + ) + + def save_view_snapshot(self) -> None: + if self.signal_provider is None: + QMessageBox.information( + self, + "Save view snapshot", + "No dataset is currently loaded. Load a dataset before saving a view.", + ) + return + + start_dir = str(self._snapshot_start_directory()) + file_name, _ = QFileDialog.getSaveFileName( + self, + "Save view snapshot", + start_dir, + "View snapshots (*.json)", + ) + + if not file_name: + return + + snapshot_path = pathlib.Path(file_name) + if snapshot_path.suffix.lower() != ".json": + snapshot_path = snapshot_path.with_suffix(".json") + + if snapshot_path.exists(): + answer = QMessageBox.question( + self, + "Overwrite snapshot", + f"The file '{snapshot_path}' already exists. Overwrite it?", + QMessageBox.Yes | QMessageBox.No, + QMessageBox.No, + ) + if answer != QMessageBox.Yes: + return + + snapshot = self._build_view_snapshot() + + try: + payload = json.dumps(snapshot.to_dict(), indent=2, sort_keys=True) + except TypeError as exc: # pragma: no cover - defensive branch + message = f"Unable to serialise snapshot: {exc}" + QMessageBox.critical(self, "Save view snapshot", message) + self.logger.write_to_log(message) + return + + save_file = QSaveFile(str(snapshot_path)) + if not save_file.open(QIODevice.WriteOnly | QIODevice.Text): + message = f"Cannot open '{snapshot_path}' for writing." + QMessageBox.critical(self, "Save view snapshot", message) + self.logger.write_to_log(message) + return + + save_file.write(payload.encode("utf-8")) + if not save_file.commit(): # pragma: no cover - defensive branch + message = f"Failed to write snapshot to '{snapshot_path}'." + QMessageBox.critical(self, "Save view snapshot", message) + self.logger.write_to_log(message) + return + + self._remember_snapshot_directory(snapshot_path.parent) + self.logger.write_to_log(f"Snapshot saved to '{snapshot_path}'.") + + def _ensure_dataset_for_snapshot( + self, snapshot: ViewSnapshot, *, quiet: bool = False + ) -> bool: + provider_mode = (snapshot.dataset.provider or "offline").lower() + + if provider_mode == "realtime": + QMessageBox.warning( + self, + "Load view snapshot", + "Snapshots captured from real-time sessions cannot be restored yet.", + ) + self.logger.write_to_log( + "Realtime snapshot restoration is not supported; aborting load." + ) + return False + + expected_path = snapshot.dataset.path + + # If no dataset is currently loaded and no path is provided, ask the user. + if not expected_path: + if self.dataset_loaded: + return True + replacement, _ = QFileDialog.getOpenFileName( + self, + "Select dataset for snapshot", + str(self._snapshot_start_directory()), + "MAT files (*.mat)", + ) + if not replacement: + self.logger.write_to_log( + "Snapshot restoration cancelled: dataset selection skipped." + ) + return False + return self._load_mat_file(replacement) + + expected_path_obj = pathlib.Path(expected_path).expanduser() + expected_exists = expected_path_obj.exists() + + if not expected_exists: + warning = f"Dataset '{expected_path_obj}' referenced in the snapshot could not be found." + if not quiet: + QMessageBox.warning(self, "Load view snapshot", warning) + self.logger.write_to_log(warning) + + user_choice = None + if not quiet: + msg_box = QMessageBox(self) + msg_box.setIcon(QMessageBox.Question) + msg_box.setWindowTitle("Load dataset") + if expected_exists: + msg_box.setText( + "The snapshot references the dataset:\n" f"{expected_path_obj}" + ) + else: + msg_box.setText("The dataset referenced in the snapshot was not found.") + msg_box.setInformativeText( + "Yes: load the referenced dataset.\n" + "No: choose a different dataset.\n" + "Cancel: keep the current dataset." + ) + msg_box.setStandardButtons( + QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel + ) + msg_box.setDefaultButton(QMessageBox.Yes) + user_choice = msg_box.exec_() + else: + user_choice = QMessageBox.Yes if expected_exists else QMessageBox.No + + if user_choice == QMessageBox.Cancel: + if self.dataset_loaded: + self.logger.write_to_log( + "Snapshot dataset load skipped by user; keeping current dataset." + ) + return True + self.logger.write_to_log( + "Snapshot restoration cancelled: no dataset loaded." + ) + return False + + if user_choice == QMessageBox.Yes and expected_exists: + if self.dataset_loaded and self.loaded_mat_path == str(expected_path_obj): + return True + if self._load_mat_file(str(expected_path_obj)): + return True + self.logger.write_to_log( + f"Snapshot restoration cancelled: failed to load '{expected_path_obj}'." + ) + return False + + # Either user selected "No" or the referenced dataset is missing -> prompt for replacement. + replacement_dir = ( + expected_path_obj.parent + if expected_path_obj.parent.exists() + else self._snapshot_start_directory() + ) + replacement, _ = QFileDialog.getOpenFileName( + self, + "Select dataset", + str(replacement_dir), + "MAT files (*.mat)", + ) + if not replacement: + self.logger.write_to_log( + "Snapshot restoration cancelled: replacement dataset not selected." + ) + return False + + if self._load_mat_file(replacement): + return True + + self.logger.write_to_log( + f"Snapshot restoration cancelled: failed to load '{replacement}'." + ) + return False + + def _apply_view_snapshot(self, snapshot: ViewSnapshot) -> None: + window_snapshot = snapshot.window + geometry = self._decode_qbytearray(window_snapshot.geometry) + if geometry is not None: + self.restoreGeometry(geometry) + state = self._decode_qbytearray(window_snapshot.state) + if state is not None: + self.restoreState(state) + + # Ensure the requested number of plot tabs and clear existing data. + plot_snapshots = snapshot.plots or [] + self._ensure_plot_tab_count(len(plot_snapshots)) + self.plotData.clear() + for plot_item in self.plot_items: + plot_item.clear_canvas() + + missing_curves: set[str] = set() + missing_annotations: set[str] = set() + + for index, plot_snapshot in enumerate(plot_snapshots): + plot_item = self.plot_items[index] + + paths = [ + [str(component) for component in path] for path in plot_snapshot.paths + ] + legends = [ + [str(component) for component in legend] + for legend in plot_snapshot.legends + ] + + self.plotData[index] = {"paths": paths, "legends": legends} + + missing = plot_item.canvas.update_plots(paths, legends) + if missing: + missing_curves.update(missing) + + missing_ann = plot_item.apply_canvas_state(plot_snapshot.canvas) + if missing_ann: + missing_annotations.update(missing_ann) + + self.ui.tabPlotWidget.setTabText(index, plot_snapshot.title) + + # Restore selections in the variable tree (plots). + missing_variable_paths: List[str] = [] + self.ui.variableTreeWidget.clearSelection() + for path in snapshot.tree.selected_variables: + item = self._find_variable_tree_item(path) + if item is None: + missing_variable_paths.append("/".join(map(str, path))) + continue + item.setSelected(True) + + # Restore Meshcat visualisations if a dataset is loaded. + missing_points: List[str] = [] + missing_trajectories: List[str] = [] + missing_arrows: List[str] = [] + + if self.signal_provider is not None: + self._clear_visualization_items() + + for entry in snapshot.visualization.points: + key = entry.get("key") + path = entry.get("path", []) + color = entry.get("color") + if not key or not path: + continue + if not self._register_snapshot_point(key, path, color): + missing_points.append(str(key)) + + for entry in snapshot.visualization.trajectories: + key = entry.get("key") + path = entry.get("path", []) + color = entry.get("color") + if not key or not path: + continue + if not self._register_snapshot_trajectory(key, path, color): + missing_trajectories.append(str(key)) + + for entry in snapshot.visualization.arrows: + key = entry.get("key") + path = entry.get("path", []) + color = entry.get("color") + if not key or not path: + continue + if not self._register_snapshot_arrow(key, path, color): + missing_arrows.append(str(key)) + + robot_state_info = snapshot.visualization.robot_state or {} + robot_state_path = RobotStatePath() + robot_state_path.base_position_path = [ + str(component) + for component in robot_state_info.get("base_position_path", []) + ] + robot_state_path.base_orientation_path = [ + str(component) + for component in robot_state_info.get("base_orientation_path", []) + ] + robot_state_path.joints_state_path = [ + str(component) + for component in robot_state_info.get("joints_state_path", []) + ] + self.robot_state_path = robot_state_path + + base_frame = robot_state_info.get("base_frame") + if base_frame: + self.meshcat_provider.base_frame = str(base_frame) + + self._apply_base_highlights( + self.robot_state_path.base_position_path, + self.robot_state_path.base_orientation_path, + ) + + # Restore playback state only; keep timeline position on current dataset defaults. + timeline = snapshot.timeline + if self.signal_provider is not None: + self._set_playback_running(timeline.is_running) + + # Restore tab selections (clamp indices to available ranges). + if self.ui.tabWidget.count() > 0: + self.ui.tabWidget.setCurrentIndex( + max( + 0, + min(window_snapshot.main_tab_index, self.ui.tabWidget.count() - 1), + ) + ) + if self.ui.tabPlotWidget.count() > 0: + self.ui.tabPlotWidget.setCurrentIndex( + max( + 0, + min( + window_snapshot.plot_tab_index, + self.ui.tabPlotWidget.count() - 1, + ), + ) + ) + if self.ui.meshcatAndVideoTab.count() > 0: + self.ui.meshcatAndVideoTab.setCurrentIndex( + max( + 0, + min( + window_snapshot.meshcat_tab_index, + self.ui.meshcatAndVideoTab.count() - 1, + ), + ) + ) + + # Report missing artefacts, if any. + if missing_curves: + self.logger.write_to_log( + "Missing signals while restoring plots: " + + ", ".join(sorted(missing_curves)) + ) + if missing_annotations: + self.logger.write_to_log( + "Annotations skipped because their signals were unavailable: " + + ", ".join(sorted(missing_annotations)) + ) + if missing_variable_paths: + self.logger.write_to_log( + "Variables not found for selection: " + + ", ".join(sorted(missing_variable_paths)) + ) + if missing_points: + self.logger.write_to_log( + "3D points not restored: " + ", ".join(sorted(missing_points)) + ) + if missing_trajectories: + self.logger.write_to_log( + "3D trajectories not restored: " + + ", ".join(sorted(missing_trajectories)) + ) + if missing_arrows: + self.logger.write_to_log( + "3D arrows not restored: " + ", ".join(sorted(missing_arrows)) + ) + + def load_view_snapshot_from_path( + self, snapshot_path: pathlib.Path | str, *, quiet: bool = False + ) -> bool: + path = pathlib.Path(snapshot_path).expanduser() + + try: + with path.open("r", encoding="utf-8") as handle: + raw_snapshot = json.load(handle) + except FileNotFoundError: + message = f"Snapshot file '{path}' does not exist." + if not quiet: + QMessageBox.warning(self, "Load view snapshot", message) + self.logger.write_to_log(message) + return False + except json.JSONDecodeError as exc: + message = f"Snapshot file '{path}' is not valid JSON: {exc}." + if not quiet: + QMessageBox.critical(self, "Load view snapshot", message) + self.logger.write_to_log(message) + return False + except OSError as exc: # pragma: no cover - filesystem errors + message = f"Unable to read snapshot '{path}': {exc}." + if not quiet: + QMessageBox.critical(self, "Load view snapshot", message) + self.logger.write_to_log(message) + return False + + try: + snapshot = ViewSnapshot.from_dict(raw_snapshot) + except Exception as exc: # pragma: no cover - defensive branch + message = f"Snapshot file '{path}' is not compatible: {exc}." + if not quiet: + QMessageBox.critical(self, "Load view snapshot", message) + self.logger.write_to_log(message) + return False + + self._remember_snapshot_directory(path.parent) + + if snapshot.version > SNAPSHOT_VERSION: + warning = ( + f"Snapshot was created with a newer version ({snapshot.version}). " + "Attempting to load with current version." + ) + if not quiet: + QMessageBox.information(self, "Load view snapshot", warning) + self.logger.write_to_log(warning) + + if not self._ensure_dataset_for_snapshot(snapshot, quiet=quiet): + return False + + self._apply_view_snapshot(snapshot) + self.ui.actionSave_View.setEnabled(self.dataset_loaded) + self.logger.write_to_log(f"Snapshot '{path}' loaded.") + return True + + def load_view_snapshot(self) -> None: + start_dir = str(self._snapshot_start_directory()) + file_name, _ = QFileDialog.getOpenFileName( + self, + "Load view snapshot", + start_dir, + "View snapshots (*.json)", + ) + + if not file_name: + return + + self.load_view_snapshot_from_path(file_name) + + def _load_mat_file(self, file_name: str, *, quiet: bool = False) -> bool: + file_path = pathlib.Path(file_name).expanduser() + + if not file_path.exists(): + message = f"File '{file_path}' not found." + if not quiet: + self.logger.write_to_log(message) + QMessageBox.warning(self, "Load dataset", message) + return False + + self._prepare_for_new_dataset() + + provider = MatfileSignalProvider( self.signal_provider_period, "robot_logger_device" ) - self.signal_provider.open(file_name) - self.signal_size = len(self.signal_provider) - self.signal_provider.start() - self.signal_provider.register_update_index(self.update_index) + try: + provider.open(str(file_path)) + except Exception as exc: # pragma: no cover - defensive + message = f"Unable to read '{file_path}': {exc}" + if not quiet: + self.logger.write_to_log(message) + QMessageBox.critical(self, "Load dataset", message) + return False - # add signal provider to the plot items - self.plot_items[-1].set_signal_provider(self.signal_provider) + self.signal_provider = provider + self.signal_size = len(provider) + provider.start() + + provider.register_update_index(self.update_index) + + for plot_item in self.plot_items: + plot_item.set_signal_provider(provider) # load the model and load the provider - self.meshcat_provider.set_signal_provider(self.signal_provider) + self.meshcat_provider.set_signal_provider(provider) if not self.meshcat_provider.load_model( - self.signal_provider.joints_name, self.signal_provider.robot_name + provider.joints_name, provider.robot_name ): - # if not loaded we print an error but we continue msg = "Unable to load the model: " if self.meshcat_provider.model_path: - msg = msg + self.meshcat_provider.model_path + msg += self.meshcat_provider.model_path else: - msg = msg + self.signal_provider.robot_name - + msg += provider.robot_name self.logger.write_to_log(msg) self.meshcat_provider.start() # populate tree - root = list(self.signal_provider.data.keys())[0] + root = list(provider.data.keys())[0] root_item = QTreeWidgetItem([root]) root_item.setFlags(root_item.flags() & ~Qt.ItemIsSelectable) - items = self.__populate_variable_tree_widget( - self.signal_provider.data[root], root_item - ) + items = self.__populate_variable_tree_widget(provider.data[root], root_item) self.ui.variableTreeWidget.insertTopLevelItems(0, [items]) # populate text logging tree - if self.signal_provider.text_logging_data: - root = list(self.signal_provider.text_logging_data.keys())[0] + if provider.text_logging_data: + root = list(provider.text_logging_data.keys())[0] root_item = QTreeWidgetItem([root]) root_item.setFlags(root_item.flags() & ~Qt.ItemIsSelectable) items = self.__populate_text_logging_tree_widget( - self.signal_provider.text_logging_data[root], root_item + provider.text_logging_data[root], root_item ) self.ui.yarpTextLogTreeWidget.insertTopLevelItems(0, [items]) # spawn the console - self.pyconsole.push_local_ns("data", self.signal_provider.data) + self.pyconsole.push_local_ns("data", provider.data) self.ui.timeSlider.setMaximum(self.signal_size) + self.ui.timeSlider.setValue(0) self.ui.startButton.setEnabled(True) + self.ui.pauseButton.setEnabled(False) self.ui.timeSlider.setEnabled(True) - # get all the video associated to the datase - filename_without_path = pathlib.Path(file_name).name - (prefix, sep, suffix) = filename_without_path.rpartition(".") + # get all the videos associated to the dataset + filename_without_path = file_path.name + prefix, _, _ = filename_without_path.rpartition(".") video_filenames = [ - str(pathlib.Path(file_name).parent.absolute() / pathlib.Path(f)) - for f in os.listdir(pathlib.Path(file_name).parent.absolute()) + file_path.parent / pathlib.Path(f) + for f in os.listdir(file_path.parent) if re.search(prefix + r"_[a-zA-Z0-9_]*\.mp4$", f) ] - # for every video we create a video item and we append it to the tab for video_filename in video_filenames: - video_prefix, _, _ = pathlib.Path(video_filename).name.rpartition(".") + video_prefix, _, _ = video_filename.name.rpartition(".") video_label = str(video_prefix).replace(prefix + "_", "") - self.video_items.append(VideoItem(video_filename=video_filename)) + video_item = VideoItem(video_filename=str(video_filename)) + self.video_items.append(video_item) self.ui.meshcatAndVideoTab.addTab( - self.video_items[-1], get_icon("videocam-outline.svg"), video_label + video_item, get_icon("videocam-outline.svg"), video_label ) - self.logger.write_to_log("Video '" + video_filename + "' opened.") + self.logger.write_to_log(f"Video '{video_filename}' opened.") - # pause all the videos for video_item in self.video_items: if video_item.media_loaded: video_item.media_player.pause() @@ -772,19 +1685,19 @@ def __load_mat_file(self, file_name): self.ui.actionRealtime_Connect.setEnabled(False) self.dataset_loaded = True + self.loaded_mat_path = str(file_path) + self.ui.actionSave_View.setEnabled(True) - # write something in the log - self.logger.write_to_log("File '" + file_name + "' opened.") - self.logger.write_to_log( - "Robot name: '" + self.signal_provider.robot_name + "'." - ) + self.logger.write_to_log(f"File '{file_path}' opened.") + self.logger.write_to_log(f"Robot name: '{provider.robot_name}'.") + return True def open_mat_file(self): file_name, _ = QFileDialog.getOpenFileName( self, "Open a mat file", ".", filter="*.mat" ) if file_name: - self.__load_mat_file(file_name) + self._load_mat_file(file_name) return True return False @@ -900,7 +1813,7 @@ def dropEvent(self, event): url = event.mimeData().urls()[0].toLocalFile() ext = os.path.splitext(url)[-1].lower() if ext == ".mat": - self.__load_mat_file(url) + self._load_mat_file(url) event.accept() else: event.ignore() @@ -1028,18 +1941,21 @@ def variableTreeWidget_on_right_click(self, item_position): ) self.signal_provider.register_3d_point(item_key, item_path) self.visualized_3d_points.add(item_key) + self.visualized_3d_points_colors[item_key] = color.as_hex() elif action.text() == add_3d_trajectory_str: self.meshcat_provider.register_3d_trajectory( item_key, list(color.as_normalized_rgb()) ) self.signal_provider.register_3d_trajectory(item_key, item_path) self.visualized_3d_trajectories.add(item_key) + self.visualized_3d_trajectories_colors[item_key] = color.as_hex() elif action.text() == add_3d_arrow_str: self.meshcat_provider.register_3d_arrow( item_key, list(color.as_normalized_rgb()) ) self.signal_provider.register_3d_arrow(item_key, item_path) self.visualized_3d_arrows.add(item_key) + self.visualized_3d_arrows_colors[item_key] = color.as_hex() else: raise ValueError("Unknown action") @@ -1047,18 +1963,21 @@ def variableTreeWidget_on_right_click(self, item_position): self.meshcat_provider.unregister_3d_point(item_key) self.signal_provider.unregister_3d_point(item_key) self.visualized_3d_points.remove(item_key) + self.visualized_3d_points_colors.pop(item_key, None) item.setForeground(0, QtGui.QBrush(QtGui.QColor(0, 0, 0))) if action.text() == remove_3d_trajectory_str: self.meshcat_provider.unregister_3d_trajectory(item_key) self.signal_provider.unregister_3d_trajectory(item_key) self.visualized_3d_trajectories.remove(item_key) + self.visualized_3d_trajectories_colors.pop(item_key, None) item.setForeground(0, QtGui.QBrush(QtGui.QColor(0, 0, 0))) if action.text() == remove_3d_arrow_str: self.meshcat_provider.unregister_3d_arrow(item_key) self.signal_provider.unregister_3d_arrow(item_key) self.visualized_3d_arrows.remove(item_key) + self.visualized_3d_arrows_colors.pop(item_key, None) item.setForeground(0, QtGui.QBrush(QtGui.QColor(0, 0, 0))) if ( diff --git a/robot_log_visualizer/ui/misc/visualizer.ui b/robot_log_visualizer/ui/misc/visualizer.ui index 32e531f..097f1c1 100644 --- a/robot_log_visualizer/ui/misc/visualizer.ui +++ b/robot_log_visualizer/ui/misc/visualizer.ui @@ -503,6 +503,8 @@ &File + + @@ -565,6 +567,22 @@ Ctrl+R + + + Save View… + + + Ctrl+Shift+S + + + + + Load View… + + + Ctrl+Shift+O + + diff --git a/robot_log_visualizer/ui/plot_item.py b/robot_log_visualizer/ui/plot_item.py index c0889c0..5e2baaf 100644 --- a/robot_log_visualizer/ui/plot_item.py +++ b/robot_log_visualizer/ui/plot_item.py @@ -2,6 +2,8 @@ # This software may be modified and distributed under the terms of the # Released under the terms of the BSD 3-Clause License +from typing import Any, Dict, List + from qtpy.QtWidgets import QFrame from robot_log_visualizer.plotter.pyqtgraph_viewer_canvas import PyQtGraphViewerCanvas @@ -17,3 +19,18 @@ def __init__(self, period): def set_signal_provider(self, signal_provider): self.canvas.set_signal_provider(signal_provider) + + def clear_canvas(self) -> None: + self.canvas.clear_curves() + + def capture_canvas_state(self) -> Dict[str, Any]: + return self.canvas.capture_state() + + def apply_canvas_state(self, state: Dict[str, Any]) -> List[str]: + if not state: + return [] + + self.canvas.apply_view_range(state.get("view_range", [])) + self.canvas.set_legend_visible(state.get("legend_visible", True)) + self.canvas.apply_curve_metadata(state.get("curves", {})) + return self.canvas.restore_annotations(state.get("annotations", [])) diff --git a/robot_log_visualizer/ui/video_item.py b/robot_log_visualizer/ui/video_item.py index 3df7fba..177072a 100644 --- a/robot_log_visualizer/ui/video_item.py +++ b/robot_log_visualizer/ui/video_item.py @@ -4,6 +4,7 @@ from qtpy.QtCore import QUrl from qtpy.QtMultimedia import QMediaPlayer + try: # Qt 6 from qtpy.QtMultimedia import QAudioOutput # type: ignore except ImportError: # Qt 5 fallback diff --git a/robot_log_visualizer/ui/view_snapshot.py b/robot_log_visualizer/ui/view_snapshot.py new file mode 100644 index 0000000..7db6ca4 --- /dev/null +++ b/robot_log_visualizer/ui/view_snapshot.py @@ -0,0 +1,210 @@ +# Copyright (C) 2025 Istituto Italiano di Tecnologia (IIT). +# This software may be modified and distributed under the terms of the +# BSD 3-Clause License. + +"""Data structures for serialising view snapshots to/from JSON.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +SNAPSHOT_VERSION = 1 + + +def _copy_path_list(value: Any) -> List[str]: + return [str(v) for v in (value or [])] + + +@dataclass +class DatasetSnapshot: + path: Optional[str] = None + robot_name: Optional[str] = None + provider: str = "offline" + + def to_dict(self) -> Dict[str, Any]: + return { + "path": self.path, + "robot_name": self.robot_name, + "provider": self.provider, + } + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "DatasetSnapshot": + return cls( + path=raw.get("path"), + robot_name=raw.get("robot_name"), + provider=raw.get("provider", "offline"), + ) + + +@dataclass +class WindowSnapshot: + geometry: Optional[str] = None + state: Optional[str] = None + plot_tab_index: int = 0 + main_tab_index: int = 0 + meshcat_tab_index: int = 0 + + def to_dict(self) -> Dict[str, Any]: + return { + "geometry": self.geometry, + "state": self.state, + "plot_tab_index": self.plot_tab_index, + "main_tab_index": self.main_tab_index, + "meshcat_tab_index": self.meshcat_tab_index, + } + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "WindowSnapshot": + return cls( + geometry=raw.get("geometry"), + state=raw.get("state"), + plot_tab_index=int(raw.get("plot_tab_index", 0)), + main_tab_index=int(raw.get("main_tab_index", 0)), + meshcat_tab_index=int(raw.get("meshcat_tab_index", 0)), + ) + + +@dataclass +class TimelineSnapshot: + index: int = 0 + slider_value: int = 0 + slider_max: int = 0 + is_running: bool = False + current_time: float = 0.0 + + def to_dict(self) -> Dict[str, Any]: + return { + "index": self.index, + "slider_value": self.slider_value, + "slider_max": self.slider_max, + "is_running": self.is_running, + "current_time": self.current_time, + } + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "TimelineSnapshot": + return cls( + index=int(raw.get("index", 0)), + slider_value=int(raw.get("slider_value", 0)), + slider_max=int(raw.get("slider_max", 0)), + is_running=bool(raw.get("is_running", False)), + current_time=float(raw.get("current_time", 0.0)), + ) + + +@dataclass +class PlotSnapshot: + title: str = "Plot" + paths: List[List[str]] = field(default_factory=list) + legends: List[List[str]] = field(default_factory=list) + canvas: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "title": self.title, + "paths": self.paths, + "legends": self.legends, + "canvas": self.canvas, + } + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "PlotSnapshot": + return cls( + title=raw.get("title", "Plot"), + paths=[_copy_path_list(p) for p in raw.get("paths", [])], + legends=[_copy_path_list(l) for l in raw.get("legends", [])], + canvas=dict(raw.get("canvas", {})), + ) + + +@dataclass +class TreeSnapshot: + selected_variables: List[List[str]] = field(default_factory=list) + selected_text_logs: List[List[str]] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + return { + "selected_variables": self.selected_variables, + "selected_text_logs": self.selected_text_logs, + } + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "TreeSnapshot": + return cls( + selected_variables=[ + _copy_path_list(path) for path in raw.get("selected_variables", []) + ], + selected_text_logs=[ + _copy_path_list(path) for path in raw.get("selected_text_logs", []) + ], + ) + + +@dataclass +class VisualizationSnapshot: + points: List[Dict[str, Any]] = field(default_factory=list) + trajectories: List[Dict[str, Any]] = field(default_factory=list) + arrows: List[Dict[str, Any]] = field(default_factory=list) + robot_state: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "points": self.points, + "trajectories": self.trajectories, + "arrows": self.arrows, + "robot_state": self.robot_state, + } + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "VisualizationSnapshot": + return cls( + points=[dict(entry) for entry in raw.get("points", [])], + trajectories=[dict(entry) for entry in raw.get("trajectories", [])], + arrows=[dict(entry) for entry in raw.get("arrows", [])], + robot_state=dict(raw.get("robot_state", {})), + ) + + +@dataclass +class ViewSnapshot: + """Container for all serialised UI state.""" + + version: int = SNAPSHOT_VERSION + metadata: Dict[str, Any] = field(default_factory=dict) + dataset: DatasetSnapshot = field(default_factory=DatasetSnapshot) + window: WindowSnapshot = field(default_factory=WindowSnapshot) + timeline: TimelineSnapshot = field(default_factory=TimelineSnapshot) + plots: List[PlotSnapshot] = field(default_factory=list) + tree: TreeSnapshot = field(default_factory=TreeSnapshot) + visualization: VisualizationSnapshot = field(default_factory=VisualizationSnapshot) + + def to_dict(self) -> Dict[str, Any]: + return { + "version": self.version, + "metadata": self.metadata, + "dataset": self.dataset.to_dict(), + "window": self.window.to_dict(), + "timeline": self.timeline.to_dict(), + "plots": [plot.to_dict() for plot in self.plots], + "tree": self.tree.to_dict(), + "visualization": self.visualization.to_dict(), + } + + @classmethod + def from_dict(cls, raw: Dict[str, Any]) -> "ViewSnapshot": + version = int(raw.get("version", SNAPSHOT_VERSION)) + return cls( + version=version, + metadata=dict(raw.get("metadata", {})), + dataset=DatasetSnapshot.from_dict(dict(raw.get("dataset", {}))), + window=WindowSnapshot.from_dict(dict(raw.get("window", {}))), + timeline=TimelineSnapshot.from_dict(dict(raw.get("timeline", {}))), + plots=[PlotSnapshot.from_dict(entry) for entry in raw.get("plots", [])], + tree=TreeSnapshot.from_dict(dict(raw.get("tree", {}))), + visualization=VisualizationSnapshot.from_dict( + dict(raw.get("visualization", {})) + ), + ) diff --git a/robot_log_visualizer/utils/utils.py b/robot_log_visualizer/utils/utils.py index 0c465e8..215b701 100644 --- a/robot_log_visualizer/utils/utils.py +++ b/robot_log_visualizer/utils/utils.py @@ -41,8 +41,7 @@ def hex_to_rgb(hex_value: str) -> tuple[int, int, int]: hex_value = hex_value.lstrip("#") hlen = len(hex_value) return tuple( - int(hex_value[i : i + hlen // 3], 16) - for i in range(0, hlen, hlen // 3) + int(hex_value[i : i + hlen // 3], 16) for i in range(0, hlen, hlen // 3) ) @staticmethod