Skip to content

Commit

Permalink
6.3.0-beta1
Browse files Browse the repository at this point in the history
- Fixes #58: TypeError: can't multiply sequence by non-int of type 'float'.
- Fixes #64 (?): Cryptography functions deprecated
- Fixes #65: Performance issues with large queues, sleep only on empty queue.
- Fixes #66: Add config params MAX_OUT_QUEUE & MAX_IN_QUEUE to allow queue tuning.
  • Loading branch information
ampledata committed Mar 1, 2024
1 parent f1e884d commit 03a8ad6
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 27 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## PyTAK 6.3.0

- Fixes #58: TypeError: can't multiply sequence by non-int of type 'float'.
- Fixes #64 (?): Cryptography functions deprecated
- Fixes #65: Performance issues with large queues, sleep only on empty queue.
- Fixes #66: Add config params MAX_OUT_QUEUE & MAX_IN_QUEUE to allow queue tuning.

## PyTAK 6.2.4

- Fixes #63: Python 3.6: AttributeError: module 'asyncio' has no attribute 'exceptions'.
Expand Down
Binary file modified docs/atak_screenshot_with_pytak_logo.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion pytak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
:source: <https://github.com/snstac/pytak>
"""

__version__ = "6.2.4"
__version__ = "6.3.0-beta1"


from .constants import ( # NOQA
Expand Down Expand Up @@ -50,6 +50,8 @@
DEFAULT_COT_QOS,
DEFAULT_COT_OPEX,
DEFAULT_COT_VAL,
DEFAULT_MAX_OUT_QUEUE,
DEFAULT_MAX_IN_QUEUE,
)

from .classes import ( # NOQA
Expand Down
64 changes: 45 additions & 19 deletions pytak/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import queue as _queue
import random

import xml.etree.ElementTree as ET

from typing import Optional, Set, Union

from configparser import ConfigParser, SectionProxy
Expand Down Expand Up @@ -96,7 +98,7 @@ async def fts_compat(self) -> None:
pytak_sleep: int = int(self.config.get("PYTAK_SLEEP") or 0)
if bool(self.config.get("FTS_COMPAT") or pytak_sleep):
sleep_period: int = int(
pytak_sleep or (pytak.DEFAULT_SLEEP * random.random())
pytak_sleep or (int(pytak.DEFAULT_SLEEP) * random.random())
)
self._logger.debug("COMPAT: Sleeping for %ss", sleep_period)
await asyncio.sleep(sleep_period)
Expand All @@ -112,10 +114,12 @@ async def run(self, number_of_iterations=-1):
# We're instantiating the while loop this way, and using get_nowait(),
# to allow unit testing of at least one call of this loop.
while number_of_iterations != 0:
await asyncio.sleep(self.min_period)
if self.queue.qsize() == 0:
await asyncio.sleep(self.min_period)
continue

# self._logger.debug("TX queue size=%s", self.queue.qsize())
data = None

try:
data = self.queue.get_nowait()
except (asyncio.QueueEmpty, _queue.Empty):
Expand Down Expand Up @@ -171,7 +175,11 @@ async def send_data(self, data: bytes) -> None:
else:
proto = takproto.TAKProtoVer.STREAM

data = takproto.xml2proto(data, proto)
try:
data = takproto.xml2proto(data, proto)
except ET.ParseError as exc:
self._logger.warning(exc)
self._logger.warning("Could not convert XML to Proto.")

if hasattr(self.writer, "send"):
await self.writer.send(data)
Expand Down Expand Up @@ -256,20 +264,24 @@ def __init__(
config: Union[None, SectionProxy, dict],
) -> None:
super().__init__(queue, config)
self._logger.info("COT_URL: %s", self.config.get("COT_URL"))
self._logger.info("Using COT_URL='%s'", self.config.get("COT_URL"))

async def put_queue(
self, data: bytes, queue_arg: Union[asyncio.Queue, mp.Queue, None] = None
) -> None:
"""Put Data onto the Queue."""
_queue = queue_arg or self.queue
try:
if isinstance(_queue, asyncio.Queue):
await _queue.put(data)
else:
_queue.put(data)
except asyncio.QueueFull:
self._logger.warning("Lost Data (queue full): '%s'", data)
self._logger.debug("Queue size=%s", _queue.qsize())
if isinstance(_queue, asyncio.Queue):
if _queue.full():
self._logger.warning("Queue full, dropping oldest data.")
await _queue.get()
await _queue.put(data)
else:
if _queue.full():
self._logger.warning("Queue full, dropping oldest data.")
_queue.get_nowait()
_queue.put_nowait(data)


class CLITool:
Expand All @@ -296,8 +308,19 @@ def __init__(
self.running_tasks: Set = set()
self._config = config
self.queues: dict = {}
self.tx_queue: Union[asyncio.Queue, mp.Queue] = tx_queue or asyncio.Queue()
self.rx_queue: Union[asyncio.Queue, mp.Queue] = rx_queue or asyncio.Queue()

self.max_in_queue = int(
self._config.get("MAX_IN_QUEUE") or pytak.DEFAULT_MAX_IN_QUEUE
)
self.max_out_queue = int(
self._config.get("MAX_OUT_QUEUE") or pytak.DEFAULT_MAX_OUT_QUEUE
)
self.tx_queue: Union[asyncio.Queue, mp.Queue] = tx_queue or asyncio.Queue(
self.max_out_queue
)
self.rx_queue: Union[asyncio.Queue, mp.Queue] = rx_queue or asyncio.Queue(
self.max_in_queue
)

if bool(self._config.get("DEBUG") or 0):
for handler in self._logger.handlers:
Expand All @@ -312,23 +335,25 @@ def config(self, val):
self._config = val

async def create_workers(self, i_config):
"""Create and run queue workers with specified config parameters.
"""
Create and run queue workers with specified config parameters.
Parameters
----------
i_config : `configparser.SectionProxy`
Configuration options & values.
"""
reader, writer = await pytak.protocol_factory(i_config)
tx_queue = asyncio.Queue()
rx_queue = asyncio.Queue()
tx_queue = asyncio.Queue(self.max_out_queue)
rx_queue = asyncio.Queue(self.max_in_queue)
if len(self.queues) == 0:
# If the queue list is empty, make this the default.
self.tx_queue = tx_queue
self.rx_queue = rx_queue
self.queues[i_config.name] = {"tx_queue": tx_queue, "rx_queue": rx_queue}

