7
7
import logging
8
8
import threading
9
9
import time
10
- import typing
11
10
from contextlib import AbstractContextManager
12
11
from types import TracebackType
13
- from typing import Any , Awaitable , Callable , Iterable , List , NamedTuple , Optional , Union
12
+ from typing import (
13
+ Any ,
14
+ Awaitable ,
15
+ Callable ,
16
+ Final ,
17
+ Iterable ,
18
+ List ,
19
+ NamedTuple ,
20
+ Optional ,
21
+ Type ,
22
+ Union ,
23
+ )
14
24
15
25
from can .bus import BusABC
16
26
from can .listener import Listener
@@ -34,7 +44,7 @@ class _NotifierRegistry:
34
44
35
45
def __init__ (self ) -> None :
36
46
"""Initialize the registry with an empty list of bus-notifier pairs and a threading lock."""
37
- self .pairs : typing . List [_BusNotifierPair ] = []
47
+ self .pairs : List [_BusNotifierPair ] = []
38
48
self .lock = threading .Lock ()
39
49
40
50
def register (self , bus : BusABC , notifier : "Notifier" ) -> None :
@@ -68,7 +78,7 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
68
78
The Notifier instance associated with the bus.
69
79
"""
70
80
with self .lock :
71
- registered_pairs_to_remove : typing . List [_BusNotifierPair ] = []
81
+ registered_pairs_to_remove : List [_BusNotifierPair ] = []
72
82
for pair in self .pairs :
73
83
if pair .bus is bus and pair .notifier is notifier :
74
84
registered_pairs_to_remove .append (pair )
@@ -78,7 +88,7 @@ def unregister(self, bus: BusABC, notifier: "Notifier") -> None:
78
88
79
89
class Notifier (AbstractContextManager ):
80
90
81
- _registry : typing . Final = _NotifierRegistry ()
91
+ _registry : Final = _NotifierRegistry ()
82
92
83
93
def __init__ (
84
94
self ,
@@ -266,7 +276,7 @@ def stopped(self) -> bool:
266
276
267
277
def __exit__ (
268
278
self ,
269
- exc_type : Optional [typing . Type [BaseException ]],
279
+ exc_type : Optional [Type [BaseException ]],
270
280
exc_value : Optional [BaseException ],
271
281
traceback : Optional [TracebackType ],
272
282
) -> None :
0 commit comments