Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate network hardware interface from mqtt client #57

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ An effective PC client and server is [mosquitto](https://mosquitto.org/).

# This repository

This contains two separate projects:
This contains three separate projects:
1. A "resilient" asynchronous non-blocking MQTT driver.
2. A means of using a cheap ESP8266 module to bring MQTT to MicroPython
platforms which lack a WiFi interface.
3. A basic network hardware controller (WLAN) which the mqtt client uses

## 1. The "resilient" driver

Expand Down
1 change: 1 addition & 0 deletions mqtt_as/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .mqtt_as import MQTTClient, config # allow backwards compatible import statements
92 changes: 92 additions & 0 deletions mqtt_as/interfaces/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from uerrno import EINPROGRESS, ETIMEDOUT
import usocket
import uasyncio as asyncio


async def _g():
pass


_type_coro = type(_g())


# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
res = func(*tup_args)
if isinstance(res, _type_coro):
res = asyncio.create_task(res)
return res
Comment on lines +13 to +20
Copy link

@spacemanspiff2007 spacemanspiff2007 Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I get while you would want functions I deliberately chose to allow only async functions.
If you have a longer task and and you launch it with create task and it's duration is e.g. 10 secs you have a state change the next callback will run in parallel to the already launched tasks and anything can happen.
If you only allow async functions then you can at least try to cancel them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but I think you somehow mixed callbacks and coroutines in this answer and now it doesn't really make much sense but I think I get your point.
Callbacks would be the better option for wifi state changes because they have to be executed quickly without any delays in it.
Coroutines can cause the problem of multiple coroutines running in parallel if the state changes quickly, which is quite possible. Cancelling those is not implemented in this simple interface and the user can't cancel them either because the tasks are not bound anywhere.

Personally I'd go with callbacks only and not allow coroutines but I have to think about backwards compatibility and the original mqtt_as has a wifi_coro, a coroutine. So I have to allow it in order to not break existing implementations.

We have the same problem with the connected_coro in mqtt_as, which isn't a callback either. The only reason neither coroutines cause trouble in mqtt_as is that there's a 5 seconds delay in the wlan connect to ensure wlan stability. Most coroutines should have finished after 5 seconds. But if we use a different interface or reduce the wlan reconnects, this will become a problem.
But again, backwards compatibility.. can't change that. Have to keep it that way. However, the wifi connected callback is now called directly after the connection is created and not after the broker was connected. This might result in more wifi callbacks as the wifi could get disconnected quickly if the connection is bad but I think it's an acceptable compromise and not many people will do much in the wifi connected callback because in mqtt_as this doesn't really mean anything because wifi will get disconnected again if the broker connection fails. (which is generally something I'd like to change in a more advanced interface because if your broker vanishes e.g. because you changed its ip adress or switched devices, your micropython device becomes inaccessible because you only have 5 seconds to establich a webrepl connection and interrupt the reconnect process..)

This is the reason why in my project, I subclassed the mqtt client and only have one connected coroutine that launches all my custom coroutines and also cancels all those tasks if the wifi gets disconnected again before it could finish (the coroutines just subscribe to the topics again, which would get paused if the client gets disconnected, so those would easily stack up).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but I think you somehow mixed callbacks and coroutines in this answer and now it doesn't really make much sense but I think I get your point.

Is a coro that is called on connect not a callback? I am not sure about the correct terminology.

Personally I'd go with callbacks only and not allow coroutines but I have to think about backwards compatibility and the original mqtt_as has a wifi_coro, a coroutine. So I have to allow it in order to not break existing implementations.

If the has a long running sync callback things still can block, that's why I think a coro is better because it's immediately clear they have to be non blocking. And if it's all coros they have to be canceled by the base class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callbacks in python (afaik) are always synchronous functions. Even the asyncio module in Python uses callbacks on certain Task events and those are all synchronous functions. Therefore my distinction between a callback=synchronous function and a coroutine=asynchronous function.

If the has a long running sync callback things still can block, that's why I think a coro is better because it's immediately clear they have to be non blocking. And if it's all coros they have to be canceled by the base class.

A long sync function will block no matter where it is implemented. If someone writes blocking code, he will do so no matter if the context is a callback or a coroutine.
If someone assumes he can write blocking code while using uasyncio, he doesn't undertand the environment he's programming in.
That said, callbacks are always supposed to be short whereas when using coroutines, people might write long code, which is not blocking but can take several seconds to finish.
Of course, not a problem if that task can be cancelled, but that would require the user to be aware of it and make their code handle cancellation correctly, which could be problematic for people new to python/asyncio and therefore it's best to stay away from that. Also it would make the base class more complex, which I'd like to avoid, not many will need that feature. I myself only log the connection state to the console in my project. A very short callback.

(As mentioned elsewhere my connected coro is a lot longer and more complex and actually cancels all tasks if the connection state changes. But that isn't part of the hardware interface but mqtt_as..)



class BaseInterface:
def __init__(self, socket=None):
# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection().
self.BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT]
self.socket = socket or usocket # support for custom socket implementations
self._subs = []
self._state = None

async def connect(self):
"""Serve connect request. Triggers callbacks if state changes"""
if await self._connect():
self._change_state(True)
return True
return False
Comment on lines +31 to +36

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a variable that prevents the method from being called when it's already running (e.g. self._is_connecting).
Connecting can take up huge amounts of time and it doesn't make sense to have those running in parallel.
With the variable it's possible to just spawn connect tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do agree actually. It is not a problem if only one project/library is controlling the interface but if multiple interfaces try it, then it becomes a problem. A 2nd interface calling connect should just wait for the result of the 1st call.
However, it might be better to create a more advanced interface blueprint in order to keep this one as simple as possible and as close to the source as possible. The goal of the current files is to just separate mqtt_as from the hardware interface without adding features or complexity.
I do agree that more features and complexity is needed for a safe approach handling multiple libraries. So once the current changes/approach is approved, I should add such a blueprint (by making the wlan interfaces better).


