22
22
import ipaddress
23
23
import logging
24
24
import multiprocessing as mp
25
- import queue as _queue
26
25
import random
27
26
28
27
import xml .etree .ElementTree as ET
@@ -108,12 +107,12 @@ async def run_once(self) -> None:
108
107
await self .handle_data (data )
109
108
await self .fts_compat ()
110
109
111
- async def run (self ) -> None :
110
+ async def run (self , _ = - 1 ) -> None :
112
111
"""Run this Thread - calls run_once() in a loop."""
113
- self ._logger .info ("Run : %s" , self .__class__ .__name__ )
112
+ self ._logger .info ("Running : %s" , self .__class__ .__name__ )
114
113
while True :
115
114
await self .run_once ()
116
- await asyncio .sleep (0 ) # make sure other tasks have a chance to run
115
+ await asyncio .sleep (0 ) # make sure other tasks have a chance to run
117
116
118
117
119
118
class TXWorker (Worker ):
@@ -204,6 +203,7 @@ async def handle_data(self, data: bytes) -> None:
204
203
205
204
async def readcot (self ):
206
205
"""Read CoT from the wire until we hit an event boundary."""
206
+ cot = None
207
207
try :
208
208
if hasattr (self .reader , "readuntil" ):
209
209
cot = await self .reader .readuntil ("</event>" .encode ("UTF-8" ))
@@ -223,16 +223,17 @@ async def run_once(self) -> None:
223
223
if self .reader :
224
224
data : bytes = await self .readcot ()
225
225
if data :
226
- self ._logger .debug ("RX: %s" , data )
226
+ self ._logger .debug ("RX data : %s" , data )
227
227
self .queue .put_nowait (data )
228
228
229
- async def run (self ) -> None :
229
+ async def run (self , _ = - 1 ) -> None :
230
230
"""Run this worker."""
231
- self ._logger .info ("Run : %s" , self .__class__ .__name__ )
231
+ self ._logger .info ("Running : %s" , self .__class__ .__name__ )
232
232
while True :
233
233
await self .run_once ()
234
234
await asyncio .sleep (0 ) # make sure other tasks have a chance to run
235
235
236
+
236
237
class QueueWorker (Worker ):
237
238
"""Read non-CoT Messages from an async network client.
238
239
0 commit comments