Skip to content

Commit 26cf5a1

Browse files
authored
Refactor mqtt connections (#6)
* Split into methods , add recurrent retry * Fix arguments for disconnect method * Rewrite nasty functions into nested classes
1 parent 7e14d4e commit 26cf5a1

File tree

1 file changed

+71
-21
lines changed

1 file changed

+71
-21
lines changed

app/main.py

+71-21
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ def __init__(self, serconf, ecconf, callback=None):
7575
self.rtu = None
7676
self.reinit_count = 3
7777
self.callback = callback
78-
self.brightness = [255, 255, 255]
79-
self.register = [255, 255, 255]
78+
self.brightness = [255] * 3
79+
self.register = [255] * 3
8080
self.lock = threading.Lock()
8181

8282
def __del__(self):
@@ -183,10 +183,9 @@ def set_channel(self, client, userdata, message):
183183
# raise e
184184
else:
185185
time.sleep(0.02)
186+
self.lock.release()
186187
if bool(self.callback):
187-
188188
self.callback(ch, payload)
189-
self.lock.release()
190189

191190

192191
class Mqtt:
@@ -196,28 +195,79 @@ def __init__(self, mqconf, command_topics, state_topics, callback):
196195
self.ctopics = command_topics
197196
self.stopics = state_topics
198197
self.callback = callback
199-
self.connhandlers = []
198+
self.consumers = [None] * 3
200199

201200
def __del__(self):
202201
msg("Stopping all mq connections")
203-
for h in self.connhandlers:
204-
h.loop_stop()
205-
h.disconnect()
206-
207-
def _consume_topic(self, channel):
208-
c = subscribe.Client()
209-
c.on_message = self.callback
210-
c.user_data_set({'channel': channel})
211-
if self.mqconf['username'] is not None:
212-
c.username_pw_set(self.mqconf['username'], password=self.mqconf['password'])
213-
c.connect(self.mqconf['address'], port=self.mqconf['port'], keepalive=15)
214-
c.subscribe(self.ctopics[channel], qos=self.mqconf['qos'])
215-
c.loop_start()
216-
self.connhandlers.append(c)
202+
for h in self.consumers:
203+
if bool(h):
204+
del h
205+
206+
class Consumer:
207+
208+
def __init__(self, mqconf, channel, topic, msg_callback):
209+
self.mqconf = mqconf
210+
self.channel = str(channel)
211+
self.topic = topic
212+
self.msg_callback = msg_callback
213+
214+
self.conn = subscribe.Client()
215+
self.conn.on_message = self.msg_callback
216+
self.conn.on_connect = self._on_connect
217+
self.conn.on_disconnect = self._on_disconnect
218+
self.conn.user_data_set({'channel': self.channel})
219+
220+
if self.mqconf['username'] is not None:
221+
self.conn.username_pw_set(self.mqconf['username'],
222+
password=self.mqconf['password'])
223+
224+
self._connect()
225+
# subscribe is executed via _on_connect
226+
self.conn.loop_start()
227+
228+
def __del__(self):
229+
if bool(self.conn):
230+
msg("Channel%s : Closing connection" % self.channel)
231+
self.conn.loop_stop()
232+
self.conn.disconnect()
233+
del self.conn
234+
235+
def _connect(self, depth=0):
236+
try:
237+
self.conn.connect(self.mqconf['address'], port=self.mqconf['port'], keepalive=15)
238+
except Exception as e:
239+
msg("Channel%s : Connection failed : %s" % (self.channel, str(e)))
240+
depth += 1
241+
if depth <= 60:
242+
msg("Channel%s : Waiting 10 seconds before reconnecting ..." % self.channel)
243+
time.sleep(10)
244+
self._connect(depth)
245+
else:
246+
msg("Channel%s : Reconnecting was failing for too long ..." % self.channel)
247+
raise e
248+
249+
def _subscribe(self):
250+
try:
251+
self.conn.subscribe(self.topic, qos=self.mqconf['qos'])
252+
except Exception as e:
253+
msg("Channel%s: Subscription exception : %s" % (self.channel, str(e)))
254+
raise e
255+
256+
def _on_connect(self, client, userdata, flags, rc):
257+
msg("Channel%s : Connected" % self.channel)
258+
self._subscribe()
259+
260+
def _on_disconnect(self, client, userdata, rc):
261+
msg("Channel%s : Disconnected" % self.channel)
217262

218263
def consume_all(self):
219264
for ch, topic in self.ctopics.items():
220-
self._consume_topic(ch)
265+
try:
266+
c = self.Consumer(self.mqconf, ch, topic, self.callback)
267+
except Exception as e:
268+
raise e
269+
else:
270+
self.consumers[int(ch)] = c
221271

222272
def postback(self, ch, payload):
223273
auth = None
@@ -227,7 +277,7 @@ def postback(self, ch, payload):
227277
}
228278

229279
# homeassistant lack proper typing
230-
# on the other side json module is constantly puting double quotation marks around int !
280+
# on the other side json module is constantly puting double quotation marks around ints ...
231281
payload_str = "{\"state\": \"%s\", \"brightness\": %s}" % (payload['state'], payload['brightness'])
232282

233283
try:

0 commit comments

Comments
 (0)