3
3
"""
4
4
5
5
import argparse
6
- from multiprocessing import Pool , Manager
7
6
from tqdm import tqdm
7
+ from osi3trace .osi_trace import OSITrace
8
8
import os , sys
9
9
10
10
sys .path .append (os .path .join (os .path .dirname (__file__ ), "." ))
14
14
import osi_rules
15
15
import osi_validator_logger
16
16
import osi_rules_checker
17
- import osi_trace
17
+ import linked_proto_field
18
18
except Exception as e :
19
19
print (
20
20
"Make sure you have installed the requirements with 'pip install -r requirements.txt'!"
@@ -39,9 +39,9 @@ def command_line_arguments():
39
39
)
40
40
parser .add_argument (
41
41
"--data" ,
42
- default = "" ,
43
42
help = "Path to the file with OSI-serialized data." ,
44
43
type = str ,
44
+ required = True ,
45
45
)
46
46
parser .add_argument (
47
47
"--rules" ,
@@ -83,48 +83,43 @@ def command_line_arguments():
83
83
parser .add_argument (
84
84
"--parallel" ,
85
85
"-p" ,
86
- help = "Set parallel mode to ON." ,
86
+ help = "(Ignored) Set parallel mode to ON." ,
87
87
default = False ,
88
88
required = False ,
89
89
action = "store_true" ,
90
90
)
91
91
parser .add_argument (
92
92
"--format" ,
93
93
"-f" ,
94
- help = "Set the format type of the trace." ,
95
- choices = ["separated" , None ],
94
+ help = "(Ignored) Set the format type of the trace." ,
95
+ choices = [None ],
96
96
default = None ,
97
97
type = str ,
98
98
required = False ,
99
99
)
100
100
parser .add_argument (
101
101
"--blast" ,
102
102
"-bl" ,
103
- help = "Set the in-memory storage count of OSI messages during validation." ,
103
+ help = "Set the maximum in-memory storage count of OSI messages during validation." ,
104
104
default = 500 ,
105
105
type = check_positive_int ,
106
106
required = False ,
107
107
)
108
108
parser .add_argument (
109
109
"--buffer" ,
110
110
"-bu" ,
111
- help = "Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all." ,
112
- default = 1000000 ,
111
+ help = "(Ignored) Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all." ,
112
+ default = 0 ,
113
113
type = check_positive_int ,
114
114
required = False ,
115
115
)
116
116
117
117
return parser .parse_args ()
118
118
119
119
120
# Flat accumulator for every validation log entry produced during a run;
# flushed per-blast by main() and summarised by display_results().
LOGS = []
# Shared logger used by every rule check (module-level singleton).
LOGGER = osi_validator_logger.OSIValidatorLogger()
# Rule-set container; presumably filled from the YAML rules directory
# before validation starts — confirm against main().
VALIDATION_RULES = osi_rules.OSIRules()
128
123
129
124
130
125
def main ():
@@ -143,11 +138,7 @@ def main():
143
138
144
139
# Read data
145
140
print ("Reading data ..." )
146
- DATA = osi_trace .OSITrace (buffer_size = args .buffer )
147
- DATA .from_file (path = args .data , type_name = args .type , max_index = args .timesteps )
148
-
149
- if DATA .timestep_count < args .timesteps :
150
- args .timesteps = - 1
141
+ trace = OSITrace (path = args .data , type_name = args .type )
151
142
152
143
# Collect Validation Rules
153
144
print ("Collect validation rules ..." )
@@ -159,140 +150,50 @@ def main():
159
150
LOGGER .info (None , f"Pass the { max_timestep } first timesteps" )
160
151
else :
161
152
LOGGER .info (None , "Pass all timesteps" )
162
- max_timestep = DATA .timestep_count
163
-
164
- # Dividing in several blast to not overload the memory
165
- max_timestep_blast = 0
166
-
167
- while max_timestep_blast < max_timestep :
168
- # Clear log queue
169
- LOGS = MANAGER .list ()
170
-
171
- # Increment the max-timestep to analyze
172
- max_timestep_blast += args .blast
173
- first_of_blast = max_timestep_blast - args .blast
174
- last_of_blast = min (max_timestep_blast , max_timestep )
175
-
176
- # Cache messages
177
- DATA .cache_messages_in_index_range (first_of_blast , last_of_blast )
178
- MESSAGE_CACHE .update (DATA .message_cache )
179
-
180
- if args .parallel :
181
- # Launch parallel computation
182
- # Recreate the pool
153
+ max_timestep = None
154
+
155
+ index = 0
156
+ total_length = os .path .getsize (args .data )
157
+ current_pos = 0
158
+
159
+ with tqdm (total = total_length , unit = "B" , unit_scale = True , unit_divisor = 1024 ) as pbar :
160
+ for message in trace :
161
+ if (index % args .blast == 0 ):
162
+ LOGS = []
163
+ if max_timestep and index >= max_timestep :
164
+ pbar .update (total_length - current_pos )
165
+ break
183
166
try :
184
- argument_list = [
185
- (i , args .type ) for i in tqdm (range (first_of_blast , last_of_blast ))
186
- ]
187
- with Pool () as pool :
188
- pool .starmap (process_timestep , argument_list )
189
-
167
+ process_message (message , index , args .type )
190
168
except Exception as e :
191
169
print (str (e ))
170
+ new_pos = trace .file .tell ()
171
+ pbar .update (new_pos - current_pos )
172
+ current_pos = new_pos
173
+ index += 1
192
174
193
- finally :
194
- close_pool (pool )
195
- print ("\n Closed pool!" )
196
- else :
197
- # Launch sequential computation
198
- try :
199
- for i in tqdm (range (first_of_blast , last_of_blast )):
200
- process_timestep (i , args .type )
201
-
202
- except Exception as e :
203
- print (str (e ))
204
-
205
- MESSAGE_CACHE .clear ()
206
-
207
- DATA .trace_file .close ()
175
+ trace .close ()
208
176
display_results ()
209
177
210
178
211
- def close_pool (pool ):
212
- """Cleanly close a pool to free the memory"""
213
- pool .close ()
214
- pool .terminate ()
215
- pool .join ()
216
-
217
-
218
def process_message(message, timestep, data_type):
    """Validate a single OSI message against the loaded rule set.

    Parameters
    ----------
    message:
        Deserialized OSI protobuf message read from the trace.
    timestep: int
        Index of the message within the trace; used as the key for the
        per-timestep log buckets on ``LOGGER``.
    data_type: str
        Name of the top-level OSI message type (e.g. ``"SensorView"``)
        used to look up the matching validation rules.
    """
    rule_checker = osi_rules_checker.OSIRulesChecker(LOGGER)
    # Register this message's timestamp with the checker; also reused
    # below purely for log readability.
    timestamp = rule_checker.set_timestamp(message.timestamp, timestep)

    # Fresh per-timestep log buckets — this call owns this timestep.
    LOGGER.log_messages[timestep] = []
    LOGGER.debug_messages[timestep] = []
    LOGGER.info(None, f"Analyze message of timestamp {timestamp}", False)

    # Check common rules. Direct method call instead of the original
    # getattr(rule_checker, "is_valid")(...) — the attribute name is a
    # constant, so the dynamic lookup added nothing but indirection.
    rule_checker.is_valid(
        linked_proto_field.LinkedProtoField(message, name=data_type),
        VALIDATION_RULES.get_rules().get_type(data_type),
    )

    # Flush this timestep's log messages into the global result list.
    LOGS.extend(LOGGER.log_messages[timestep])
240
195
241
196
242
- def get_message_count (data , data_type = "SensorView" , from_message = 0 , to_message = None ):
243
- # Wrapper function for external use in combination with process_timestep
244
- timesteps = None
245
-
246
- if from_message != 0 :
247
- print ("Currently only validation from the first frame (0) is supported!" )
248
-
249
- if to_message is not None :
250
- timesteps = int (to_message )
251
-
252
- # Read data
253
- print ("Reading data ..." )
254
- DATA = osi_trace .OSITrace (buffer_size = 1000000 )
255
- DATA .from_file (path = data , type_name = data_type , max_index = timesteps )
256
-
257
- if DATA .timestep_count < timesteps :
258
- timesteps = - 1
259
-
260
- # Collect Validation Rules
261
- print ("Collect validation rules ..." )
262
- try :
263
- VALIDATION_RULES .from_yaml_directory ("osi-validation/rules/" )
264
- except Exception as e :
265
- print ("Error collecting validation rules:" , e )
266
-
267
- # Pass all timesteps or the number specified
268
- if timesteps != - 1 :
269
- max_timestep = timesteps
270
- LOGGER .info (None , f"Pass the { max_timestep } first timesteps" )
271
- else :
272
- LOGGER .info (None , "Pass all timesteps" )
273
- max_timestep = DATA .timestep_count
274
-
275
- # Dividing in several blast to not overload the memory
276
- max_timestep_blast = 0
277
-
278
- while max_timestep_blast < max_timestep :
279
- # Clear log queue
280
- LOGS [:] = []
281
-
282
- # Increment the max-timestep to analyze
283
- max_timestep_blast += 500
284
- first_of_blast = max_timestep_blast - 500
285
- last_of_blast = min (max_timestep_blast , max_timestep )
286
-
287
- # Cache messages
288
- DATA .cache_messages_in_index_range (first_of_blast , last_of_blast )
289
- MESSAGE_CACHE .update (DATA .message_cache )
290
-
291
- DATA .trace_file .close ()
292
-
293
- return len (MESSAGE_CACHE )
294
-
295
-
296
197
# Synthetize Logs
297
198
def display_results():
    """Aggregate all collected validation log entries into the final report."""
    collected_logs = LOGS
    return LOGGER.synthetize_results(collected_logs)
0 commit comments