From 328435d593bd0af5c10b8f85b0992783729cda60 Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:15:58 +0100 Subject: [PATCH 1/6] implement NotifierRegistry --- can/notifier.py | 123 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 106 insertions(+), 17 deletions(-) diff --git a/can/notifier.py b/can/notifier.py index 088f0802e..2226c64bb 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -7,7 +7,10 @@ import logging import threading import time -from typing import Any, Awaitable, Callable, Iterable, List, Optional, Union +import typing +from contextlib import AbstractContextManager +from types import TracebackType +from typing import Any, Awaitable, Callable, Iterable, List, NamedTuple, Optional, Union from can.bus import BusABC from can.listener import Listener @@ -18,7 +21,65 @@ MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]] -class Notifier: +class _BusNotifierPair(NamedTuple): + bus: "BusABC" + notifier: "Notifier" + + +class _NotifierRegistry: + """A registry to manage the association between CAN buses and Notifiers. + + This class ensures that a bus is not added to multiple active Notifiers. + """ + + def __init__(self) -> None: + """Initialize the registry with an empty list of bus-notifier pairs and a threading lock.""" + self.pairs: typing.List[_BusNotifierPair] = [] + self.lock = threading.Lock() + + def register(self, bus: BusABC, notifier: "Notifier") -> None: + """Register a bus and its associated notifier. + + Ensures that a bus is not added to multiple active Notifier instances. + + :param bus: + The CAN bus to register. + :param notifier: + The Notifier instance associated with the bus. + :raises ValueError: + If the bus is already assigned to an active Notifier. + """ + with self.lock: + for pair in self.pairs: + if bus is pair.bus and not pair.notifier.stopped: + raise ValueError( + "A bus can not be added to multiple active Notifier instances." + ) + self.pairs.append(_BusNotifierPair(bus, notifier)) + + def unregister(self, bus: BusABC, notifier: "Notifier") -> None: + """Unregister a bus and its associated notifier. + + Removes the bus-notifier pair from the registry. + + :param bus: + The CAN bus to unregister. + :param notifier: + The Notifier instance associated with the bus. + """ + with self.lock: + registered_pairs_to_remove: typing.List[_BusNotifierPair] = [] + for pair in self.pairs: + if pair.bus is bus and pair.notifier is notifier: + registered_pairs_to_remove.append(pair) + for pair in registered_pairs_to_remove: + self.pairs.remove(pair) + + +class Notifier(AbstractContextManager): + + _registry: typing.Final = _NotifierRegistry() + def __init__( self, bus: Union[BusABC, List[BusABC]], @@ -32,16 +93,21 @@ def __init__( .. Note:: - Remember to call `stop()` after all messages are received as + Remember to call :meth:`~can.Notifier.stop` after all messages are received as many listeners carry out flush operations to persist data. - :param bus: A :ref:`bus` or a list of buses to listen to. + :param bus: + A :ref:`bus` or a list of buses to listen to. :param listeners: An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message` and return nothing. - :param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`. - :param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in. + :param timeout: + An optional maximum number of seconds to wait for any :class:`~can.Message`. + :param loop: + An :mod:`asyncio` event loop to schedule the ``listeners`` in. + :raises ValueError: + If the *bus* is already assigned to an active :class:`~can.Notifier`. """ self.listeners: List[MessageRecipient] = list(listeners) self.bus = bus @@ -51,12 +117,12 @@ def __init__( #: Exception raised in thread self.exception: Optional[Exception] = None - self._running = True + self._stopped = False self._lock = threading.Lock() self._readers: List[Union[int, threading.Thread]] = [] - buses = self.bus if isinstance(self.bus, list) else [self.bus] - for each_bus in buses: + self._bus_list = self.bus if isinstance(self.bus, list) else [self.bus] + for each_bus in self._bus_list: self.add_bus(each_bus) def add_bus(self, bus: BusABC) -> None: @@ -64,18 +130,23 @@ def add_bus(self, bus: BusABC) -> None: :param bus: CAN bus instance. + :raises ValueError: + If the *bus* is already assigned to an active :class:`~can.Notifier`. """ - reader: int = -1 + # add bus to notifier registry + self._registry.register(bus, self) + + file_descriptor: int = -1 try: - reader = bus.fileno() + file_descriptor = bus.fileno() except NotImplementedError: # Bus doesn't support fileno, we fall back to thread based reader pass - if self._loop is not None and reader >= 0: + if self._loop is not None and file_descriptor >= 0: # Use bus file descriptor to watch for messages - self._loop.add_reader(reader, self._on_message_available, bus) - self._readers.append(reader) + self._loop.add_reader(file_descriptor, self._on_message_available, bus) + self._readers.append(file_descriptor) else: reader_thread = threading.Thread( target=self._rx_thread, @@ -86,7 +157,7 @@ def add_bus(self, bus: BusABC) -> None: reader_thread.start() self._readers.append(reader_thread) - def stop(self, timeout: float = 5) -> None: + def stop(self, timeout: float = 5.0) -> None: """Stop notifying Listeners when new :class:`~can.Message` objects arrive and call :meth:`~can.Listener.stop` on each Listener. @@ -94,7 +165,7 @@ def stop(self, timeout: float = 5) -> None: Max time in seconds to wait for receive threads to finish. Should be longer than timeout given at instantiation. """ - self._running = False + self._stopped = True end_time = time.time() + timeout for reader in self._readers: if isinstance(reader, threading.Thread): @@ -108,6 +179,10 @@ def stop(self, timeout: float = 5) -> None: if hasattr(listener, "stop"): listener.stop() + # remove bus from registry + for bus in self._bus_list: + self._registry.unregister(bus, self) + def _rx_thread(self, bus: BusABC) -> None: # determine message handling callable early, not inside while loop if self._loop: @@ -118,7 +193,7 @@ def _rx_thread(self, bus: BusABC) -> None: else: handle_message = self._on_message_received - while self._running: + while not self._stopped: try: if msg := bus.recv(self.timeout): with self._lock: @@ -183,3 +258,17 @@ def remove_listener(self, listener: MessageRecipient) -> None: :raises ValueError: if `listener` was never added to this notifier """ self.listeners.remove(listener) + + @property + def stopped(self) -> bool: + """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`.""" + return self._stopped + + def __exit__( + self, + exc_type: Optional[typing.Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + if not self._stopped: + self.stop() From bcd47784e8514347a4fa5b5e21c39d0332c12bbe Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:43:43 +0100 Subject: [PATCH 2/6] use Notifier context manager in examples --- examples/asyncio_demo.py | 42 ++++++++++++++++++------------------- examples/cyclic_checksum.py | 5 ++--- examples/print_notifier.py | 15 +++++++------ examples/send_multiple.py | 2 +- examples/serial_com.py | 2 +- examples/vcan_filtered.py | 13 +++++------- pyproject.toml | 1 + 7 files changed, 37 insertions(+), 43 deletions(-) diff --git a/examples/asyncio_demo.py b/examples/asyncio_demo.py index d29f03bc5..c3bfe8df5 100755 --- a/examples/asyncio_demo.py +++ b/examples/asyncio_demo.py @@ -5,10 +5,12 @@ """ import asyncio -from typing import List +from typing import TYPE_CHECKING, List import can -from can.notifier import MessageRecipient + +if TYPE_CHECKING: + from can.notifier import MessageRecipient def print_message(msg: can.Message) -> None: @@ -31,26 +33,22 @@ async def main() -> None: logger, # Regular Listener object ] # Create Notifier with an explicit loop to use for scheduling of callbacks - loop = asyncio.get_running_loop() - notifier = can.Notifier(bus, listeners, loop=loop) - # Start sending first message - bus.send(can.Message(arbitration_id=0)) - - print("Bouncing 10 messages...") - for _ in range(10): - # Wait for next message from AsyncBufferedReader - msg = await reader.get_message() - # Delay response - await asyncio.sleep(0.5) - msg.arbitration_id += 1 - bus.send(msg) - - # Wait for last message to arrive - await reader.get_message() - print("Done!") - - # Clean-up - notifier.stop() + with can.Notifier(bus, listeners, loop=asyncio.get_running_loop()): + # Start sending first message + bus.send(can.Message(arbitration_id=0)) + + print("Bouncing 10 messages...") + for _ in range(10): + # Wait for next message from AsyncBufferedReader + msg = await reader.get_message() + # Delay response + await asyncio.sleep(0.5) + msg.arbitration_id += 1 + bus.send(msg) + + # Wait for last message to arrive + await reader.get_message() + print("Done!") if __name__ == "__main__": diff --git a/examples/cyclic_checksum.py b/examples/cyclic_checksum.py index 3ab6c78ac..763fcd72b 100644 --- a/examples/cyclic_checksum.py +++ b/examples/cyclic_checksum.py @@ -59,6 +59,5 @@ def compute_xbr_checksum(message: can.Message, counter: int) -> int: if __name__ == "__main__": with can.Bus(channel=0, interface="virtual", receive_own_messages=True) as _bus: - notifier = can.Notifier(bus=_bus, listeners=[print]) - cyclic_checksum_send(_bus) - notifier.stop() + with can.Notifier(bus=_bus, listeners=[print]): + cyclic_checksum_send(_bus) diff --git a/examples/print_notifier.py b/examples/print_notifier.py index 8d55ca1dc..e6e11dbec 100755 --- a/examples/print_notifier.py +++ b/examples/print_notifier.py @@ -8,14 +8,13 @@ def main(): with can.Bus(interface="virtual", receive_own_messages=True) as bus: print_listener = can.Printer() - notifier = can.Notifier(bus, [print_listener]) - - bus.send(can.Message(arbitration_id=1, is_extended_id=True)) - bus.send(can.Message(arbitration_id=2, is_extended_id=True)) - bus.send(can.Message(arbitration_id=1, is_extended_id=False)) - - time.sleep(1.0) - notifier.stop() + with can.Notifier(bus, listeners=[print_listener]): + # using Notifier as a context manager automatically calls `Notifier.stop()` + # at the end of the `with` block + bus.send(can.Message(arbitration_id=1, is_extended_id=True)) + bus.send(can.Message(arbitration_id=2, is_extended_id=True)) + bus.send(can.Message(arbitration_id=1, is_extended_id=False)) + time.sleep(1.0) if __name__ == "__main__": diff --git a/examples/send_multiple.py b/examples/send_multiple.py index fdcaa5b59..9123e1bc8 100755 --- a/examples/send_multiple.py +++ b/examples/send_multiple.py @@ -4,8 +4,8 @@ This demo creates multiple processes of producers to spam a socketcan bus. """ -from time import sleep from concurrent.futures import ProcessPoolExecutor +from time import sleep import can diff --git a/examples/serial_com.py b/examples/serial_com.py index 538c8d12f..9f203b2e0 100755 --- a/examples/serial_com.py +++ b/examples/serial_com.py @@ -18,8 +18,8 @@ com0com: http://com0com.sourceforge.net/ """ -import time import threading +import time import can diff --git a/examples/vcan_filtered.py b/examples/vcan_filtered.py index 9c67390ab..22bca706c 100755 --- a/examples/vcan_filtered.py +++ b/examples/vcan_filtered.py @@ -18,14 +18,11 @@ def main(): # print all incoming messages, which includes the ones sent, # since we set receive_own_messages to True # assign to some variable so it does not garbage collected - notifier = can.Notifier(bus, [can.Printer()]) # pylint: disable=unused-variable - - bus.send(can.Message(arbitration_id=1, is_extended_id=True)) - bus.send(can.Message(arbitration_id=2, is_extended_id=True)) - bus.send(can.Message(arbitration_id=1, is_extended_id=False)) - - time.sleep(1.0) - notifier.stop() + with can.Notifier(bus, [can.Printer()]): # pylint: disable=unused-variable + bus.send(can.Message(arbitration_id=1, is_extended_id=True)) + bus.send(can.Message(arbitration_id=2, is_extended_id=True)) + bus.send(can.Message(arbitration_id=1, is_extended_id=False)) + time.sleep(1.0) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index f2b6ac04f..592eeef12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -167,6 +167,7 @@ ignore = [ ] "can/logger.py" = ["T20"] # flake8-print "can/player.py" = ["T20"] # flake8-print +"examples/*" = ["T20"] # flake8-print [tool.ruff.lint.isort] known-first-party = ["can"] From 2b8154a94b937f785d52f4390890c97b3fe1f7d4 Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:55:45 +0100 Subject: [PATCH 3/6] add tests --- test/notifier_test.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/test/notifier_test.py b/test/notifier_test.py index 6982130cf..b6b7d042c 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -12,16 +12,19 @@ def test_single_bus(self): with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: reader = can.BufferedReader() notifier = can.Notifier(bus, [reader], 0.1) + self.assertFalse(notifier.stopped) msg = can.Message() bus.send(msg) self.assertIsNotNone(reader.get_message(1)) notifier.stop() + self.assertTrue(notifier.stopped) def test_multiple_bus(self): with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1: with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2: reader = can.BufferedReader() notifier = can.Notifier([bus1, bus2], [reader], 0.1) + self.assertFalse(notifier.stopped) msg = can.Message() bus1.send(msg) time.sleep(0.1) @@ -33,6 +36,30 @@ def test_multiple_bus(self): self.assertIsNotNone(recv_msg) self.assertEqual(recv_msg.channel, 1) notifier.stop() + self.assertTrue(notifier.stopped) + + def test_context_manager(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.BufferedReader() + with can.Notifier(bus, [reader], 0.1) as notifier: + self.assertFalse(notifier.stopped) + msg = can.Message() + bus.send(msg) + self.assertIsNotNone(reader.get_message(1)) + notifier.stop() + self.assertTrue(notifier.stopped) + + def test_registry(self): + with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: + reader = can.BufferedReader() + with can.Notifier(bus, [reader], 0.1): + # creating a second notifier for the same bus must fail + self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) + + # now the first notifier is stopped, a new notifier can be created without error: + with can.Notifier(bus, [reader], 0.1): + # the next notifier call should fail again since there is an active notifier already + self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) class AsyncNotifierTest(unittest.TestCase): From 6fc0d7da23ab26a2af9a52732618db888e744fe9 Mon Sep 17 00:00:00 2001 From: zariiii9003 Date: Wed, 6 Nov 2024 19:32:52 +0100 Subject: [PATCH 4/6] remove typing import --- can/notifier.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/can/notifier.py b/can/notifier.py index 2226c64bb..03f79c675 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -7,10 +7,20 @@ import logging import threading import time -import typing from contextlib import AbstractContextManager from types import TracebackType -from typing import Any, Awaitable, Callable, Iterable, List, NamedTuple, Optional, Union +from typing import ( + Any, + Awaitable, + Callable, + Final, + Iterable, + List, + NamedTuple, + Optional, + Type, + Union, +) from can.bus import BusABC from can.listener import Listener @@ -34,7 +44,7 @@ class _NotifierRegistry: def __init__(self) -> None: """Initialize the registry with an empty list of bus-notifier pairs and a threading lock.""" - self.pairs: typing.List[_BusNotifierPair] = [] + self.pairs: List[_BusNotifierPair] = [] self.lock = threading.Lock() def register(self, bus: BusABC, notifier: "Notifier") -> None: @@ -68,7 +78,7 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None: The Notifier instance associated with the bus. """ with self.lock: - registered_pairs_to_remove: typing.List[_BusNotifierPair] = [] + registered_pairs_to_remove: List[_BusNotifierPair] = [] for pair in self.pairs: if pair.bus is bus and pair.notifier is notifier: registered_pairs_to_remove.append(pair) @@ -78,7 +88,7 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None: class Notifier(AbstractContextManager): - _registry: typing.Final = _NotifierRegistry() + _registry: Final = _NotifierRegistry() def __init__( self, @@ -266,7 +276,7 @@ def stopped(self) -> bool: def __exit__( self, - exc_type: Optional[typing.Type[BaseException]], + exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: From e35610adb4061fd086871de70db03b84656d9876 Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Sat, 23 Nov 2024 10:59:15 +0100 Subject: [PATCH 5/6] implement Notifier.find_instance(bus) --- can/notifier.py | 43 ++++++++++++++++++++++++++++++++++++++++--- test/notifier_test.py | 13 +++++++++++-- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/can/notifier.py b/can/notifier.py index 03f79c675..b678b283b 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -50,12 +50,12 @@ def __init__(self) -> None: def register(self, bus: BusABC, notifier: "Notifier") -> None: """Register a bus and its associated notifier. - Ensures that a bus is not added to multiple active Notifier instances. + Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances. :param bus: The CAN bus to register. :param notifier: - The Notifier instance associated with the bus. + The :class:`~can.Notifier` instance associated with the bus. :raises ValueError: If the bus is already assigned to an active Notifier. """ @@ -75,7 +75,7 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None: :param bus: The CAN bus to unregister. :param notifier: - The Notifier instance associated with the bus. + The :class:`~can.Notifier` instance associated with the bus. """ with self.lock: registered_pairs_to_remove: List[_BusNotifierPair] = [] @@ -85,6 +85,26 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None: for pair in registered_pairs_to_remove: self.pairs.remove(pair) + def find_instance(self, bus: BusABC) -> Optional["Notifier"]: + """Find the :class:`~can.Notifier` instance associated with a given CAN bus. + + This method searches the registry for the :class:`~can.Notifier` + that is linked to the specified bus. If the bus is found, the + corresponding :class:`~can.Notifier` instance is returned. If the bus is not + found in the registry, `None` is returned. + + :param bus: + The CAN bus for which to find the associated :class:`~can.Notifier` . + :return: + The :class:`~can.Notifier` instance associated with the given bus, + or `None` if no such association exists. + """ + with self.lock: + for pair in self.pairs: + if bus is pair.bus: + return pair.notifier + return None + class Notifier(AbstractContextManager): @@ -274,6 +294,23 @@ def stopped(self) -> bool: """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`.""" return self._stopped + @classmethod + def find_instance(cls, bus: BusABC) -> Optional["Notifier"]: + """Find the :class:`~can.Notifier` instance associated with a given CAN bus. + + This method searches the registry for the :class:`~can.Notifier` + that is linked to the specified bus. If the bus is found, the + corresponding :class:`~can.Notifier` instance is returned. If the bus is not + found in the registry, `None` is returned. + + :param bus: + The CAN bus for which to find the associated :class:`~can.Notifier` . + :return: + The :class:`~can.Notifier` instance associated with the given bus, + or `None` if no such association exists. + """ + return cls._registry.find_instance(bus) + def __exit__( self, exc_type: Optional[Type[BaseException]], diff --git a/test/notifier_test.py b/test/notifier_test.py index b6b7d042c..0dd3ba4b3 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -52,15 +52,24 @@ def test_context_manager(self): def test_registry(self): with can.Bus("test", interface="virtual", receive_own_messages=True) as bus: reader = can.BufferedReader() - with can.Notifier(bus, [reader], 0.1): + with can.Notifier(bus, [reader], 0.1) as notifier: # creating a second notifier for the same bus must fail self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) + # find_instance must return the existing instance + self.assertIs(can.Notifier.find_instance(bus), notifier) + + # Notifier is stopped, find instance must return None + self.assertIsNone(can.Notifier.find_instance(bus)) + # now the first notifier is stopped, a new notifier can be created without error: - with can.Notifier(bus, [reader], 0.1): + with can.Notifier(bus, [reader], 0.1) as notifier: # the next notifier call should fail again since there is an active notifier already self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) + # find_instance must return the existing instance + self.assertIs(can.Notifier.find_instance(bus), notifier) + class AsyncNotifierTest(unittest.TestCase): def test_asyncio_notifier(self): From f9095a05d08031c9f6c8d3f0a97b543a6b2a6f99 Mon Sep 17 00:00:00 2001 From: zariiii9003 Date: Fri, 29 Nov 2024 12:06:34 +0100 Subject: [PATCH 6/6] address review comments --- can/notifier.py | 58 +++++++++++++++++++++++++------------------ test/notifier_test.py | 8 +++--- 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/can/notifier.py b/can/notifier.py index b678b283b..cf8dd7540 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -18,6 +18,7 @@ List, NamedTuple, Optional, + Tuple, Type, Union, ) @@ -85,25 +86,25 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None: for pair in registered_pairs_to_remove: self.pairs.remove(pair) - def find_instance(self, bus: BusABC) -> Optional["Notifier"]: - """Find the :class:`~can.Notifier` instance associated with a given CAN bus. + def find_instances(self, bus: BusABC) -> Tuple["Notifier", ...]: + """Find the :class:`~can.Notifier` instances associated with a given CAN bus. This method searches the registry for the :class:`~can.Notifier` that is linked to the specified bus. If the bus is found, the - corresponding :class:`~can.Notifier` instance is returned. If the bus is not - found in the registry, `None` is returned. + corresponding :class:`~can.Notifier` instances are returned. If the bus is not + found in the registry, an empty tuple is returned. :param bus: The CAN bus for which to find the associated :class:`~can.Notifier` . :return: - The :class:`~can.Notifier` instance associated with the given bus, - or `None` if no such association exists. + A tuple of :class:`~can.Notifier` instances associated with the given bus. """ + instance_list = [] with self.lock: for pair in self.pairs: if bus is pair.bus: - return pair.notifier - return None + instance_list.append(pair.notifier) + return tuple(instance_list) class Notifier(AbstractContextManager): @@ -128,7 +129,7 @@ def __init__( :param bus: - A :ref:`bus` or a list of buses to listen to. + A :ref:`bus` or a list of buses to consume messages from. :param listeners: An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message` and return nothing. @@ -137,10 +138,10 @@ def __init__( :param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in. :raises ValueError: - If the *bus* is already assigned to an active :class:`~can.Notifier`. + If a passed in *bus* is already assigned to an active :class:`~can.Notifier`. """ self.listeners: List[MessageRecipient] = list(listeners) - self.bus = bus + self._bus_list: List[BusABC] = [] self.timeout = timeout self._loop = loop @@ -151,10 +152,17 @@ def __init__( self._lock = threading.Lock() self._readers: List[Union[int, threading.Thread]] = [] - self._bus_list = self.bus if isinstance(self.bus, list) else [self.bus] - for each_bus in self._bus_list: + _bus_list: List[BusABC] = bus if isinstance(bus, list) else [bus] + for each_bus in _bus_list: self.add_bus(each_bus) + @property + def bus(self) -> Union[BusABC, Tuple["BusABC", ...]]: + """Return the associated bus or a tuple of buses.""" + if len(self._bus_list) == 1: + return self._bus_list[0] + return tuple(self._bus_list) + def add_bus(self, bus: BusABC) -> None: """Add a bus for notification. @@ -164,7 +172,10 @@ def add_bus(self, bus: BusABC) -> None: If the *bus* is already assigned to an active :class:`~can.Notifier`. """ # add bus to notifier registry - self._registry.register(bus, self) + Notifier._registry.register(bus, self) + + # add bus to internal bus list + self._bus_list.append(bus) file_descriptor: int = -1 try: @@ -181,7 +192,7 @@ def add_bus(self, bus: BusABC) -> None: reader_thread = threading.Thread( target=self._rx_thread, args=(bus,), - name=f'can.notifier for bus "{bus.channel_info}"', + name=f'{self.__class__.__qualname__} for bus "{bus.channel_info}"', ) reader_thread.daemon = True reader_thread.start() @@ -211,7 +222,7 @@ def stop(self, timeout: float = 5.0) -> None: # remove bus from registry for bus in self._bus_list: - self._registry.unregister(bus, self) + Notifier._registry.unregister(bus, self) def _rx_thread(self, bus: BusABC) -> None: # determine message handling callable early, not inside while loop @@ -294,22 +305,21 @@ def stopped(self) -> bool: """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`.""" return self._stopped - @classmethod - def find_instance(cls, bus: BusABC) -> Optional["Notifier"]: - """Find the :class:`~can.Notifier` instance associated with a given CAN bus. + @staticmethod + def find_instances(bus: BusABC) -> Tuple["Notifier", ...]: + """Find :class:`~can.Notifier` instances associated with a given CAN bus. This method searches the registry for the :class:`~can.Notifier` that is linked to the specified bus. If the bus is found, the - corresponding :class:`~can.Notifier` instance is returned. If the bus is not - found in the registry, `None` is returned. + corresponding :class:`~can.Notifier` instances are returned. If the bus is not + found in the registry, an empty tuple is returned. :param bus: The CAN bus for which to find the associated :class:`~can.Notifier` . :return: - The :class:`~can.Notifier` instance associated with the given bus, - or `None` if no such association exists. + A tuple of :class:`~can.Notifier` instances associated with the given bus. """ - return cls._registry.find_instance(bus) + return Notifier._registry.find_instances(bus) def __exit__( self, diff --git a/test/notifier_test.py b/test/notifier_test.py index 0dd3ba4b3..c21d51f04 100644 --- a/test/notifier_test.py +++ b/test/notifier_test.py @@ -57,10 +57,10 @@ def test_registry(self): self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) # find_instance must return the existing instance - self.assertIs(can.Notifier.find_instance(bus), notifier) + self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) - # Notifier is stopped, find instance must return None - self.assertIsNone(can.Notifier.find_instance(bus)) + # Notifier is stopped, find_instances() must return an empty tuple + self.assertEqual(can.Notifier.find_instances(bus), ()) # now the first notifier is stopped, a new notifier can be created without error: with can.Notifier(bus, [reader], 0.1) as notifier: @@ -68,7 +68,7 @@ def test_registry(self): self.assertRaises(ValueError, can.Notifier, bus, [reader], 0.1) # find_instance must return the existing instance - self.assertIs(can.Notifier.find_instance(bus), notifier) + self.assertEqual(can.Notifier.find_instances(bus), (notifier,)) class AsyncNotifierTest(unittest.TestCase):