Skip to content

Commit

Permalink
Feature/diagnostics (#156)
Browse files Browse the repository at this point in the history
* Generate params for new diagnostics package

* Link sim rates to config objects or match number with todo note

* Added firmware version check

* Disable MCU diagnostics for A200
  • Loading branch information
hilary-luo authored Jan 28, 2025
1 parent 8510287 commit 0a0dae2
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __init__(self, sensor: BaseLidar2D) -> None:
self.MAXIMUM_ANGLE: sensor.max_angle,
self.MINIMUM_RANGE: 0.05,
self.MAXIMUM_RANGE: 25.0,
self.UPDATE_RATE: 50
self.UPDATE_RATE: 40 # TODO: link to clearpath_config property
})

class Lidar3dDescription(BaseDescription):
Expand All @@ -127,7 +127,7 @@ def __init__(self, sensor: BaseLidar3D) -> None:
self.MAXIMUM_ANGLE_V: 0.261799,
self.MINIMUM_RANGE: 0.9,
self.MAXIMUM_RANGE: 130.0,
self.UPDATE_RATE: 50
self.UPDATE_RATE: 20 # TODO: link to clearpath_config property
})

class ImuDescription(BaseDescription):
Expand All @@ -137,7 +137,7 @@ def __init__(self, sensor: BaseIMU) -> None:
super().__init__(sensor)

self.parameters.update({
self.UPDATE_RATE: 100
self.UPDATE_RATE: sensor.update_rate
})

class CameraDescription(BaseDescription):
Expand All @@ -152,14 +152,12 @@ def __init__(self, sensor: BaseCamera) -> None:

class AxisCameraDescription(CameraDescription):
MODEL = 'model'
UPDATE_RATE = 'update_rate'

def __init__(self, sensor: AxisCamera) -> None:
super().__init__(sensor)

self.parameters.update({
self.MODEL: sensor.device_type,
self.UPDATE_RATE: 15,
})

class IntelRealsenseDescription(CameraDescription):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,38 @@
# of Clearpath Robotics.
import os

from apt import Cache

from clearpath_config.clearpath_config import ClearpathConfig
from clearpath_config.common.types.platform import Platform
from clearpath_config.common.utils.dictionary import merge_dict, replace_dict_items
from clearpath_config.sensors.types.cameras import BaseCamera, IntelRealsense
from clearpath_config.sensors.types.gps import BaseGPS
from clearpath_config.sensors.types.imu import BaseIMU, PhidgetsSpatial
from clearpath_config.sensors.types.lidars_2d import BaseLidar2D
from clearpath_config.sensors.types.lidars_3d import BaseLidar3D
from clearpath_config.sensors.types.sensor import BaseSensor
from clearpath_generator_common.common import Package, ParamFile
from clearpath_generator_common.param.writer import ParamWriter
from clearpath_generator_common.ros import ROS_DISTRO


class PlatformParam():
CONTROL = 'control'
DIAGNOSTIC_AGGREGATOR = 'diagnostic_aggregator'
DIAGNOSTIC_UPDATER = 'diagnostic_updater'
IMU_FILTER = 'imu_filter'
LOCALIZATION = 'localization'
TELEOP_INTERACTIVE_MARKERS = 'teleop_interactive_markers'
TELEOP_JOY = 'teleop_joy'
TWIST_MUX = 'twist_mux'

NOT_APPLICABLE = 'not_applicable'

