diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7108951..c2127e6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -48,6 +48,7 @@ jobs: - name: Checkout uses: actions/checkout@v4 with: + submodules: recursive lfs: true - name: Set up Python ${{ matrix.python-version }} @@ -55,35 +56,12 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Cache Dependencies - id: cache-depends - uses: actions/cache@v3 - with: - path: protobuf-3.20.1 - key: ${{ runner.os }}-v2-depends - - - name: Download ProtoBuf - if: steps.cache-depends.outputs.cache-hit != 'true' - run: curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v3.20.1/protobuf-all-3.20.1.tar.gz && tar xzvf protobuf-all-3.20.1.tar.gz - - - name: Build ProtoBuf - if: steps.cache-depends.outputs.cache-hit != 'true' - working-directory: protobuf-3.20.1 - run: ./configure DIST_LANG=cpp --prefix=/usr && make - - - name: Install ProtoBuf - working-directory: protobuf-3.20.1 - run: sudo make install && sudo ldconfig - - - name: Install Open Simulation Interface - shell: bash + - name: Set up Virtual Environment run: | - git submodule update --init python -m venv .venv source .venv/bin/activate python -m pip install --upgrade pip pip install -r requirements_develop.txt - cd open-simulation-interface && pip install . && cd .. - name: Generate parsed rules run: | @@ -100,7 +78,5 @@ jobs: run: | source .venv/bin/activate pip install . - osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.txt -r rules - osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.txt -r rules --parallel osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.osi -r rules osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.osi -r rules --parallel diff --git a/README.md b/README.md index c25b13f..0cf81bf 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ The full documentation on the validator and customization of the rules is availa ## Usage ```bash -usage: osivalidator [-h] [--data DATA] [--rules RULES] [--type {SensorView,GroundTruth,SensorData}] [--output OUTPUT] [--timesteps TIMESTEPS] [--debug] [--verbose] [--parallel] [--format {separated,None}] +usage: osivalidator [-h] --data DATA [--rules RULES] [--type {SensorView,GroundTruth,SensorData}] [--output OUTPUT] [--timesteps TIMESTEPS] [--debug] [--verbose] [--parallel] [--format {None}] [--blast BLAST] [--buffer BUFFER] Validate data defined at the input @@ -29,13 +29,13 @@ optional arguments: Number of timesteps to analyze. If -1, all. --debug Set the debug mode to ON. --verbose, -v Set the verbose mode to ON. - --parallel, -p Set parallel mode to ON. - --format {separated,None}, -f {separated,None} - Set the format type of the trace. + --parallel, -p (Ignored) Set parallel mode to ON. + --format {None}, -f {None} + (Ignored) Set the format type of the trace. --blast BLAST, -bl BLAST - Set the in-memory storage count of OSI messages during validation. + Set the maximum in-memory storage count of OSI messages during validation. --buffer BUFFER, -bu BUFFER - Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all. + (Ignored) Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all. ``` ## Installation @@ -80,5 +80,5 @@ $ source .venv/Scripts/activate ## Example command ```bash -$ osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.txt --rules rules/ +$ osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.osi --rules rules/ ``` diff --git a/data/20210818T150542Z_sv_312_50_one_moving_object.txt b/data/20210818T150542Z_sv_312_50_one_moving_object.txt deleted file mode 100644 index b5146c1..0000000 Binary files a/data/20210818T150542Z_sv_312_50_one_moving_object.txt and /dev/null differ diff --git a/doc/usage.adoc b/doc/usage.adoc index 580f3c0..cbee4ed 100644 --- a/doc/usage.adoc +++ b/doc/usage.adoc @@ -7,57 +7,39 @@ terminal which has the following usage: [source,bash] ---- -usage: osivalidator [-h] [--data DATA] [--rules RULES] [--type {SensorView,GroundTruth,SensorData}] [--output OUTPUT] [--timesteps TIMESTEPS] [--debug] [--verbose] [--parallel] [--format {separated,None}] +usage: osivalidator [-h] --data DATA [--rules RULES] [--type {SensorView,GroundTruth,SensorData}] [--output OUTPUT] [--timesteps TIMESTEPS] [--debug] [--verbose] [--parallel] [--format {None}] [--blast BLAST] [--buffer BUFFER] Validate data defined at the input +mandatory arguments: +--data DATA Path to the file with OSI-serialized data. + optional arguments: -h, --help show this help message and exit ---data DATA Path to the file with OSI-serialized data. --rules RULES, -r RULES - Directory with text files containig rules. + Directory with text files containig rules. --type {SensorView,GroundTruth,SensorData}, -t {SensorView,GroundTruth,SensorData} - Name of the type used to serialize data. + Name of the type used to serialize data. --output OUTPUT, -o OUTPUT - Output folder of the log files. + Output folder of the log files. --timesteps TIMESTEPS - Number of timesteps to analyze. If -1, all. + Number of timesteps to analyze. If -1, all. --debug Set the debug mode to ON. --verbose, -v Set the verbose mode to ON. ---parallel, -p Set parallel mode to ON. ---format {separated,None}, -f {separated,None} - Set the format type of the trace. +--parallel, -p (Ignored) Set parallel mode to ON. +--format {None}, -f {None} + (Ignored) Set the format type of the trace. --blast BLAST, -bl BLAST - Set the in-memory storage count of OSI messages during validation. + Set the maximum in-memory storage count of OSI messages during validation. --buffer BUFFER, -bu BUFFER - Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all. + (Ignored) Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all. ---- To run the validation first you need an OSI trace file which consists of multiple OSI messages. In the directory `+data+` of the repository we already provide an example trace file which is called -`+20210818T150542Z_sv_312_50_one_moving_object.txt+`. Use the -https://github.com/OpenSimulationInterface/open-simulation-interface/blob/master/format/txt2osi.py[txt2osi.py] -of the OSI repo in the format directory to convert from `+*.txt+` to -`+*.osi+` files. See usage below: - -[source,bash] ----- -usage: txt2osi converter [-h] [--file FILE] - [--type {SensorView,GroundTruth,SensorData}] - [--output OUTPUT] [--compress] - -Convert txt trace file to osi trace files. - -optional arguments: --h, --help show this help message and exit ---file FILE, -f FILE Path to the file with serialized data. ---type {SensorView,GroundTruth,SensorData}, -t {SensorView,GroundTruth,SensorData} - Name of the type used to serialize data. ---output OUTPUT, -o OUTPUT - Output name of the file. ----- +`+20210818T150542Z_sv_312_50_one_moving_object.osi+`. To validate the trace files you simply call `+osivalidator+` and provide the path to the trace: @@ -65,7 +47,6 @@ the path to the trace: [source,bash] ---- osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.osi -osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.txt ---- You can also validate the traces in parallel to increase the speed of @@ -84,7 +65,7 @@ need to generate them and then specify them: [source,bash] ---- python rules2yml.py # Generates the rule directory -osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.txt --rules rules/ -p +osivalidator --data data/20210818T150542Z_sv_312_50_one_moving_object.osi --rules rules -p ---- After successfully running the validation the following output is diff --git a/osivalidator/osi_general_validator.py b/osivalidator/osi_general_validator.py index 675a94c..7e7e127 100755 --- a/osivalidator/osi_general_validator.py +++ b/osivalidator/osi_general_validator.py @@ -3,8 +3,8 @@ """ import argparse -from multiprocessing import Pool, Manager from tqdm import tqdm +from osi3trace.osi_trace import OSITrace import os, sys sys.path.append(os.path.join(os.path.dirname(__file__), ".")) @@ -14,7 +14,7 @@ import osi_rules import osi_validator_logger import osi_rules_checker - import osi_trace + import linked_proto_field except Exception as e: print( "Make sure you have installed the requirements with 'pip install -r requirements.txt'!" @@ -39,9 +39,9 @@ def command_line_arguments(): ) parser.add_argument( "--data", - default="", help="Path to the file with OSI-serialized data.", type=str, + required=True, ) parser.add_argument( "--rules", @@ -83,7 +83,7 @@ def command_line_arguments(): parser.add_argument( "--parallel", "-p", - help="Set parallel mode to ON.", + help="(Ignored) Set parallel mode to ON.", default=False, required=False, action="store_true", @@ -91,8 +91,8 @@ def command_line_arguments(): parser.add_argument( "--format", "-f", - help="Set the format type of the trace.", - choices=["separated", None], + help="(Ignored) Set the format type of the trace.", + choices=[None], default=None, type=str, required=False, @@ -100,7 +100,7 @@ def command_line_arguments(): parser.add_argument( "--blast", "-bl", - help="Set the in-memory storage count of OSI messages during validation.", + help="Set the maximum in-memory storage count of OSI messages during validation.", default=500, type=check_positive_int, required=False, @@ -108,8 +108,8 @@ def command_line_arguments(): parser.add_argument( "--buffer", "-bu", - help="Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all.", - default=1000000, + help="(Ignored) Set the buffer size to retrieve OSI messages from trace file. Set it to 0 if you do not want to use buffering at all.", + default=0, type=check_positive_int, required=False, ) @@ -117,14 +117,9 @@ def command_line_arguments(): return parser.parse_args() -MANAGER = Manager() -LOGS = MANAGER.list() -TIMESTAMP_ANALYZED = MANAGER.list() +LOGS = [] LOGGER = osi_validator_logger.OSIValidatorLogger() VALIDATION_RULES = osi_rules.OSIRules() -ID_TO_TS = {} -BAR_SUFFIX = "%(index)d/%(max)d [%(elapsed_td)s]" -MESSAGE_CACHE = {} def main(): @@ -143,11 +138,7 @@ def main(): # Read data print("Reading data ...") - DATA = osi_trace.OSITrace(buffer_size=args.buffer) - DATA.from_file(path=args.data, type_name=args.type, max_index=args.timesteps) - - if DATA.timestep_count < args.timesteps: - args.timesteps = -1 + trace = OSITrace(path=args.data, type_name=args.type) # Collect Validation Rules print("Collect validation rules ...") @@ -159,140 +150,48 @@ def main(): LOGGER.info(None, f"Pass the {max_timestep} first timesteps") else: LOGGER.info(None, "Pass all timesteps") - max_timestep = DATA.timestep_count - - # Dividing in several blast to not overload the memory - max_timestep_blast = 0 - - while max_timestep_blast < max_timestep: - # Clear log queue - LOGS = MANAGER.list() - - # Increment the max-timestep to analyze - max_timestep_blast += args.blast - first_of_blast = max_timestep_blast - args.blast - last_of_blast = min(max_timestep_blast, max_timestep) - - # Cache messages - DATA.cache_messages_in_index_range(first_of_blast, last_of_blast) - MESSAGE_CACHE.update(DATA.message_cache) - - if args.parallel: - # Launch parallel computation - # Recreate the pool + max_timestep = None + + total_length = os.path.getsize(args.data) + current_pos = 0 + + with tqdm(total=total_length, unit="B", unit_scale=True, unit_divisor=1024) as pbar: + for index, message in enumerate(trace): + if index % args.blast == 0: + LOGS = [] + if max_timestep and index >= max_timestep: + pbar.update(total_length - current_pos) + break try: - argument_list = [ - (i, args.type) for i in tqdm(range(first_of_blast, last_of_blast)) - ] - with Pool() as pool: - pool.starmap(process_timestep, argument_list) - + process_message(message, index, args.type) except Exception as e: print(str(e)) + new_pos = trace.file.tell() + pbar.update(new_pos - current_pos) + current_pos = new_pos - finally: - close_pool(pool) - print("\nClosed pool!") - else: - # Launch sequential computation - try: - for i in tqdm(range(first_of_blast, last_of_blast)): - process_timestep(i, args.type) - - except Exception as e: - print(str(e)) - - MESSAGE_CACHE.clear() - - DATA.trace_file.close() + trace.close() display_results() -def close_pool(pool): - """Cleanly close a pool to free the memory""" - pool.close() - pool.terminate() - pool.join() - - -def process_timestep(timestep, data_type): - """Process one timestep""" - message = MESSAGE_CACHE[timestep] +def process_message(message, timestep, data_type): + """Process one message""" rule_checker = osi_rules_checker.OSIRulesChecker(LOGGER) - timestamp = rule_checker.set_timestamp(message.value.timestamp, timestep) - ID_TO_TS[timestep] = timestamp + timestamp = rule_checker.set_timestamp(message.timestamp, timestep) LOGGER.log_messages[timestep] = [] LOGGER.debug_messages[timestep] = [] LOGGER.info(None, f"Analyze message of timestamp {timestamp}", False) - with MANAGER.Lock(): - if timestamp in TIMESTAMP_ANALYZED: - LOGGER.error(timestep, f"Timestamp already exists") - TIMESTAMP_ANALYZED.append(timestamp) - # Check common rules getattr(rule_checker, "is_valid")( - message, VALIDATION_RULES.get_rules().get_type(data_type) + linked_proto_field.LinkedProtoField(message, name=data_type), + VALIDATION_RULES.get_rules().get_type(data_type), ) LOGS.extend(LOGGER.log_messages[timestep]) -def get_message_count(data, data_type="SensorView", from_message=0, to_message=None): - # Wrapper function for external use in combination with process_timestep - timesteps = None - - if from_message != 0: - print("Currently only validation from the first frame (0) is supported!") - - if to_message is not None: - timesteps = int(to_message) - - # Read data - print("Reading data ...") - DATA = osi_trace.OSITrace(buffer_size=1000000) - DATA.from_file(path=data, type_name=data_type, max_index=timesteps) - - if DATA.timestep_count < timesteps: - timesteps = -1 - - # Collect Validation Rules - print("Collect validation rules ...") - try: - VALIDATION_RULES.from_yaml_directory("osi-validation/rules/") - except Exception as e: - print("Error collecting validation rules:", e) - - # Pass all timesteps or the number specified - if timesteps != -1: - max_timestep = timesteps - LOGGER.info(None, f"Pass the {max_timestep} first timesteps") - else: - LOGGER.info(None, "Pass all timesteps") - max_timestep = DATA.timestep_count - - # Dividing in several blast to not overload the memory - max_timestep_blast = 0 - - while max_timestep_blast < max_timestep: - # Clear log queue - LOGS[:] = [] - - # Increment the max-timestep to analyze - max_timestep_blast += 500 - first_of_blast = max_timestep_blast - 500 - last_of_blast = min(max_timestep_blast, max_timestep) - - # Cache messages - DATA.cache_messages_in_index_range(first_of_blast, last_of_blast) - MESSAGE_CACHE.update(DATA.message_cache) - - DATA.trace_file.close() - - return len(MESSAGE_CACHE) - - # Synthetize Logs def display_results(): return LOGGER.synthetize_results(LOGS) diff --git a/osivalidator/osi_trace.py b/osivalidator/osi_trace.py deleted file mode 100644 index 990f833..0000000 --- a/osivalidator/osi_trace.py +++ /dev/null @@ -1,350 +0,0 @@ -""" -Module that contains OSIDataContainer class to handle and manage OSI traces. -""" -from collections import deque -import time -import struct - -from osi3.osi_sensorview_pb2 import SensorView -from osi3.osi_groundtruth_pb2 import GroundTruth -from osi3.osi_sensordata_pb2 import SensorData -from tqdm import tqdm - -import warnings - -warnings.simplefilter("default") -import os, sys - -sys.path.append(os.path.join(os.path.dirname(__file__), ".")) -import linked_proto_field - -SEPARATOR = b"$$__$$" -SEPARATOR_LENGTH = len(SEPARATOR) - - -def get_size_from_file_stream(file_object): - """ - Return a file size from a file stream given in parameters - """ - current_position = file_object.tell() - file_object.seek(0, 2) - size = file_object.tell() - file_object.seek(current_position) - return size - - -MESSAGES_TYPE = { - "SensorView": SensorView, - "GroundTruth": GroundTruth, - "SensorData": SensorData, -} - - -class OSITrace: - """This class wrap OSI data. It can import and decode OSI traces.""" - - def __init__(self, buffer_size, show_progress=True, type_name="SensorView"): - self.trace_file = None - self.message_offsets = None - self.buffer_size = buffer_size - self._int_length = len(struct.pack(" self.buffer_size * (counter + 1) - - # Check if reached end of file - if self.trace_file.tell() == trace_size: - self.retrieved_trace_size = self.message_offsets[-1] - self.message_offsets.pop() # Remove the last element since after that there is no message coming - break - - while eof: - # Counter increment and cursor placement update. The cursor is set absolute in the file. - if message_offset >= len(serialized_message): - progress_bar.update(message_offset - last_offset) - last_offset = message_offset - counter += 1 - self.trace_file.seek(counter * self.buffer_size) - eof = False - - else: - serialized_message = self.trace_file.read() - while message_offset < trace_size: - message_length = struct.unpack( - "= trace_size: - break - self.message_offsets.append(message_offset) - progress_bar.update(message_offset - last_offset) - last_offset = message_offset - - if eof: - self.retrieved_trace_size = trace_size - else: - self.retrieved_trace_size = self.message_offsets[-1] - self.message_offsets.pop() - - if self.show_progress: - progress_bar.close() - print( - len(self.message_offsets), - "messages has been discovered in", - time.time() - start_time, - "s", - ) - - return len(self.message_offsets) - - def get_message_by_index(self, index): - """ - Get a message by its index. Try first to get it from the cache made - by the method ``cache_messages_in_index_range``. - """ - message = self.message_cache.get(index, None) - - if message is not None: - return message - - message = next(self.get_messages_in_index_range(index, index + 1)) - return linked_proto_field.LinkedProtoField(message, name=self.type_name) - - def get_messages_in_index_range(self, begin, end): - """ - Yield an iterator over messages of indexes between begin and end included. - """ - - self.trace_file.seek(self.message_offsets[begin]) - abs_first_offset = self.message_offsets[begin] - abs_last_offset = ( - self.message_offsets[end] - if end < len(self.message_offsets) - else self.retrieved_trace_size - ) - - rel_message_offsets = [ - abs_message_offset - abs_first_offset - for abs_message_offset in self.message_offsets[begin:end] - ] - - if self.path.lower().endswith((".txt")): - message_sequence_len = abs_last_offset - abs_first_offset - SEPARATOR_LENGTH - serialized_messages_extract = self.trace_file.read(message_sequence_len) - - pbar = tqdm(rel_message_offsets) - for rel_index, rel_message_offset in enumerate(pbar): - pbar.set_description( - f"Processing index {rel_index} with offset {rel_message_offset}" - ) - rel_begin = rel_message_offset - rel_end = ( - rel_message_offsets[rel_index + 1] - SEPARATOR_LENGTH - if rel_index + 1 < len(rel_message_offsets) - else message_sequence_len - ) - - message = MESSAGES_TYPE[self.type_name]() - serialized_message = serialized_messages_extract[rel_begin:rel_end] - message.ParseFromString(serialized_message) - yield linked_proto_field.LinkedProtoField(message, name=self.type_name) - - elif self.path.lower().endswith((".osi")): - message_sequence_len = abs_last_offset - abs_first_offset - serialized_messages_extract = self.trace_file.read(message_sequence_len) - message_length = 0 - i = 0 - while i < len(serialized_messages_extract): - message = MESSAGES_TYPE[self.type_name]() - message_length = struct.unpack( - "=0.9.0 ruamel.yaml>=0.18.5 defusedxml>=0.7.1 iso3166>=2.1.1 -protobuf==3.20.1 -open-simulation-interface @ git+https://github.com/OpenSimulationInterface/open-simulation-interface.git@v3.6.0 +protobuf>=4.24.4 +open-simulation-interface @ git+https://github.com/OpenSimulationInterface/open-simulation-interface.git@v3.7.0-rc1 diff --git a/setup.py b/setup.py index 4e3c40b..6c492d7 100644 --- a/setup.py +++ b/setup.py @@ -49,8 +49,8 @@ "ruamel.yaml>=0.18.5", "defusedxml>=0.7.1", "iso3166>=2.1.1", - "protobuf==3.20.1", - "open-simulation-interface @ git+https://github.com/OpenSimulationInterface/open-simulation-interface.git@v3.6.0", + "protobuf==4.24.4", + "open-simulation-interface @ git+https://github.com/OpenSimulationInterface/open-simulation-interface.git@v3.7.0-rc1", ], entry_points={ "console_scripts": ["osivalidator=osivalidator.osi_general_validator:main"], diff --git a/tests/test_trace.py b/tests/test_trace.py deleted file mode 100644 index c10f4ad..0000000 --- a/tests/test_trace.py +++ /dev/null @@ -1,83 +0,0 @@ -"""Module for test class of OSIValidationRules class""" - -import unittest -from osivalidator.osi_trace import OSITrace -import subprocess - - -class TestDataContainer(unittest.TestCase): - """Test class of OSITrace class""" - - def setUp(self): - self.MESSAGE_LENGTH = 15 - - self.txt = OSITrace(buffer_size=1000000) - self.osi = OSITrace(buffer_size=1000000) - self.osi_nobuffer = OSITrace(buffer_size=0) - - self.txt.from_file( - path="data/20210818T150542Z_sv_312_50_one_moving_object.txt", - type_name="SensorView", - ) - self.osi.from_file( - path="data/20210818T150542Z_sv_312_50_one_moving_object.osi", - type_name="SensorView", - ) - self.osi_nobuffer.from_file( - path="data/20210818T150542Z_sv_312_50_one_moving_object.osi", - type_name="SensorView", - ) - - def tearDown(self): - self.txt.trace_file.close() - del self.txt - - self.osi.trace_file.close() - del self.osi - - self.osi_nobuffer.trace_file.close() - del self.osi_nobuffer - - def test_get_messages_in_index_range(self): - """Test getting messages in range""" - - for _ in self.txt.get_messages_in_index_range(0, self.MESSAGE_LENGTH): - pass - - for _ in self.osi.get_messages_in_index_range(0, self.MESSAGE_LENGTH): - pass - - for _ in self.osi_nobuffer.get_messages_in_index_range(0, self.MESSAGE_LENGTH): - pass - - def test_get_message_in_index(self): - """Test getting messages by index""" - - self.txt.get_message_by_index(0) - self.osi.get_message_by_index(0) - self.osi_nobuffer.get_message_by_index(0) - - def test_cache_messages(self): - """Test caching messages""" - - self.txt.cache_messages_in_index_range(0, self.MESSAGE_LENGTH) - self.osi.cache_messages_in_index_range(0, self.MESSAGE_LENGTH) - self.osi_nobuffer.cache_messages_in_index_range(0, self.MESSAGE_LENGTH) - - def test_accessing_cache_messages(self): - """Test accessing of cached messages""" - - self.txt.cache_messages_in_index_range(0, self.MESSAGE_LENGTH) - - for index in range(self.MESSAGE_LENGTH): - self.txt.get_message_by_index(index) - - self.osi.cache_messages_in_index_range(0, self.MESSAGE_LENGTH) - - for index in range(self.MESSAGE_LENGTH): - self.osi.get_message_by_index(index) - - self.osi_nobuffer.cache_messages_in_index_range(0, self.MESSAGE_LENGTH) - - for index in range(self.MESSAGE_LENGTH): - self.osi_nobuffer.get_message_by_index(index)