Skip to content

Commit c3d8e0e

Browse files
agramgurkadevbis
authored andcommitted
Add parallel message handling.
1 parent 219e3fb commit c3d8e0e

File tree

2 files changed

+73
-46
lines changed

2 files changed

+73
-46
lines changed

lumimqtt/commands.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,12 @@ async def run_command(self, value):
5858
with self.fix_watcher():
5959
proc = await aio.create_subprocess_shell(
6060
command,
61-
loop=aio.get_event_loop(),
6261
stdout=subprocess.PIPE,
6362
stderr=subprocess.PIPE,
6463
)
6564
await proc.wait()
6665

6766
async def set(self, value):
68-
logger.info(f'{self.name}: run command with params: {value}')
67+
logger.info(f'{self.name}: run command with params: {value}.')
6968
await self.run_command(value)
69+
logger.info(f'{self.name} is done.')

lumimqtt/lumimqtt.py

Lines changed: 71 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
"""
32
LUMI MQTT handler
43
"""
@@ -22,6 +21,9 @@
2221
logger = logging.getLogger(__name__)
2322

2423

24+
RECONNECTION_LIMIT = 5
25+
26+
2527
@dataclass
2628
class DebounceSensor:
2729
value: ty.Any
@@ -142,12 +144,59 @@ def subscribed_topics(self):
142144
return [self._get_topic(light.topic_set) for light in self.lights] + \
143145
[self._get_topic(cmd.topic_set) for cmd in self.custom_commands]
144146

147+
async def _command_handler(self, command: Command, value):
148+
await command.set(value)
149+
reconnection_counter = 0
150+
while True:
151+
try:
152+
await self._client.publish(
153+
aio_mqtt.PublishableMessage(
154+
topic_name=self._get_topic(command.topic),
155+
payload='OFF',
156+
qos=aio_mqtt.QOSLevel.QOS_1,
157+
),
158+
)
159+
return
160+
except aio_mqtt.ConnectionClosedError:
161+
logger.exception("Connection closed")
162+
reconnection_counter += 1
163+
if reconnection_counter >= RECONNECTION_LIMIT:
164+
raise
165+
await self._client.wait_for_connect()
166+
167+
async def _light_handler(self, light: Light, value):
168+
await light.set(value, self._light_transition_period)
169+
reconnection_counter = 0
170+
while True:
171+
try:
172+
await self._publish_light(light)
173+
return
174+
except aio_mqtt.ConnectionClosedError:
175+
logger.exception("Connection closed")
176+
reconnection_counter += 1
177+
if reconnection_counter >= RECONNECTION_LIMIT:
178+
raise
179+
await self._client.wait_for_connect()
180+
145181
async def _handle_messages(self) -> None:
146-
async for message in self._client.delivered_messages(
147-
f'{self._topic_root}/#',
148-
):
149-
while True:
182+
running_message_tasks: list[aio.Task] = []
183+
try:
184+
async for message in self._client.delivered_messages(
185+
f'{self._topic_root}/#',
186+
):
187+
for task in running_message_tasks:
188+
if task.done():
189+
try:
190+
task.result()
191+
except Exception:
192+
logger.exception(
193+
"Unhandled exception during echo "
194+
"message publishing",
195+
)
196+
finally:
197+
running_message_tasks.remove(task)
150198
if message.topic_name not in self.subscribed_topics:
199+
logger.error("Invalid topic for light")
151200
continue
152201
light: ty.Optional[Light] = None
153202
for _light in self.lights:
@@ -158,23 +207,10 @@ async def _handle_messages(self) -> None:
158207
value = json.loads(message.payload)
159208
except ValueError as e:
160209
logger.exception(str(e))
161-
break
162-
163-
try:
164-
await light.set(value, self._light_transition_period)
165-
await self._publish_light(light)
166-
except aio_mqtt.ConnectionClosedError as e:
167-
logger.error("Connection closed", exc_info=e)
168-
await self._client.wait_for_connect()
169210
continue
170-
171-
except Exception as e:
172-
logger.error(
173-
"Unhandled exception during echo "
174-
"message publishing",
175-
exc_info=e)
176-
break
177-
211+
new_light_task = self._loop.create_task(self._light_handler(light, value))
212+
running_message_tasks.append(new_light_task)
213+
continue
178214
command: ty.Optional[Command] = None
179215
for _command in self.custom_commands:
180216
if message.topic_name == self._get_topic(
@@ -186,30 +222,21 @@ async def _handle_messages(self) -> None:
186222
value = json.loads(message.payload)
187223
except ValueError:
188224
value = message.payload.decode()
225+
new_command_task = self._loop.create_task(self._command_handler(command, value))
226+
running_message_tasks.append(new_command_task)
227+
except aio.CancelledError:
228+
for task in running_message_tasks:
229+
if task.done():
189230
try:
190-
await command.set(value)
191-
await self._client.publish(
192-
aio_mqtt.PublishableMessage(
193-
topic_name=self._get_topic(command.topic),
194-
payload='OFF',
195-
qos=aio_mqtt.QOSLevel.QOS_1,
196-
),
197-
)
198-
except aio_mqtt.ConnectionClosedError as e:
199-
logger.error("Connection closed", exc_info=e)
200-
await self._client.wait_for_connect()
201-
continue
202-
203-
except Exception as e:
204-
logger.error(
205-
"Unhandled exception during echo "
206-
"message publishing",
207-
exc_info=e,
208-
)
209-
break
210-
211-
logger.error("Invalid topic for light")
212-
break
231+
task.result()
232+
except Exception:
233+
pass
234+
else:
235+
task.cancel()
236+
try:
237+
await task
238+
except (Exception, aio.CancelledError):
239+
pass
213240

214241
async def send_config(self):
215242
device = {

0 commit comments

Comments
 (0)