UniSmith MQTT 集成采用一个配置对应一个主题的设计,这样可以:
- ✅ 每个主题独立配置字段映射
- ✅ 每个主题独立配置分发目标
- ✅ 更清晰的配置管理和错误隔离
- ✅ 符合单一职责原则
etcd key: mqtt-temperature-sensor
{
"id": "mqtt-temperature-sensor",
"type": "mqtt",
"enabled": true,
"topic": "sensors/temperature",
"qos": 1,
"isArrayData": false,
"fieldStart": "",
"fieldConfigs": [
{
"fieldKey": "temperature",
"fieldPath": "temp",
"fieldType": "string",
"fieldFormat": "",
"fieldTimezone": ""
},
{
"fieldKey": "timestamp",
"fieldPath": "timestamp",
"fieldType": "string",
"fieldFormat": "2006-01-02T15:04:05Z",
"fieldTimezone": "Asia/Shanghai"
},
{
"fieldKey": "unit",
"fieldPath": "unit",
"fieldType": "string",
"fieldFormat": "",
"fieldTimezone": ""
}
],
"dispatchConfig": [
{
"type": "skylark_flows",
"skylarkAppID": "temperature-app",
"skylarkFlowsID": 12345
}
]
}对应的 MQTT 消息格式:
{
"temp": 23.5,
"timestamp": "2025-01-15T10:30:00Z",
"unit": "C",
"device_id": "temp_001"
}etcd key: mqtt-humidity-sensor
{
"id": "mqtt-humidity-sensor",
"type": "mqtt",
"enabled": true,
"topic": "sensors/humidity",
"qos": 0,
"isArrayData": false,
"fieldStart": "",
"fieldConfigs": [
{
"fieldKey": "humidity",
"fieldPath": "humidity_value",
"fieldType": "string",
"fieldFormat": "",
"fieldTimezone": ""
},
{
"fieldKey": "location",
"fieldPath": "sensor_location",
"fieldType": "string",
"fieldFormat": "",
"fieldTimezone": ""
}
],
"dispatchConfig": [
{
"type": "skylark_forms",
"skylarkAppID": "humidity-app",
"skylarkFormsID": 67890
}
]
}对应的 MQTT 消息格式:
{
"humidity_value": 65,
"sensor_location": "warehouse_a",
"battery_level": 87
}etcd key: mqtt-batch-sensors
{
"id": "mqtt-batch-sensors",
"type": "mqtt",
"enabled": true,
"topic": "sensors/batch",
"qos": 1,
"isArrayData": true,
"fieldStart": "readings",
"fieldConfigs": [
{
"fieldKey": "sensor_id",
"fieldPath": "id",
"fieldType": "string",
"fieldFormat": "",
"fieldTimezone": ""
},
{
"fieldKey": "value",
"fieldPath": "val",
"fieldType": "string",
"fieldFormat": "",
"fieldTimezone": ""
}
],
"dispatchConfig": [
{
"type": "log"
}
]
}对应的 MQTT 消息格式:
{
"timestamp": "2025-01-15T10:30:00Z",
"readings": [
{"id": "temp_001", "val": 23.5},
{"id": "temp_002", "val": 24.1},
{"id": "temp_003", "val": 22.8}
]
}etcd key: mqtt-all-device-status
{
"id": "mqtt-all-device-status",
"type": "mqtt",
"enabled": true,
"topic": "devices/+/status",
"qos": 0,
"isArrayData": false,
"fieldStart": "",
"fieldConfigs": [
{
"fieldKey": "device_status",
"fieldPath": "status",
"fieldType": "string",
"fieldFormat": "",
"fieldTimezone": ""
},
{
"fieldKey": "last_seen",
"fieldPath": "last_seen",
"fieldType": "string",
"fieldFormat": "2006-01-02T15:04:05Z",
"fieldTimezone": "Asia/Shanghai"
}
],
"dispatchConfig": [
{
"type": "skylark_flows",
"skylarkAppID": "device-monitor",
"skylarkFlowsID": 11111
}
]
}匹配的主题:
devices/sensor001/statusdevices/gateway002/statusdevices/camera003/status
# 温度数据
mosquitto_pub -h localhost -t "sensors/temperature" -m '{"temp":23.5,"timestamp":"2025-01-15T10:30:00Z","unit":"C","device_id":"temp_001"}'
# 湿度数据
mosquitto_pub -h localhost -t "sensors/humidity" -m '{"humidity_value":65,"sensor_location":"warehouse_a","battery_level":87}'
# 批量数据
mosquitto_pub -h localhost -t "sensors/batch" -m '{"timestamp":"2025-01-15T10:30:00Z","readings":[{"id":"temp_001","val":23.5},{"id":"temp_002","val":24.1}]}'
# 设备状态(通配符匹配)
mosquitto_pub -h localhost -t "devices/sensor001/status" -m '{"status":"online","last_seen":"2025-01-15T10:30:00Z"}'# 监听所有传感器数据
mosquitto_sub -h localhost -t "sensors/#"
# 监听特定设备状态
mosquitto_sub -h localhost -t "devices/+/status"# 使用 etcdctl 添加配置
etcdctl put mqtt-temperature-sensor '{"id":"mqtt-temperature-sensor",...}'# 查看所有 MQTT 配置
etcdctl get --prefix mqtt-
# 查看特定配置
etcdctl get mqtt-temperature-sensor# 删除配置(会自动取消订阅)
etcdctl del mqtt-temperature-sensor- 主题命名建议:使用层级结构,如
sensors/temperature、devices/gateway001/status - QoS 选择:
- QoS 0:最多一次,适用于不重要的数据
- QoS 1:至少一次,适用于重要数据
- QoS 2:只有一次,适用于关键数据
- 通配符使用:
+:单级通配符#:多级通配符(必须在末尾)
- 字段映射:确保
fieldPath与实际 MQTT 消息结构一致 - 数据类型:所有提取的字段当前都按 string 处理