-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathmatfile_signal_provider.py
More file actions
186 lines (156 loc) · 7.26 KB
/
matfile_signal_provider.py
File metadata and controls
186 lines (156 loc) · 7.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# Copyright (C) 2022 Istituto Italiano di Tecnologia (IIT). All rights reserved.
# This software may be modified and distributed under the terms of the
# BSD 3-Clause License.
import time
import h5py
import numpy as np
from robot_log_visualizer.signal_provider.signal_provider import (
SignalProvider,
ProviderType,
TextLoggingMsg,
)
from robot_log_visualizer.utils.utils import PeriodicThreadState
class MatfileSignalProvider(SignalProvider):
    """Offline signal provider that reads a logged dataset from a mat file.

    The file is opened through ``h5py``; numerical signals, text logging
    messages, joint names and (when present) the robot name are loaded by
    :meth:`open`, while :meth:`run` advances the playback index over time.
    """

    def __init__(self, period: float, signal_root_name: str):
        """Create the provider.

        Args:
            period: loop period, in seconds, used by :meth:`run`.
            signal_root_name: name of the root variable looked up in the file.
        """
        super().__init__(period, signal_root_name, ProviderType.OFFLINE)

    def __len__(self):
        """Return the number of samples of the reference timestamps."""
        return len(self.timestamps)

    @staticmethod
    def __decode_chars(char_rows) -> str:
        """Build a string from an iterable of rows whose first item is a character code."""
        return "".join(chr(row[0]) for row in char_rows)

    def __populate_text_logging_data(self, file_object):
        """Recursively collect the text logging messages stored under ``file_object``.

        Args:
            file_object: h5py group whose sub-groups are visited.

        Returns:
            A nested dict mirroring the group hierarchy. Each leaf (a group that
            contains ``data``) maps to ``{"timestamps": ndarray, "data": [TextLoggingMsg, ...]}``.
        """
        data = {}
        for key, value in file_object.items():
            if not isinstance(value, h5py.Group):
                continue
            if key == "#refs#":
                continue

            if "data" not in value:
                # Intermediate node: keep descending.
                data[key] = self.__populate_text_logging_data(file_object=value)
                continue

            level_ref = value["data"]["level"]
            text_ref = value["data"]["text"]

            # np.squeeze turns a single timestamp into a 0-d array on which len()
            # raises; atleast_1d keeps the one-message case working.
            timestamps = np.atleast_1d(np.squeeze(np.array(value["timestamps"])))
            data[key] = {"timestamps": timestamps}

            # New way to store the struct array in robometry
            # https://github.com/robotology/robometry/pull/175
            if text_ref.shape[0] == len(timestamps):
                messages = []
                for text, level in zip(text_ref, level_ref):
                    text_dataset = value[text[0]]
                    # A 2-D dataset contains a string, any other shape means the
                    # text is empty, so the shape has to be checked explicitly.
                    if len(text_dataset.shape) == 2:
                        decoded_text = self.__decode_chars(text_dataset)
                    else:
                        decoded_text = ""
                    messages.append(
                        TextLoggingMsg(
                            text=decoded_text,
                            level=self.__decode_chars(value[level[0]]),
                        )
                    )
            # Old approach (before https://github.com/robotology/robometry/pull/175)
            else:
                messages = [
                    TextLoggingMsg(
                        text=self.__decode_chars(value[text]),
                        level=self.__decode_chars(value[level]),
                    )
                    for text, level in zip(text_ref[0], level_ref[0])
                ]

            data[key]["data"] = messages

        return data

    def __populate_numerical_data(self, file_object):
        """Recursively collect the numerical signals stored under ``file_object``.

        Besides building the returned dict, this updates ``self.timestamps``,
        ``self.initial_time`` and ``self.end_time`` whenever a signal starts
        earlier or ends later than what has been seen so far.

        Args:
            file_object: h5py file or group whose sub-groups are visited.

        Returns:
            A nested dict mirroring the group hierarchy. Each leaf (a group that
            contains ``data``) maps to a dict with ``data``, ``timestamps`` and,
            when stored in the file, ``elements_names``.
        """
        data = {}

        # Cache for timestamp deduplication: identical arrays are shared between
        # signals. It lives on the instance so recursive calls reuse it.
        ts_cache = getattr(self, "_ts_cache", {})
        self._ts_cache = ts_cache

        for key, value in file_object.items():
            if not isinstance(value, h5py.Group):
                continue
            if key in ("#refs#", "log"):
                continue

            if "data" not in value:
                # Intermediate node: keep descending.
                data[key] = self.__populate_numerical_data(file_object=value)
                continue

            data[key] = {}
            data[key]["data"] = np.squeeze(np.array(value["data"]))

            # atleast_1d: a single-sample signal squeezes to a 0-d array, which
            # supports neither len() nor indexing.
            raw_ts = np.atleast_1d(np.squeeze(np.array(value["timestamps"])))

            if raw_ts.size > 0:
                # Deduplicate: reuse an existing identical timestamp array. The
                # key is only a cheap pre-filter, array_equal confirms the match.
                ts_key = (len(raw_ts), raw_ts[0], raw_ts[-1])
                cached_ts = ts_cache.get(ts_key)
                if cached_ts is not None and np.array_equal(cached_ts, raw_ts):
                    raw_ts = cached_ts
                else:
                    ts_cache[ts_key] = raw_ts

            data[key]["timestamps"] = raw_ts

            # If the initial or end time has been updated we can also update the
            # entire timestamps dataset. An empty signal carries no time range.
            if raw_ts.size > 0:
                if raw_ts[0] < self.initial_time:
                    self.timestamps = raw_ts
                    self.initial_time = self.timestamps[0]

                if raw_ts[-1] > self.end_time:
                    self.timestamps = raw_ts
                    self.end_time = self.timestamps[-1]

            # In yarp telemetry v0.4.0 the elements_names was saved.
            if "elements_names" in value:
                elements_names_ref = value["elements_names"]
                data[key]["elements_names"] = [
                    self.__decode_chars(value[ref]) for ref in elements_names_ref[0]
                ]

        return data

    def open(self, source: str) -> bool:
        """Load the dataset stored in ``source``.

        Args:
            source: path of the file to open.

        Returns:
            ``True`` once the file has been loaded.

        Raises:
            ValueError: if the configured root variable is not in the file and
                no top-level group containing ``description_list`` is found.
        """
        # Start from an empty cache so arrays of a previously opened file are
        # not kept alive by this instance.
        self._ts_cache = {}

        with h5py.File(source, "r") as file:
            root_variable = file.get(self.root_name)

            # The first top-level group holding "description_list" becomes the
            # root name. It is also used as root variable when the configured
            # name does not exist in the file.
            for name in file.keys():
                candidate = file[name]
                if isinstance(candidate, h5py.Group) and "description_list" in candidate:
                    self.root_name = name
                    if root_variable is None:
                        root_variable = candidate
                    break

            if root_variable is None:
                raise ValueError(
                    f"Unable to find the root variable '{self.root_name}' in '{source}'."
                )

            self.data = self.__populate_numerical_data(file)

            if "log" in root_variable:
                self.text_logging_data["log"] = self.__populate_text_logging_data(
                    root_variable["log"]
                )

            joint_ref = root_variable["description_list"]
            self.joints_name = [self.__decode_chars(file[ref]) for ref in joint_ref[0]]

            if "yarp_robot_name" in root_variable:
                robot_name_ref = root_variable["yarp_robot_name"]
                try:
                    self.robot_name = self.__decode_chars(robot_name_ref)
                except Exception:
                    # Best effort: the robot name is optional, keep the current
                    # value when it cannot be decoded. Unlike a bare except,
                    # KeyboardInterrupt and SystemExit still propagate.
                    pass

            self.index = 0

        # Pre-compute relative timestamps for binary search in run()
        self._relative_timestamps = self.timestamps - self.initial_time

        return True

    def run(self):
        """Advance the playback time and index until the provider is closed.

        At every period, while the state is ``running``, the elapsed time
        (scaled by ``self.playback_speed``) is added to the current time, the
        result is clamped to the last relative timestamp, and the matching
        index is looked up before ``update_index_signal`` is emitted.
        """
        # A monotonic clock is used for every interval: it cannot jump backward
        # or forward when the system time is adjusted.
        prev_tick = time.monotonic()
        was_running = False

        while True:
            start = time.monotonic()

            if self.state == PeriodicThreadState.running:
                now = time.monotonic()
                if not was_running:
                    # Just resumed: ignore the time spent while not running.
                    prev_tick = now
                elapsed = now - prev_tick
                prev_tick = now
                was_running = True

                self.index_lock.lock()
                try:
                    self._current_time += elapsed * self.playback_speed
                    self._current_time = min(
                        self._current_time, self._relative_timestamps[-1]
                    )
                    # Binary search (O(log n)) instead of linear scan
                    self._index = int(
                        np.searchsorted(self._relative_timestamps, self._current_time)
                    )
                finally:
                    # Always release the lock, even if the update above raises.
                    self.index_lock.unlock()

                self.update_index_signal.emit()
            else:
                was_running = False

            sleep_time = self.period - (time.monotonic() - start)
            if sleep_time > 0:
                time.sleep(sleep_time)

            if self.state == PeriodicThreadState.closed:
                return