async def _connect(self):
"""Hardware specific connect method"""
# return True # if connection is successful, otherwise False
raise NotImplementedError()

async def disconnect(self):
"""Serve disconnect request. Triggers callbacks if state changes"""
if await self._disconnect():
self._change_state(False)
return True
return False

async def _disconnect(self):
"""Hardware specific disconnect method"""
# return True # if disconnect is successful, otherwise False
raise NotImplementedError()

async def reconnect(self):
"""Serve reconnect request"""
return await self._reconnect()

async def _reconnect(self):
"""Hardware specific reconnect method"""
if await self._disconnect():
return await self._connect()
return False

def isconnected(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my choice would be is_connected because I think it's much more readable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while I don't disagree, the micropython network module uses isconnected() and so does mqtt_as so I think it makes more sense to go with that.

""""Checks if the interface is connected. Triggers callbacks if state changes"""
st = self._isconnected()
self._change_state(st)
return st

def _isconnected(self):
"""Hardware specific isconnected method"""
raise NotImplementedError()

def _change_state(self, state):
"""
Private method executing all callbacks or creating asyncio tasks
on connection state changes
"""
st = self._state
if st != state:
self._state = state
if st is None and state is False:
# not triggering disconnect cbs when interface state was unknown
# (probably disconnected on startup)
return
for cb in self._subs:
launch(cb, (state,))

def subscribe(self, cb):
"""Subscribe to interface connection state changes"""
self._subs.append(cb)
12 changes: 12 additions & 0 deletions mqtt_as/interfaces/linux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from . import BaseInterface


class Linux(BaseInterface):
async def _disconnect(self):
return True # just to prevent errors we'll pretend to be disconnected.

def _isconnected(self):
return True # always connected.

async def _connect(self):
return True # always connected or can't do anything about it
11 changes: 11 additions & 0 deletions mqtt_as/interfaces/wlan/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from sys import platform

if platform == "esp8266":
from .esp8266 import WLAN
elif platform == "esp32":
from .esp32 import WLAN
elif platform == "pyboard":
from .pyboard import WLAN
else:
# just try esp32 implementation. Seems most mature.
from .esp32 import WLAN
20 changes: 20 additions & 0 deletions mqtt_as/interfaces/wlan/esp32.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from .wlan_base import BaseWLAN
import network
import uasyncio as asyncio


class WLAN(BaseWLAN):
def __init__(self, ssid, wifi_pw):
super().__init__(ssid, wifi_pw)
# https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942
self.BUSY_ERRORS += [118, 119] # Add in weird ESP32 errors

async def _connect(self):
s = self._sta_if
s.active(True)
if not s.isconnected():
s.connect(self._ssid, self._wifi_pw)
while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
await asyncio.sleep(1)

return await self._check_reliability()
30 changes: 30 additions & 0 deletions mqtt_as/interfaces/wlan/esp8266.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from .wlan_base import BaseWLAN
import network
import uasyncio as asyncio


class WLAN(BaseWLAN):
def __init__(self, ssid=None, wifi_pw=None):
super().__init__(ssid, wifi_pw)
import esp
esp.sleep_type(0) # Improve connection integrity at cost of power consumption.

async def _connect(self):
s = self._sta_if
if s.isconnected(): # 1st attempt, already connected.
return True
s.active(True)
s.connect() # ESP8266 remembers connection.
for _ in range(60):
if s.status() != network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
break
await asyncio.sleep(1)
if s.status() == network.STAT_CONNECTING: # might hang forever awaiting dhcp lease renewal or something else
s.disconnect()
await asyncio.sleep(1)
if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None:
s.connect(self._ssid, self._wifi_pw)
while s.status() == network.STAT_CONNECTING: # Break out on fail or success. Check once per sec.
await asyncio.sleep(1)

return await self._check_reliability()
17 changes: 17 additions & 0 deletions mqtt_as/interfaces/wlan/pyboard.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from .wlan_base import BaseWLAN
import uasyncio as asyncio


class WLAN(BaseWLAN):
def __init__(self, ssid, wifi_pw):
super().__init__(ssid, wifi_pw)

async def _connect(self):
s = self._sta_if
s.active(True)
s.connect(self._ssid, self._wifi_pw)
# Pyboard doesn't yet have STAT_CONNECTING constant
while s.status() in (1, 2):
await asyncio.sleep(1)

return await self._check_reliability()
37 changes: 37 additions & 0 deletions mqtt_as/interfaces/wlan/wlan_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from .. import BaseInterface
import network
import uasyncio as asyncio


class BaseWLAN(BaseInterface):
def __init__(self, ssid=None, wifi_pw=None):
super().__init__()
self.DEBUG = False
self._ssid = ssid
self._wifi_pw = wifi_pw
# wifi credentials required for ESP32 / Pyboard D. Optional ESP8266
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)

async def _check_reliability(self):
s = self._sta_if
if not s.isconnected():
return False
# Ensure connection stays up for a few secs.
if self.DEBUG:
print('Checking WiFi integrity.')
for _ in range(5):
if not s.isconnected():
return False # in 1st 5 secs
await asyncio.sleep(1)
if self.DEBUG:
print('Got reliable connection')
return True

async def _disconnect(self):
self._sta_if.disconnect()
await asyncio.sleep(1)
return True # not checking if really disconnected.

def _isconnected(self):
return self._sta_if.isconnected()
Loading