Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ampledata committed Dec 31, 2024
1 parent 53612d3 commit 03eec9d
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 207 deletions.
1 change: 1 addition & 0 deletions debian/dronecot.service
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ User=dronecot
ExecStart=/usr/bin/dronecot
RuntimeDirectory=dronecot
SyslogIdentifier=dronecot
EnvironmentFile=/etc/aryaos/aryaos-config.txt
EnvironmentFile=/etc/default/dronecot
Type=simple
Restart=always
Expand Down
2 changes: 2 additions & 0 deletions dronecot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
DEFAULT_MQTT_TOPIC,
DEFAULT_GPS_INFO_CMD,
DEFAULT_SENSOR_COT_TYPE,
DEFAULT_SENSOR_ID,
DEFAULT_SENSOR_PAYLOAD_TYPE,
)
from .functions import ( # NOQA
xml_to_cot,
Expand Down
120 changes: 85 additions & 35 deletions dronecot/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import base64
import json
import os

from typing import Optional
from typing import Optional, Union

import lzma
import asyncio_mqtt as aiomqtt
Expand All @@ -38,50 +39,87 @@ def __init__(self, queue, config):
super().__init__(queue, config)
self.sensor_positions = {}

async def parse_message(self, message):
async def handle_data(self, data: Union[dict, aiomqtt.Message]) -> None:
"""Handle Open Drone ID message from MQTT.
Parameters
----------
data : `list[dict, aiomqtt.Message]`
List of craft data as key/value arrays.
"""
self._logger.debug("Handling data: %s", data)
if isinstance(data, aiomqtt.Message):
await self.parse_message(data)

async def parse_message(self, message: aiomqtt.Message):
"""Parse Open Drone ID message from MQTT."""
topic = message.topic.value
self._logger.debug("Message topic: %s", topic)

_payload = message.payload
if not isinstance(_payload, bytes):
return

payload = await self.decode_payload(_payload)
if not payload:
self._logger.error("Failed to decode message payload")
return

await self.process_payload(payload, topic)

async def decode_payload(self, payload: bytes) -> Optional[str]:
"""Decode the MQTT message payload, which could be either plain JSON or LZMA compressed JSON."""
_payload = None

try:
# not compressed
payload = message.payload.decode()
# remove newline (\n) char or \0 char as it will prevent decoding of json
if ord(payload[-1:]) == 0 or ord(payload[-1:]) == 10:
payload = payload[:-1]
# Not compressed
_payload = payload.decode()
# Remove newline (\n) char or \0 char as it will prevent decoding of JSON
if ord(payload[-1:]) in {0, 10}:
_payload = _payload[:-1]
except (UnicodeDecodeError, AttributeError):
# lzma compressed
payload = lzma.decompress(message.payload).decode()
# remove \0 char as it will prevent decoding of json
if ord(payload[-1:]) == 0:
payload = payload[:-1]

self._logger.debug("Message payload: %s", payload)

position = 0
while position != -1:
position = payload.find("}{")
if position == -1:
json_obj = json.loads(payload)
else:
message_payload = payload[0 : position + 1]
payload = payload[position + 1 :]
json_obj = json.loads(message_payload)

if "position" in topic:
json_obj["topic"] = topic
# LZMA compressed
try:
_payload = lzma.decompress(payload).decode()
except lzma.LZMAError as e:
self._logger.error("LZMA decompression error: %s", e)
return None
# Remove \0 char as it will prevent decoding of JSON
if ord(_payload[-1:]) == 0:
_payload = _payload[:-1]

return _payload

async def process_payload(self, payload: str, topic: str) -> None:
"""Process the payload into individual JSON objects and handle them."""
json_end_position = 0
while json_end_position != -1:
message_payload = payload

# Look for the next JSON object in the payload, which is (sometimes)
# separated by "}{". If found, split the payload at that position.
json_end_position = payload.find("}{")
if json_end_position != -1:
message_payload = payload[0 : json_end_position + 1]
# Start payload over at the next JSON object
payload = payload[json_end_position + 1 :]

json_obj = json.loads(message_payload)
json_obj["topic"] = topic

if json_obj.get("position"):
await self.handle_sensor_position(json_obj)
elif json_obj.get("data"):
json_obj["topic"] = topic
await self.handle_sensor_data(json_obj)
elif json_obj.get("status"):
json_obj["topic"] = topic
await self.handle_sensor_status(json_obj)

async def handle_sensor_position(self, message):
"""Process sensor position messages."""
topic = message.get("topic")
sensor = topic.split("/")[2]
self.sensor_positions[sensor] = {
"sensor ID": sensor,
"lat": message.get("lat"),
"lon": message.get("lon"),
"altHAE": message.get("altHAE"),
Expand All @@ -92,9 +130,16 @@ async def handle_sensor_position(self, message):
"speed": message.get("speed"),
}

async def handle_sensor_data(self, message):
async def handle_sensor_data(self, message: dict):
"""Process decoded data from the sensor."""
data = message.get("data")
if os.getenv("DEBUG"):
with open("messages.json", "a") as f:
json.dump(message, f)

protocol = message.get("protocol")
if not protocol or str(protocol) != "1.0":
return
data = message.get("data", {})
uasdata = data.get("UASdata")
if not uasdata:
return
Expand All @@ -103,9 +148,12 @@ async def handle_sensor_data(self, message):

valid_blocks = dronecot.decode_valid_blocks(uasdata, dronecot.ODIDValidBlocks())
pl = dronecot.parse_payload(uasdata, valid_blocks)
del data["UASdata"]
pl["data"] = data
pl["topic"] = message["topic"]
await self.put_queue(pl)

async def handle_sensor_status(self, message):
async def handle_sensor_status(self, message: dict):
"""Process sensor status messages."""
status = message.get("status")
if not status:
Expand All @@ -116,6 +164,7 @@ async def handle_sensor_status(self, message):

position = self.sensor_positions.get(sensor) or {}
pl = position | message
pl["sensor ID"] = sensor
self._logger.info("Publishing status for sensor: %s", sensor)
await self.put_queue(pl)

Expand Down Expand Up @@ -146,8 +195,8 @@ async def run(self, _=-1) -> None:
async with client.messages() as messages:
await client.subscribe(topic)
async for message in messages:
self._logger.debug("Received message: %s", message)
await self.parse_message(message)
self._logger.debug("Received MQTT message: %s", message)
await self.handle_data(message)


class RIDWorker(pytak.QueueWorker):
Expand All @@ -168,7 +217,8 @@ async def handle_data(self, data: dict) -> None:
List of craft data as key/value arrays.
"""
self._logger.debug("Handling data: %s", data)
if "status" in data:

if "status" in data and "position" not in data.get("topic", ""):
event = dronecot.xml_to_cot(data, self.config, "sensor_status_to_cot")
await self.put_queue(event)
else:
Expand Down
3 changes: 3 additions & 0 deletions dronecot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@
DEFAULT_MQTT_TOPIC: str = "#"
DEFAULT_GPS_INFO_CMD: str = "gpspipe --json -n 5"
DEFAULT_SENSOR_COT_TYPE: str = "a-f-G-E-S-E"

DEFAULT_SENSOR_ID: str = "Uknown-Sensor-ID"
DEFAULT_SENSOR_PAYLOAD_TYPE: str = "Uknown-Sensor-Payload-Type"
65 changes: 54 additions & 11 deletions dronecot/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""DroneCOT Functions."""

import asyncio
import base64
import json
import subprocess
import xml.etree.ElementTree as ET
Expand Down Expand Up @@ -204,21 +205,21 @@ def rid_uas_to_cot_xml( # NOQA pylint: disable=too-many-locals,too-many-branche

config = config or {}
remarks_fields: list = []
src_data = data.get("data", {})

uasid = data.get("BasicID", data.get("BasicID_0", "Unknown-BasicID_0"))
op_id = data.get("OperatorID", uasid)
op_uid = f"RID.{op_id}.op"

cot_uid: str = f"RID.{uasid}.uas"
cot_type: str = "a-n-A-M-H-Q"

cot_stale: int = int(config.get("COT_STALE", pytak.DEFAULT_COT_STALE))
cot_host_id: str = config.get("COT_HOST_ID", pytak.DEFAULT_HOST_ID)

cotx = ET.Element("_dronecot_")
cotx.set("cot_host_id", cot_host_id)
remarks_fields.append(f"UAS: {uasid}")
remarks_fields.append(f"Operator: {op_id}")

remarks_fields.append(f"OperatorID={op_id}")
cotx.set("OperatorID", op_id)
callsign = uasid

contact: ET.Element = ET.Element("contact")
Expand All @@ -228,17 +229,31 @@ def rid_uas_to_cot_xml( # NOQA pylint: disable=too-many-locals,too-many-branche
track.set("speed", str(data.get("SpeedHorizontal", 0)))

link: ET.Element = ET.Element("link")
link.set("uid", op_id)
link.set("uid", op_uid)
link.set("production_time", pytak.cot_time())
link.set("type", "a-n-G")
link.set("parent_callsign", op_id)
link.set("relation", "p-p")

cuas: ET.Element = ET.Element("__cuas")
sensor_id = src_data.get(
"sensor ID", src_data.get("sensor_id", dronecot.DEFAULT_SENSOR_ID)
)
cuas.set("sensor_id", sensor_id)
cuas.set("rssi", str(src_data.get("RSSI")))
cuas.set("channel", str(src_data.get("channel")))
cuas.set("timestamp", str(src_data.get("timestamp")))
cuas.set("mac_address", str(src_data.get("MAC address")))
cuas.set("type", str(src_data.get("type", dronecot.DEFAULT_SENSOR_PAYLOAD_TYPE)))
cuas.set("host_id", cot_host_id)
cuas.set("rid_op", op_id)
cuas.set("rid_uas", uasid)

detail = ET.Element("detail")
detail.append(contact)
detail.append(track)
detail.append(cotx)
detail.append(link)
detail.append(cuas)

remarks = ET.Element("remarks")
remarks_fields.append(f"{cot_host_id}")
Expand Down Expand Up @@ -280,10 +295,12 @@ def sensor_status_to_cot( # NOQA pylint: disable=too-many-locals,too-many-branc
status = data.get("status") or {}

if lat is None or lon is None:
gps_info = get_gps_info(config)
if gps_info:
print(gps_info)
else:
gps_info = None
try:
gps_info = get_gps_info(config)
except Exception as e:
print(f"Unable to get GPS fix: {e}")
if not gps_info:
return None

if lat is None or lon is None:
Expand All @@ -292,7 +309,9 @@ def sensor_status_to_cot( # NOQA pylint: disable=too-many-locals,too-many-branc
config = config or {}
remarks_fields: list = []

sensor_id = config.get("SENSOR_ID", status.get("sensor ID"))
sensor_id = data.get(
"sensor ID", config.get("SENSOR_ID", dronecot.DEFAULT_SENSOR_ID)
)

cot_uid: str = f"SNSTAC-CUAS.{sensor_id}"
cot_type: str = config.get("SENSOR_COT_TYPE", dronecot.DEFAULT_SENSOR_COT_TYPE)
Expand Down Expand Up @@ -386,3 +405,27 @@ def get_gps_info(config) -> Optional[dict]:

gps_info = json.loads(gps_data)
return gps_info


def parse_sensor_data(data):
"""Process decoded data from the sensor."""
message = data
protocol = message.get("protocol")
if not protocol or str(protocol) != "1.0":
return

data = message.get("data")
uasdata = data.get("UASdata")
if not uasdata:
return

uasdata = base64.b64decode(uasdata)
valid_blocks = dronecot.decode_valid_blocks(uasdata, dronecot.ODIDValidBlocks())

pl = dronecot.parse_payload(uasdata, valid_blocks)

# del data["UASdata"]
pl["data"] = data
pl["topic"] = message["topic"]

return pl
4 changes: 4 additions & 0 deletions dronecot/open_drone_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def parse_payload(payload, valid_blocks) -> dict:
if valid_blocks.AuthValid[x] == 1:
pl = pl | parse_AuthPage(payload, x)

print("Parsed payload")
print(pl)
return pl


Expand Down Expand Up @@ -178,6 +180,7 @@ def parse_Location(payload):
[Direction] = struct.unpack(
"f", payload[Location_start_byte + 4 : Location_start_byte + 4 + 4]
)

if Direction > 360 or Direction < 0:
Direction = float("NaN")

Expand All @@ -201,6 +204,7 @@ def parse_Location(payload):
[Latitude] = struct.unpack(
"d", payload[Location_start_byte + 16 : Location_start_byte + 16 + 8]
)
print("Latitude: ", Latitude)
if Latitude == 0.0 or Latitude > 90.0 or Latitude < -90.0:
Latitude = float("NaN")
[Longitude] = struct.unpack(
Expand Down
Loading

0 comments on commit 03eec9d

Please sign in to comment.