diff --git a/CHANGELOG.md b/CHANGELOG.md index 39ffbe8..64d92e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## PyTAK 6.3.0 + +- Fixes #58: TypeError: can't multiply sequence by non-int of type 'float'. +- Fixes #64 (?): Cryptography functions deprecated +- Fixes #65: Performance issues with large queues, sleep only on empty queue. +- Fixes #66: Add config params MAX_OUT_QUEUE & MAX_IN_QUEUE to allow queue tuning. + ## PyTAK 6.2.4 - Fixes #63: Python 3.6: AttributeError: module 'asyncio' has no attribute 'exceptions'. diff --git a/docs/atak_screenshot_with_pytak_logo.jpg b/docs/atak_screenshot_with_pytak_logo.jpg index 0dbe23d..ba5dea8 100755 Binary files a/docs/atak_screenshot_with_pytak_logo.jpg and b/docs/atak_screenshot_with_pytak_logo.jpg differ diff --git a/pytak/__init__.py b/pytak/__init__.py index 30a1227..ed134b0 100644 --- a/pytak/__init__.py +++ b/pytak/__init__.py @@ -19,7 +19,7 @@ :source: """ -__version__ = "6.2.4" +__version__ = "6.3.0-beta1" from .constants import ( # NOQA @@ -50,6 +50,8 @@ DEFAULT_COT_QOS, DEFAULT_COT_OPEX, DEFAULT_COT_VAL, + DEFAULT_MAX_OUT_QUEUE, + DEFAULT_MAX_IN_QUEUE, ) from .classes import ( # NOQA diff --git a/pytak/classes.py b/pytak/classes.py index 27a0f21..46ad192 100644 --- a/pytak/classes.py +++ b/pytak/classes.py @@ -24,6 +24,8 @@ import queue as _queue import random +import xml.etree.ElementTree as ET + from typing import Optional, Set, Union from configparser import ConfigParser, SectionProxy @@ -96,7 +98,7 @@ async def fts_compat(self) -> None: pytak_sleep: int = int(self.config.get("PYTAK_SLEEP") or 0) if bool(self.config.get("FTS_COMPAT") or pytak_sleep): sleep_period: int = int( - pytak_sleep or (pytak.DEFAULT_SLEEP * random.random()) + pytak_sleep or (int(pytak.DEFAULT_SLEEP) * random.random()) ) self._logger.debug("COMPAT: Sleeping for %ss", sleep_period) await asyncio.sleep(sleep_period) @@ -112,10 +114,12 @@ async def run(self, number_of_iterations=-1): # We're instantiating the while loop this way, and using get_nowait(), # to allow unit testing of at least one call of this loop. while number_of_iterations != 0: - await asyncio.sleep(self.min_period) + if self.queue.qsize() == 0: + await asyncio.sleep(self.min_period) + continue + # self._logger.debug("TX queue size=%s", self.queue.qsize()) data = None - try: data = self.queue.get_nowait() except (asyncio.QueueEmpty, _queue.Empty): @@ -171,7 +175,11 @@ async def send_data(self, data: bytes) -> None: else: proto = takproto.TAKProtoVer.STREAM - data = takproto.xml2proto(data, proto) + try: + data = takproto.xml2proto(data, proto) + except ET.ParseError as exc: + self._logger.warning(exc) + self._logger.warning("Could not convert XML to Proto.") if hasattr(self.writer, "send"): await self.writer.send(data) @@ -256,20 +264,24 @@ def __init__( config: Union[None, SectionProxy, dict], ) -> None: super().__init__(queue, config) - self._logger.info("COT_URL: %s", self.config.get("COT_URL")) + self._logger.info("Using COT_URL='%s'", self.config.get("COT_URL")) async def put_queue( self, data: bytes, queue_arg: Union[asyncio.Queue, mp.Queue, None] = None ) -> None: """Put Data onto the Queue.""" _queue = queue_arg or self.queue - try: - if isinstance(_queue, asyncio.Queue): - await _queue.put(data) - else: - _queue.put(data) - except asyncio.QueueFull: - self._logger.warning("Lost Data (queue full): '%s'", data) + self._logger.debug("Queue size=%s", _queue.qsize()) + if isinstance(_queue, asyncio.Queue): + if _queue.full(): + self._logger.warning("Queue full, dropping oldest data.") + await _queue.get() + await _queue.put(data) + else: + if _queue.full(): + self._logger.warning("Queue full, dropping oldest data.") + _queue.get_nowait() + _queue.put_nowait(data) class CLITool: @@ -296,8 +308,19 @@ def __init__( self.running_tasks: Set = set() self._config = config self.queues: dict = {} - self.tx_queue: Union[asyncio.Queue, mp.Queue] = tx_queue or asyncio.Queue() - self.rx_queue: Union[asyncio.Queue, mp.Queue] = rx_queue or asyncio.Queue() + + self.max_in_queue = int( + self._config.get("MAX_IN_QUEUE") or pytak.DEFAULT_MAX_IN_QUEUE + ) + self.max_out_queue = int( + self._config.get("MAX_OUT_QUEUE") or pytak.DEFAULT_MAX_OUT_QUEUE + ) + self.tx_queue: Union[asyncio.Queue, mp.Queue] = tx_queue or asyncio.Queue( + self.max_out_queue + ) + self.rx_queue: Union[asyncio.Queue, mp.Queue] = rx_queue or asyncio.Queue( + self.max_in_queue + ) if bool(self._config.get("DEBUG") or 0): for handler in self._logger.handlers: @@ -312,23 +335,25 @@ def config(self, val): self._config = val async def create_workers(self, i_config): - """Create and run queue workers with specified config parameters. + """ + Create and run queue workers with specified config parameters. Parameters ---------- i_config : `configparser.SectionProxy` Configuration options & values. """ - reader, writer = await pytak.protocol_factory(i_config) - tx_queue = asyncio.Queue() - rx_queue = asyncio.Queue() + tx_queue = asyncio.Queue(self.max_out_queue) + rx_queue = asyncio.Queue(self.max_in_queue) if len(self.queues) == 0: # If the queue list is empty, make this the default. self.tx_queue = tx_queue self.rx_queue = rx_queue + self.queues[i_config.name] = {"tx_queue": tx_queue, "rx_queue": rx_queue} + + reader, writer = await pytak.protocol_factory(i_config) write_worker = pytak.TXWorker(tx_queue, i_config, writer) read_worker = pytak.RXWorker(rx_queue, i_config, reader) - self.queues[i_config.name] = {"tx_queue": tx_queue, "rx_queue": rx_queue} self.add_task(write_worker) self.add_task(read_worker) @@ -364,6 +389,7 @@ def run_task(self, task): """Run the given coroutine task.""" self._logger.debug("Run Task: %s", task) self.running_tasks.add(asyncio.ensure_future(task.run())) + # self.running_tasks.add(run_coroutine_in_thread(task.run())) def run_tasks(self, tasks=None): """Run the given list or set of couroutine tasks.""" diff --git a/pytak/constants.py b/pytak/constants.py index 9f50d19..5aadd27 100644 --- a/pytak/constants.py +++ b/pytak/constants.py @@ -102,3 +102,6 @@ DEFAULT_COT_RELTO: Optional[str] = os.getenv("COT_RELTO", "") DEFAULT_COT_QOS: Optional[str] = os.getenv("COT_QOS", "") DEFAULT_COT_OPEX: Optional[str] = os.getenv("COT_OPEX", "") + +DEFAULT_MAX_OUT_QUEUE = 100 +DEFAULT_MAX_IN_QUEUE = 500 diff --git a/pytak/crypto_functions.py b/pytak/crypto_functions.py index 5a89614..89f5533 100644 --- a/pytak/crypto_functions.py +++ b/pytak/crypto_functions.py @@ -30,13 +30,13 @@ USE_CRYPTOGRAPHY = False try: - from cryptography.hazmat.backends.openssl.rsa import _RSAPrivateKey from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.serialization import pkcs12 from cryptography.x509 import Certificate USE_CRYPTOGRAPHY = True -except ImportError: +except ImportError as exc: + warnings.warn(exc) warnings.warn(INSTALL_MSG) diff --git a/setup.cfg b/setup.cfg index f5db703..4f68920 100644 --- a/setup.cfg +++ b/setup.cfg @@ -70,3 +70,17 @@ test = flake8 black cryptography + +[isort] +profile = black + +[flake8] +max-line-length = 88 +extend-ignore = E203, E704 + +[pylint] +max-line-length = 88 + +[pycodestyle] +ignore = E203 +max_line_length = 88 \ No newline at end of file diff --git a/tests/test_pref_packages.py b/tests/test_pref_packages.py index 396f3c0..0346869 100644 --- a/tests/test_pref_packages.py +++ b/tests/test_pref_packages.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# Copyright 2023 Sensors & Signals LLC +# Copyright Sensors & Signals LLC https://www.snstac.com # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -25,18 +25,18 @@ import pytak.crypto_functions __author__ = "Greg Albrecht " -__copyright__ = "Copyright 2023 Sensors & Signals LLC" +__copyright__ = "Copyright Sensors & Signals LLC https://www.snstac.com" __license__ = "Apache License, Version 2.0" -def test_load_preferences(): +def test_load_preferences() -> None: """Test loading a preferences file.""" test_pref: str = "tests/data/test_pref.pref" prefs: dict = pytak.functions.load_preferences(test_pref, "tests/data") assert all(prefs) -def test_load_connectString2url(): +def test_load_connectString2url() -> None: """Test converting a TAK connectString to a URL""" test_pref: str = "tests/data/test_pref.pref" prefs: dict = pytak.functions.load_preferences(test_pref, "tests/data") @@ -45,7 +45,7 @@ def test_load_connectString2url(): assert url == "ssl://takserver.example.com:8089" -def test_load_cert(): +def test_load_cert() -> None: cert: list = pytak.crypto_functions.load_cert( "tests/data/test_user_cert.p12", "atakatak" )