From d57bd8aaba70f99ab9a1f37e3c743c76afa8214b Mon Sep 17 00:00:00 2001 From: Greg Albrecht Date: Wed, 18 Sep 2024 08:50:47 -0700 Subject: [PATCH] Added SimpleCOTEvent, COTEvent classes and cot2xml function. --- pytak/__init__.py | 9 +- pytak/classes.py | 96 +++++++++++++++++++--- pytak/crypto_functions.py | 6 +- pytak/functions.py | 47 ++++++++++- tests/test_classes.py | 167 ++++++++++++++++++++++++++++++++++++-- tests/test_commands.py | 26 ++++++ tests/test_functions.py | 43 ++++++++++ 7 files changed, 365 insertions(+), 29 deletions(-) create mode 100644 tests/test_commands.py diff --git a/pytak/__init__.py b/pytak/__init__.py index 3283c87..eff8eb3 100644 --- a/pytak/__init__.py +++ b/pytak/__init__.py @@ -17,7 +17,7 @@ """Python Team Awareness Kit (PyTAK) Module.""" -__version__ = "7.0.2" +__version__ = "7.1.0" from .constants import ( # NOQA LOG_LEVEL, @@ -57,16 +57,19 @@ RXWorker, QueueWorker, CLITool, + SimpleCOTEvent, + COTEvent, ) -from .functions import ( +from .functions import ( # NOQA split_host, parse_url, hello_event, cot_time, gen_cot, gen_cot_xml, -) # NOQA + cot2xml, +) from .client_functions import ( # NOQA create_udp_client, diff --git a/pytak/classes.py b/pytak/classes.py index 94f997e..cf3f1ce 100644 --- a/pytak/classes.py +++ b/pytak/classes.py @@ -17,8 +17,8 @@ """PyTAK Class Definitions.""" +import abc import asyncio -import importlib.util import ipaddress import logging import multiprocessing as mp @@ -27,19 +27,20 @@ import xml.etree.ElementTree as ET -from typing import Optional, Set, Union +from dataclasses import dataclass +from typing import Set, Union from configparser import ConfigParser, SectionProxy import pytak try: - import takproto + import takproto # type: ignore except ImportError: - pass + takproto = None -class Worker: # pylint: disable=too-few-public-methods +class Worker: """Meta class for all other Worker Classes.""" _logger = logging.getLogger(__name__) @@ -74,7 +75,7 @@ def __init__( tak_proto_version = int(self.config.get("TAK_PROTO") or pytak.DEFAULT_TAK_PROTO) - if tak_proto_version > 0 and importlib.util.find_spec("takproto") is None: + if tak_proto_version > 0 and takproto is None: self._logger.warning( "TAK_PROTO is set to '%s', but the 'takproto' Python module is not installed.\n" "Try: python -m pip install pytak[with_takproto]\n" @@ -82,9 +83,7 @@ def __init__( tak_proto_version, ) - self.use_protobuf = tak_proto_version > 0 and importlib.util.find_spec( - "takproto" - ) + self.use_protobuf = tak_proto_version > 0 and takproto is not None async def fts_compat(self) -> None: """Apply FreeTAKServer (FTS) compatibility. @@ -100,9 +99,10 @@ async def fts_compat(self) -> None: self._logger.debug("COMPAT: Sleeping for %ss", sleep_period) await asyncio.sleep(sleep_period) + @abc.abstractmethod async def handle_data(self, data: bytes) -> None: """Handle data (placeholder method, please override).""" - raise NotImplementedError("Subclasses need to override this method") + pass async def run(self, number_of_iterations=-1): """Run this Thread, reads Data from Queue & passes data to next Handler.""" @@ -131,7 +131,7 @@ async def run(self, number_of_iterations=-1): number_of_iterations -= 1 -class TXWorker(Worker): # pylint: disable=too-few-public-methods +class TXWorker(Worker): """Works data queue and hands off to Protocol Workers. You should create an TXWorker Instance using the `pytak.txworker_factory()` @@ -190,7 +190,7 @@ async def send_data(self, data: bytes) -> None: self.writer.flush() -class RXWorker(Worker): # pylint: disable=too-few-public-methods +class RXWorker(Worker): """Async receive (input) queue worker. Reads events from a `pytak.protocol_factory()` reader and adds them to @@ -212,6 +212,11 @@ def __init__( self.reader: asyncio.Protocol = reader self.reader_queue = None + @abc.abstractmethod + async def handle_data(self, data: bytes) -> None: + """Handle data (placeholder method, please override).""" + pass + async def readcot(self): """Read CoT from the wire until we hit an event boundary.""" try: @@ -241,7 +246,7 @@ async def run(self, number_of_iterations=-1) -> None: self.queue.put_nowait(data) -class QueueWorker(Worker): # pylint: disable=too-few-public-methods +class QueueWorker(Worker): """Read non-CoT Messages from an async network client. (`asyncio.Protocol` or similar async network client) @@ -263,6 +268,11 @@ def __init__( super().__init__(queue, config) self._logger.info("Using COT_URL='%s'", self.config.get("COT_URL")) + @abc.abstractmethod + async def handle_data(self, data: bytes) -> None: + """Handle data (placeholder method, please override).""" + pass + async def put_queue( self, data: bytes, queue_arg: Union[asyncio.Queue, mp.Queue, None] = None ) -> None: @@ -408,3 +418,63 @@ async def run(self): for task in done: self._logger.info("Complete: %s", task) + + +@dataclass +class SimpleCOTEvent: + """CoT Event Dataclass.""" + + lat: Union[bytes, str, float, None] = None + lon: Union[bytes, str, float, None] = None + uid: Union[str, None] = None + stale: Union[float, int, None] = None + cot_type: Union[str, None] = None + + def __str__(self) -> str: + """Return a formatted string representation of the dataclass.""" + event = self.to_xml() + return ET.tostring(event, encoding="unicode") + + def to_bytes(self) -> bytes: + """Return the class as bytes.""" + event = self.to_xml() + return ET.tostring(event, encoding="utf-8") + + def to_xml(self) -> ET.Element: + """Return a CoT Event as an XML string.""" + cotevent = COTEvent( + lat=self.lat, + lon=self.lon, + uid=self.uid, + stale=self.stale, + cot_type=self.cot_type, + le=pytak.DEFAULT_COT_VAL, + ce=pytak.DEFAULT_COT_VAL, + hae=pytak.DEFAULT_COT_VAL, + ) + event = pytak.cot2xml(cotevent) + return event + + +@dataclass +class COTEvent(SimpleCOTEvent): + """COT Event Dataclass.""" + + ce: Union[bytes, str, float, int, None] = None + hae: Union[bytes, str, float, int, None] = None + le: Union[bytes, str, float, int, None] = None + + def to_xml(self) -> ET.Element: + """Return a CoT Event as an XML string.""" + cotevent = COTEvent( + lat=self.lat, + lon=self.lon, + uid=self.uid, + stale=self.stale, + cot_type=self.cot_type, + le=self.le, + ce=self.ce, + hae=self.hae, + ) + event = pytak.cot2xml(cotevent) + return event diff --git a/pytak/crypto_functions.py b/pytak/crypto_functions.py index 4efb8e7..6901669 100644 --- a/pytak/crypto_functions.py +++ b/pytak/crypto_functions.py @@ -37,7 +37,7 @@ USE_CRYPTOGRAPHY = True except ImportError as exc: - warnings.warn(exc) + warnings.warn(str(exc)) def save_pem(pem: bytes, dest: Union[str, None] = None) -> str: @@ -60,7 +60,7 @@ def load_cert( ): # -> Set[_RSAPrivateKey, Certificate, Certificate]: """Load RSA Keys & Certs from a pkcs12 ().p12) file.""" if not USE_CRYPTOGRAPHY: - raise Exception(INSTALL_MSG) + raise ValueError(INSTALL_MSG) with open(cert_path, "br+") as cp_fd: p12_data = cp_fd.read() @@ -73,7 +73,7 @@ def load_cert( def convert_cert(cert_path: str, cert_pass: str) -> dict: """Convert a P12 cert to PEM.""" if not USE_CRYPTOGRAPHY: - raise Exception(INSTALL_MSG) + raise ValueError(INSTALL_MSG) cert_paths = { "pk_pem_path": None, diff --git a/pytak/functions.py b/pytak/functions.py index 9b98a91..01acf88 100644 --- a/pytak/functions.py +++ b/pytak/functions.py @@ -160,24 +160,65 @@ def connectString2url(conn_str: str) -> str: # pylint: disable=invalid-name return f"{uri_parts[2]}://{uri_parts[0]}:{uri_parts[1]}" +def cot2xml(event: pytak.COTEvent) -> ET.Element: + """Generate a minimum COT Event as an XML object.""" + lat = str(event.lat or "0.0") + lon = str(event.lon or "0.0") + uid = event.uid or pytak.DEFAULT_HOST_ID + stale = int(event.stale or pytak.DEFAULT_COT_STALE) + cot_type = event.cot_type or "a-u-G" + le = str(event.le or pytak.DEFAULT_COT_VAL) + hae = str(event.hae or pytak.DEFAULT_COT_VAL) + ce = str(event.ce or pytak.DEFAULT_COT_VAL) + + xevent = ET.Element("event") + xevent.set("version", "2.0") + xevent.set("type", cot_type) + xevent.set("uid", uid) + xevent.set("how", "m-g") + xevent.set("time", pytak.cot_time()) + xevent.set("start", pytak.cot_time()) + xevent.set("stale", pytak.cot_time(stale)) + + point = ET.Element("point") + point.set("lat", lat) + point.set("lon", lon) + point.set("le", le) + point.set("hae", hae) + point.set("ce", ce) + + flow_tags = ET.Element("_flow-tags_") + _ft_tag: str = f"{pytak.DEFAULT_HOST_ID}-v{pytak.__version__}".replace("@", "-") + flow_tags.set(_ft_tag, pytak.cot_time()) + + detail = ET.Element("detail") + detail.append(flow_tags) + + xevent.append(point) + xevent.append(detail) + + return xevent + + def gen_cot_xml( lat: Union[bytes, str, float, None] = None, lon: Union[bytes, str, float, None] = None, ce: Union[bytes, str, float, int, None] = None, hae: Union[bytes, str, float, int, None] = None, le: Union[bytes, str, float, int, None] = None, - uid: Union[bytes, str, None] = None, + uid: Union[str, None] = None, stale: Union[float, int, None] = None, - cot_type: Union[bytes, str, None] = None, + cot_type: Union[str, None] = None, ) -> Optional[ET.Element]: """Generate a minimum CoT Event as an XML object.""" + lat = str(lat or "0.0") lon = str(lon or "0.0") ce = str(ce or pytak.DEFAULT_COT_VAL) hae = str(hae or pytak.DEFAULT_COT_VAL) le = str(le or pytak.DEFAULT_COT_VAL) uid = uid or pytak.DEFAULT_HOST_ID - stale = stale or pytak.DEFAULT_COT_STALE + stale = int(stale or pytak.DEFAULT_COT_STALE) cot_type = cot_type or "a-u-G" event = ET.Element("event") diff --git a/tests/test_classes.py b/tests/test_classes.py index ffd1598..246d90a 100644 --- a/tests/test_classes.py +++ b/tests/test_classes.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# Copyright 2023 Sensors & Signals LLC +# Copyright Sensors & Signals LLC https://www.snstac.com/ # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -31,11 +31,11 @@ import pytest import pytak - - -__author__ = "Greg Albrecht " -__copyright__ = "Copyright 2023 Sensors & Signals LLC" -__license__ = "Apache License, Version 2.0" +import asyncio +import pytest +from unittest import mock +from configparser import ConfigParser +from pytak import CLITool _SENTINEL = enum.Enum("_SENTINEL", "sentinel") @@ -90,11 +90,13 @@ async def test_worker(): @pytest.mark.asyncio -async def test_eventworker(): +async def test_eventworker() -> None: + """Test EventWorker.""" event_queue: asyncio.Queue = asyncio.Queue() await event_queue.put(b"taco1") await event_queue.put(b"taco2") + transport = mock.Mock() transport.write = mock.Mock() transport.is_closing = mock.Mock() protocol._drain_helper = make_mocked_coro() @@ -113,3 +115,154 @@ async def test_eventworker(): # Python 3.7: popped[1][0] # Python 3.8+: popped.args[0] assert b"taco1" == popped[1][0] + + +def test_simple_cot_event_to_xml() -> None: + """Test SimpleCOTEvent to XML.""" + event = pytak.SimpleCOTEvent( + lat=37.7749, lon=-122.4194, uid="user1", stale=60, cot_type="a-f-G-U-C" + ) + xml = event.to_xml() + assert xml.tag == "event" + assert xml.attrib["version"] == "2.0" + assert xml.attrib["uid"] == "user1" + assert xml.attrib["type"] == "a-f-G-U-C" + + point = xml.find("point") + assert point is not None + assert point.attrib["lat"] == "37.7749" + assert point.attrib["lon"] == "-122.4194" + + +def test_cot_event_to_xml(): + """Test COTEvent to XML.""" + event = pytak.COTEvent( + lat=37.7749, + lon=-122.4194, + uid="user1", + stale=60, + cot_type="a-f-G-U-C", + le=100, + ce=200, + hae=300, + ) + xml = event.to_xml() + assert xml.tag == "event" + assert xml.attrib["version"] == "2.0" + assert xml.attrib["uid"] == "user1" + assert xml.attrib["type"] == "a-f-G-U-C" + + point = xml.find("point") + assert point is not None + assert point.attrib["lat"] == "37.7749" + assert point.attrib["lon"] == "-122.4194" + assert point.attrib["le"] == "100" + assert point.attrib["ce"] == "200" + assert point.attrib["hae"] == "300" + + +def test_simple_cot_event_as_obj(): + """Test SimpleCOTEvent as object.""" + event = pytak.SimpleCOTEvent( + lat=37.7749, lon=-122.4194, uid="user1", stale=60, cot_type="a-f-G-U-C" + ) + assert event.lat == 37.7749 + assert event.lon == -122.4194 + assert event.uid == "user1" + assert event.stale == 60 + assert event.cot_type == "a-f-G-U-C" + + +def test_simple_cot_as_str(): + """Test SimpleCOTEvent as str.""" + event = pytak.SimpleCOTEvent( + lat=37.7749, lon=-122.4194, uid="user1", stale=60, cot_type="a-f-G-U-C" + ) + event = str(event) + assert "37.7749" in event + assert "-122.4194" in event + assert "user1" in event + assert "a-f-G-U-C" in event + + +def test_cot_event_as_obj(): + """Test COTEvent as object.""" + event = pytak.COTEvent( + lat=37.7749, + lon=-122.4194, + uid="user1", + stale=60, + cot_type="a-f-G-U-C", + le=100, + ce=200, + hae=300, + ) + assert event.lat == 37.7749 + assert event.lon == -122.4194 + assert event.uid == "user1" + assert event.stale == 60 + assert event.cot_type == "a-f-G-U-C" + assert event.le == 100 + assert event.ce == 200 + assert event.hae == 300 + + +def test_cot_event_as_str(): + """Test COTEvent as str.""" + event = pytak.COTEvent( + lat=37.7749, + lon=-122.4194, + uid="user1", + stale=60, + cot_type="a-f-G-U-C", + le=100, + ce=200, + hae=300, + ) + print(event) + event = str(event) + assert "37.7749" in event + assert "-122.4194" in event + assert "user1" in event + assert "a-f-G-U-C" in event + assert "100" in event + assert "200" in event + assert "300" in event + + +@pytest.fixture +def config(): + config = ConfigParser() + config.add_section("pytak") + config.set("pytak", "COT_URL", "tcp://example.com:8087") + config.set("pytak", "COT_HOST_ID", "user1") + config = config["pytak"] + return config + + +@pytest.fixture +def tx_queue(): + return asyncio.Queue() + + +@pytest.fixture +def rx_queue(): + return asyncio.Queue() + + +@pytest.fixture +def cli_tool(config, tx_queue, rx_queue): + return CLITool(config, tx_queue, rx_queue) + + +def test_clitool_init(config, tx_queue, rx_queue): + cli_tool = CLITool(config, tx_queue, rx_queue) + assert cli_tool.config == config + assert cli_tool.tx_queue == tx_queue + assert cli_tool.rx_queue == rx_queue + + +@pytest.mark.asyncio +async def test_clitool_hello_event(cli_tool, tx_queue): + await cli_tool.hello_event() + assert tx_queue.qsize() == 1 diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..517b60c --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright Sensors & Signals LLC https://www.snstac.com +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Python Team Awareness Kit (PyTAK) Module Tests.""" + +import pytest +from unittest import mock + +import pytak + +import pytak.commands diff --git a/tests/test_functions.py b/tests/test_functions.py index 6177899..f8a5cda 100644 --- a/tests/test_functions.py +++ b/tests/test_functions.py @@ -112,3 +112,46 @@ def test_cot_time(): """Test that cot_time() returns the proper dateTime string.""" cot_time = pytak.cot_time() assert "Z" in cot_time[-1] + + +def test_cot2xml(): + """Test gen_cot_xml2() function.""" + event = pytak.COTEvent( + lat=37.7749, + lon=-122.4194, + ce=10, + hae=100, + le=5, + uid="test_uid", + stale=3600, + cot_type="a-f-G", + ) + xml_element = pytak.cot2xml(event) + assert xml_element is not None + assert xml_element.tag == "event" + assert xml_element.get("version") == "2.0" + assert xml_element.get("type") == "a-f-G" + assert xml_element.get("uid") == "test_uid" + assert xml_element.get("how") == "m-g" + assert xml_element.get("time") is not None + assert xml_element.get("start") is not None + assert xml_element.get("stale") is not None + + point_element = xml_element.find("point") + assert point_element is not None + assert point_element.get("lat") == "37.7749" + assert point_element.get("lon") == "-122.4194" + assert point_element.get("le") == "5" + assert point_element.get("hae") == "100" + assert point_element.get("ce") == "10" + + detail_element = xml_element.find("detail") + assert detail_element is not None + flow_tags_element = detail_element.find("_flow-tags_") + assert flow_tags_element is not None + assert ( + flow_tags_element.get( + f"{pytak.DEFAULT_HOST_ID}-v{pytak.__version__}".replace("@", "-") + ) + is not None + )