PARAMETERS = [
CONTROL,
DIAGNOSTIC_AGGREGATOR,
DIAGNOSTIC_UPDATER,
IMU_FILTER,
LOCALIZATION,
TELEOP_INTERACTIVE_MARKERS,
Expand All @@ -57,6 +72,7 @@ class PlatformParam():

class BaseParam():
CLEARPATH_CONTROL = 'clearpath_control'
CLEARPATH_DIAGNOSTICS = 'clearpath_diagnostics'

def __init__(self,
parameter: str,
Expand Down Expand Up @@ -169,6 +185,127 @@ def __init__(self,
self.default_parameter_file_package = self.clearpath_control_package
self.default_parameter_file_path = 'config'

class DiagnosticsAggregatorParam(BaseParam):
"""Parameter file that decides the aggregation of the diagnostics data for display."""

def __init__(self,
parameter: str,
clearpath_config: ClearpathConfig,
param_path: str) -> None:
super().__init__(parameter, clearpath_config, param_path)
self.default_parameter_file_package = Package(self.CLEARPATH_DIAGNOSTICS)
self.default_parameter_file_path = 'config'

class DiagnosticsUpdaterParam(BaseParam):
"""Parameter file for Clearpath Diagnostics indicating which topics to monitor."""

DIAGNOSTIC_UPDATER_NODE = 'clearpath_diagnostic_updater'

def __init__(self,
parameter: str,
clearpath_config: ClearpathConfig,
param_path: str) -> None:
super().__init__(parameter, clearpath_config, param_path)
self.default_parameter_file_package = Package(self.CLEARPATH_DIAGNOSTICS)
self.default_parameter_file_path = 'config'
self.diag_dict = {}

def generate_parameters(self, use_sim_time: bool = False) -> None:
super().generate_parameters(use_sim_time)

# Read the default parameter file
self.default_param_file = ParamFile(
name=self.default_parameter,
package=self.default_parameter_file_package,
path=self.default_parameter_file_path,
parameters={}
)
self.default_param_file.read()

# Initialize parameters with the default parameters
self.param_file.parameters = self.default_param_file.parameters

# Update parameters based on the robot.yaml
platform_model = self.clearpath_config.get_platform_model()
self.param_file.update({self.DIAGNOSTIC_UPDATER_NODE: {
'serial_number': self.clearpath_config.get_serial_number(),
'platform_model': platform_model}})

if use_sim_time:
latest_apt_firmware_version = 'simulated'
installed_apt_firmware_version = 'simulated'
elif platform_model == Platform.A200:
latest_apt_firmware_version = PlatformParam.NOT_APPLICABLE
installed_apt_firmware_version = PlatformParam.NOT_APPLICABLE
else:
# Check latest firmware version available and save it in the config
cache = Cache()
latest_apt_firmware_version = 'not_found'
installed_apt_firmware_version = 'none'
try:
pkg = cache[f'ros-{ROS_DISTRO}-clearpath-firmware']
latest_apt_firmware_version = pkg.versions[0].version.split('-')[0]
if (pkg.is_installed):
installed_apt_firmware_version = pkg.installed.version.split('-')[0]
except KeyError:
print(f'\033[93mWarning: ros-{ROS_DISTRO}-clearpath-firmware'
' package not found\033[0m')

self.param_file.update({self.DIAGNOSTIC_UPDATER_NODE: {
'ros_distro': ROS_DISTRO,
'latest_apt_firmware_version': latest_apt_firmware_version,
'installed_apt_firmware_version': installed_apt_firmware_version}})

# List all topics to be monitored from each launched sensor
for sensor in self.clearpath_config.sensors.get_all_sensors():

if not sensor.launch_enabled:
continue

match sensor:
case IntelRealsense():
if sensor.color_enabled:
self.add_topic(sensor, sensor.TOPICS.COLOR_IMAGE)
if sensor.depth_enabled:
self.add_topic(sensor, sensor.TOPICS.DEPTH_IMAGE)
if sensor.pointcloud_enabled:
self.add_topic(sensor, sensor.TOPICS.POINTCLOUD)

case BaseCamera():
self.add_topic(sensor, sensor.TOPICS.COLOR_IMAGE)

case BaseLidar2D():
self.add_topic(sensor, sensor.TOPICS.SCAN)

case BaseLidar3D():
self.add_topic(sensor, sensor.TOPICS.SCAN)
self.add_topic(sensor, sensor.TOPICS.POINTS)

case PhidgetsSpatial():
self.add_topic(sensor, sensor.TOPICS.RAW_DATA),
self.add_topic(sensor, sensor.TOPICS.MAG),

case BaseIMU():
self.add_topic(sensor, sensor.TOPICS.DATA)
self.add_topic(sensor, sensor.TOPICS.MAG)

case BaseGPS():
self.add_topic(sensor, sensor.TOPICS.FIX)

# Output the list of topics into the parameter file
self.param_file.update({self.DIAGNOSTIC_UPDATER_NODE: {'topics': self.diag_dict}})

def add_topic(self, sensor: BaseSensor, topic_key: str) -> None:
"""
Add a sensor topic to the dictionary using the topic key string.
:param sensor: The sensor object from which the topic info will be gotten
:param topic_key: The key used to identify the topic to be monitored
"""
self.diag_dict[sensor.get_topic_name(topic_key, local=True)] = {
'type': sensor.get_topic_type(topic_key),
'rate': float(sensor.get_topic_rate(topic_key))}

class LocalizationParam(BaseParam):
EKF_NODE = 'ekf_node'
imu_config = [False, False, False,
Expand Down Expand Up @@ -247,6 +384,8 @@ def __init__(self,

PARAMETER = {
IMU_FILTER: ImuFilterParam,
DIAGNOSTIC_AGGREGATOR: DiagnosticsAggregatorParam,
DIAGNOSTIC_UPDATER: DiagnosticsUpdaterParam,
LOCALIZATION: LocalizationParam,
TELEOP_JOY: TeleopJoyParam,
TWIST_MUX: TwistMuxParam,
Expand Down

0 comments on commit 0a0dae2

Please sign in to comment.