Skip to content

Commit f3579c1

Browse files
agramgurkadevbis
authored andcommitted
Rewrote program loop handling with asyncio.run function
1 parent c3d8e0e commit f3579c1

File tree

2 files changed

+26
-34
lines changed

2 files changed

+26
-34
lines changed

lumimqtt/__main__.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import json
33
import logging
44
import os
5+
import signal
56
from uuid import getnode as get_mac
67

78
from .__version__ import version
@@ -29,9 +30,12 @@ def read_mac():
2930
return mac
3031

3132

32-
def main():
33+
def signal_handler():
34+
raise KeyboardInterrupt()
35+
36+
37+
async def amain():
3338
logging.basicConfig(level='INFO')
34-
loop = aio.new_event_loop()
3539

3640
os.environ.setdefault('LUMIMQTT_CONFIG', '/etc/lumimqtt.json')
3741
config = {}
@@ -62,7 +66,6 @@ def main():
6266
replace('{MAC}', device_id) # support old configs
6367
server = LumiMqtt(
6468
reconnection_interval=10,
65-
loop=loop,
6669
device_id=config['device_id'],
6770
topic_root=topic_root,
6871
host=config['mqtt_host'],
@@ -83,17 +86,23 @@ def main():
8386
):
8487
server.register(device)
8588

89+
loop = aio.get_running_loop()
90+
for _signal in (signal.SIGTERM, signal.SIGQUIT, signal.SIGHUP):
91+
loop.add_signal_handler(_signal, signal_handler)
92+
8693
try:
8794
logger.info(f'Start lumimqtt {version}')
88-
loop.run_until_complete(server.start())
95+
await server.start()
96+
finally:
97+
await server.close()
98+
99+
100+
def main():
101+
try:
102+
aio.run(amain())
89103
except KeyboardInterrupt:
90104
pass
91105

92-
finally:
93-
loop.run_until_complete(server.close())
94-
loop.run_until_complete(loop.shutdown_asyncgens())
95-
loop.close()
96-
97106

98107
if __name__ == '__main__':
99108
main()

lumimqtt/lumimqtt.py

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ def __init__(
4747
sensor_debounce_period: int,
4848
light_transition_period: float,
4949
light_notification_period: float,
50-
loop: ty.Optional[aio.AbstractEventLoop] = None,
5150
) -> None:
5251
self.dev_id = device_id
5352
self._topic_root = topic_root
@@ -71,11 +70,9 @@ def __init__(
7170
self._light_transition_period = light_transition_period
7271
self._light_notification_period = light_notification_period
7372
self._light_last_sent = None
74-
7573
self._reconnection_interval = reconnection_interval
76-
self._loop = loop or aio.get_event_loop()
7774
self._client = aio_mqtt.Client(
78-
loop=self._loop,
75+
loop=aio.get_running_loop(),
7976
client_id_prefix='lumimqtt_',
8077
)
8178
self._tasks: ty.List[aio.Future] = []
@@ -89,33 +86,19 @@ def __init__(
8986

9087
async def start(self):
9188
self._tasks = [
92-
self._loop.create_task(self._connect_forever()),
93-
self._loop.create_task(self._handle_messages()),
94-
self._loop.create_task(self._periodic_publish()),
95-
self._loop.create_task(self._handle_buttons()),
89+
aio.create_task(self._connect_forever()),
90+
aio.create_task(self._handle_messages()),
91+
aio.create_task(self._periodic_publish()),
92+
aio.create_task(self._handle_buttons()),
9693
]
97-
finished, unfinished = await aio.wait(
94+
finished, _ = await aio.wait(
9895
self._tasks,
9996
return_when=aio.FIRST_COMPLETED,
10097
)
101-
for t in unfinished:
102-
t.cancel()
103-
try:
104-
await t
105-
except aio.CancelledError:
106-
pass
10798
for t in finished:
10899
t.result()
109100

110101
async def close(self) -> None:
111-
for task in self._tasks:
112-
if task.done():
113-
continue
114-
task.cancel()
115-
try:
116-
await task
117-
except aio.CancelledError:
118-
pass
119102
if self._client.is_connected():
120103
await self._client.disconnect()
121104

@@ -208,7 +191,7 @@ async def _handle_messages(self) -> None:
208191
except ValueError as e:
209192
logger.exception(str(e))
210193
continue
211-
new_light_task = self._loop.create_task(self._light_handler(light, value))
194+
new_light_task = aio.create_task(self._light_handler(light, value))
212195
running_message_tasks.append(new_light_task)
213196
continue
214197
command: ty.Optional[Command] = None
@@ -222,7 +205,7 @@ async def _handle_messages(self) -> None:
222205
value = json.loads(message.payload)
223206
except ValueError:
224207
value = message.payload.decode()
225-
new_command_task = self._loop.create_task(self._command_handler(command, value))
208+
new_command_task = aio.create_task(self._command_handler(command, value))
226209
running_message_tasks.append(new_command_task)
227210
except aio.CancelledError:
228211
for task in running_message_tasks:

0 commit comments

Comments
 (0)