diff --git a/clearpath_generator_common/clearpath_generator_common/description/sensors.py b/clearpath_generator_common/clearpath_generator_common/description/sensors.py index f1fe544..68a90f2 100644 --- a/clearpath_generator_common/clearpath_generator_common/description/sensors.py +++ b/clearpath_generator_common/clearpath_generator_common/description/sensors.py @@ -101,7 +101,7 @@ def __init__(self, sensor: BaseLidar2D) -> None: self.MAXIMUM_ANGLE: sensor.max_angle, self.MINIMUM_RANGE: 0.05, self.MAXIMUM_RANGE: 25.0, - self.UPDATE_RATE: 50 + self.UPDATE_RATE: 40 # TODO: link to clearpath_config property }) class Lidar3dDescription(BaseDescription): @@ -127,7 +127,7 @@ def __init__(self, sensor: BaseLidar3D) -> None: self.MAXIMUM_ANGLE_V: 0.261799, self.MINIMUM_RANGE: 0.9, self.MAXIMUM_RANGE: 130.0, - self.UPDATE_RATE: 50 + self.UPDATE_RATE: 20 # TODO: link to clearpath_config property }) class ImuDescription(BaseDescription): @@ -137,7 +137,7 @@ def __init__(self, sensor: BaseIMU) -> None: super().__init__(sensor) self.parameters.update({ - self.UPDATE_RATE: 100 + self.UPDATE_RATE: sensor.update_rate }) class CameraDescription(BaseDescription): @@ -152,14 +152,12 @@ def __init__(self, sensor: BaseCamera) -> None: class AxisCameraDescription(CameraDescription): MODEL = 'model' - UPDATE_RATE = 'update_rate' def __init__(self, sensor: AxisCamera) -> None: super().__init__(sensor) self.parameters.update({ self.MODEL: sensor.device_type, - self.UPDATE_RATE: 15, }) class IntelRealsenseDescription(CameraDescription): diff --git a/clearpath_generator_common/clearpath_generator_common/param/platform.py b/clearpath_generator_common/clearpath_generator_common/param/platform.py index 7461663..b3edffc 100644 --- a/clearpath_generator_common/clearpath_generator_common/param/platform.py +++ b/clearpath_generator_common/clearpath_generator_common/param/platform.py @@ -31,23 +31,38 @@ # of Clearpath Robotics. import os +from apt import Cache + from clearpath_config.clearpath_config import ClearpathConfig from clearpath_config.common.types.platform import Platform from clearpath_config.common.utils.dictionary import merge_dict, replace_dict_items +from clearpath_config.sensors.types.cameras import BaseCamera, IntelRealsense +from clearpath_config.sensors.types.gps import BaseGPS +from clearpath_config.sensors.types.imu import BaseIMU, PhidgetsSpatial +from clearpath_config.sensors.types.lidars_2d import BaseLidar2D +from clearpath_config.sensors.types.lidars_3d import BaseLidar3D +from clearpath_config.sensors.types.sensor import BaseSensor from clearpath_generator_common.common import Package, ParamFile from clearpath_generator_common.param.writer import ParamWriter +from clearpath_generator_common.ros import ROS_DISTRO class PlatformParam(): CONTROL = 'control' + DIAGNOSTIC_AGGREGATOR = 'diagnostic_aggregator' + DIAGNOSTIC_UPDATER = 'diagnostic_updater' IMU_FILTER = 'imu_filter' LOCALIZATION = 'localization' TELEOP_INTERACTIVE_MARKERS = 'teleop_interactive_markers' TELEOP_JOY = 'teleop_joy' TWIST_MUX = 'twist_mux' + NOT_APPLICABLE = 'not_applicable' + PARAMETERS = [ CONTROL, + DIAGNOSTIC_AGGREGATOR, + DIAGNOSTIC_UPDATER, IMU_FILTER, LOCALIZATION, TELEOP_INTERACTIVE_MARKERS, @@ -57,6 +72,7 @@ class PlatformParam(): class BaseParam(): CLEARPATH_CONTROL = 'clearpath_control' + CLEARPATH_DIAGNOSTICS = 'clearpath_diagnostics' def __init__(self, parameter: str, @@ -169,6 +185,127 @@ def __init__(self, self.default_parameter_file_package = self.clearpath_control_package self.default_parameter_file_path = 'config' + class DiagnosticsAggregatorParam(BaseParam): + """Parameter file that decides the aggregation of the diagnostics data for display.""" + + def __init__(self, + parameter: str, + clearpath_config: ClearpathConfig, + param_path: str) -> None: + super().__init__(parameter, clearpath_config, param_path) + self.default_parameter_file_package = Package(self.CLEARPATH_DIAGNOSTICS) + self.default_parameter_file_path = 'config' + + class DiagnosticsUpdaterParam(BaseParam): + """Parameter file for Clearpath Diagnostics indicating which topics to monitor.""" + + DIAGNOSTIC_UPDATER_NODE = 'clearpath_diagnostic_updater' + + def __init__(self, + parameter: str, + clearpath_config: ClearpathConfig, + param_path: str) -> None: + super().__init__(parameter, clearpath_config, param_path) + self.default_parameter_file_package = Package(self.CLEARPATH_DIAGNOSTICS) + self.default_parameter_file_path = 'config' + self.diag_dict = {} + + def generate_parameters(self, use_sim_time: bool = False) -> None: + super().generate_parameters(use_sim_time) + + # Read the default parameter file + self.default_param_file = ParamFile( + name=self.default_parameter, + package=self.default_parameter_file_package, + path=self.default_parameter_file_path, + parameters={} + ) + self.default_param_file.read() + + # Initialize parameters with the default parameters + self.param_file.parameters = self.default_param_file.parameters + + # Update parameters based on the robot.yaml + platform_model = self.clearpath_config.get_platform_model() + self.param_file.update({self.DIAGNOSTIC_UPDATER_NODE: { + 'serial_number': self.clearpath_config.get_serial_number(), + 'platform_model': platform_model}}) + + if use_sim_time: + latest_apt_firmware_version = 'simulated' + installed_apt_firmware_version = 'simulated' + elif platform_model == Platform.A200: + latest_apt_firmware_version = PlatformParam.NOT_APPLICABLE + installed_apt_firmware_version = PlatformParam.NOT_APPLICABLE + else: + # Check latest firmware version available and save it in the config + cache = Cache() + latest_apt_firmware_version = 'not_found' + installed_apt_firmware_version = 'none' + try: + pkg = cache[f'ros-{ROS_DISTRO}-clearpath-firmware'] + latest_apt_firmware_version = pkg.versions[0].version.split('-')[0] + if (pkg.is_installed): + installed_apt_firmware_version = pkg.installed.version.split('-')[0] + except KeyError: + print(f'\033[93mWarning: ros-{ROS_DISTRO}-clearpath-firmware' + ' package not found\033[0m') + + self.param_file.update({self.DIAGNOSTIC_UPDATER_NODE: { + 'ros_distro': ROS_DISTRO, + 'latest_apt_firmware_version': latest_apt_firmware_version, + 'installed_apt_firmware_version': installed_apt_firmware_version}}) + + # List all topics to be monitored from each launched sensor + for sensor in self.clearpath_config.sensors.get_all_sensors(): + + if not sensor.launch_enabled: + continue + + match sensor: + case IntelRealsense(): + if sensor.color_enabled: + self.add_topic(sensor, sensor.TOPICS.COLOR_IMAGE) + if sensor.depth_enabled: + self.add_topic(sensor, sensor.TOPICS.DEPTH_IMAGE) + if sensor.pointcloud_enabled: + self.add_topic(sensor, sensor.TOPICS.POINTCLOUD) + + case BaseCamera(): + self.add_topic(sensor, sensor.TOPICS.COLOR_IMAGE) + + case BaseLidar2D(): + self.add_topic(sensor, sensor.TOPICS.SCAN) + + case BaseLidar3D(): + self.add_topic(sensor, sensor.TOPICS.SCAN) + self.add_topic(sensor, sensor.TOPICS.POINTS) + + case PhidgetsSpatial(): + self.add_topic(sensor, sensor.TOPICS.RAW_DATA), + self.add_topic(sensor, sensor.TOPICS.MAG), + + case BaseIMU(): + self.add_topic(sensor, sensor.TOPICS.DATA) + self.add_topic(sensor, sensor.TOPICS.MAG) + + case BaseGPS(): + self.add_topic(sensor, sensor.TOPICS.FIX) + + # Output the list of topics into the parameter file + self.param_file.update({self.DIAGNOSTIC_UPDATER_NODE: {'topics': self.diag_dict}}) + + def add_topic(self, sensor: BaseSensor, topic_key: str) -> None: + """ + Add a sensor topic to the dictionary using the topic key string. + + :param sensor: The sensor object from which the topic info will be gotten + :param topic_key: The key used to identify the topic to be monitored + """ + self.diag_dict[sensor.get_topic_name(topic_key, local=True)] = { + 'type': sensor.get_topic_type(topic_key), + 'rate': float(sensor.get_topic_rate(topic_key))} + class LocalizationParam(BaseParam): EKF_NODE = 'ekf_node' imu_config = [False, False, False, @@ -247,6 +384,8 @@ def __init__(self, PARAMETER = { IMU_FILTER: ImuFilterParam, + DIAGNOSTIC_AGGREGATOR: DiagnosticsAggregatorParam, + DIAGNOSTIC_UPDATER: DiagnosticsUpdaterParam, LOCALIZATION: LocalizationParam, TELEOP_JOY: TeleopJoyParam, TWIST_MUX: TwistMuxParam,