reader, writer = await pytak.protocol_factory(i_config)
write_worker = pytak.TXWorker(tx_queue, i_config, writer)
read_worker = pytak.RXWorker(rx_queue, i_config, reader)
self.queues[i_config.name] = {"tx_queue": tx_queue, "rx_queue": rx_queue}
self.add_task(write_worker)
self.add_task(read_worker)

Expand Down Expand Up @@ -364,6 +389,7 @@ def run_task(self, task):
"""Run the given coroutine task."""
self._logger.debug("Run Task: %s", task)
self.running_tasks.add(asyncio.ensure_future(task.run()))
# self.running_tasks.add(run_coroutine_in_thread(task.run()))

def run_tasks(self, tasks=None):
"""Run the given list or set of couroutine tasks."""
Expand Down
3 changes: 3 additions & 0 deletions pytak/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@
DEFAULT_COT_RELTO: Optional[str] = os.getenv("COT_RELTO", "")
DEFAULT_COT_QOS: Optional[str] = os.getenv("COT_QOS", "")
DEFAULT_COT_OPEX: Optional[str] = os.getenv("COT_OPEX", "")

DEFAULT_MAX_OUT_QUEUE = 100
DEFAULT_MAX_IN_QUEUE = 500
4 changes: 2 additions & 2 deletions pytak/crypto_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@

USE_CRYPTOGRAPHY = False
try:
from cryptography.hazmat.backends.openssl.rsa import _RSAPrivateKey
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.x509 import Certificate

USE_CRYPTOGRAPHY = True
except ImportError:
except ImportError as exc:
warnings.warn(exc)
warnings.warn(INSTALL_MSG)


Expand Down
14 changes: 14 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,17 @@ test =
flake8
black
cryptography

[isort]
profile = black

[flake8]
max-line-length = 88
extend-ignore = E203, E704

[pylint]
max-line-length = 88

[pycodestyle]
ignore = E203
max_line_length = 88
10 changes: 5 additions & 5 deletions tests/test_pref_packages.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2023 Sensors & Signals LLC
# Copyright Sensors & Signals LLC https://www.snstac.com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -25,18 +25,18 @@
import pytak.crypto_functions

__author__ = "Greg Albrecht <[email protected]>"
__copyright__ = "Copyright 2023 Sensors & Signals LLC"
__copyright__ = "Copyright Sensors & Signals LLC https://www.snstac.com"
__license__ = "Apache License, Version 2.0"


def test_load_preferences():
def test_load_preferences() -> None:
"""Test loading a preferences file."""
test_pref: str = "tests/data/test_pref.pref"
prefs: dict = pytak.functions.load_preferences(test_pref, "tests/data")
assert all(prefs)


def test_load_connectString2url():
def test_load_connectString2url() -> None:
"""Test converting a TAK connectString to a URL"""
test_pref: str = "tests/data/test_pref.pref"
prefs: dict = pytak.functions.load_preferences(test_pref, "tests/data")
Expand All @@ -45,7 +45,7 @@ def test_load_connectString2url():
assert url == "ssl://takserver.example.com:8089"


def test_load_cert():
def test_load_cert() -> None:
cert: list = pytak.crypto_functions.load_cert(
"tests/data/test_user_cert.p12", "atakatak"
)
Expand Down

0 comments on commit 03a8ad6

Please sign in to comment.