-
Notifications
You must be signed in to change notification settings - Fork 137
/
Copy path__init__.py
92 lines (74 loc) · 2.79 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from uerrno import EINPROGRESS, ETIMEDOUT
import usocket
import uasyncio as asyncio
async def _g():
    """No-op coroutine; it exists only so its object type can be sampled."""
    pass


# Type of the object produced by calling a coroutine function. launch() uses
# it to tell coroutine results apart from ordinary return values.
_type_coro = type(_g())
# Call func with the positional arguments held in tup_args.
# A plain callback just runs; whatever it returned is handed back.
# If the call produced a coroutine object, it is scheduled as a task and the
# task is returned instead.
# Pass the function itself (by name), not the result of calling it.
def launch(func, tup_args):
    outcome = func(*tup_args)
    if not isinstance(outcome, _type_coro):
        # Ordinary callback: already executed, nothing to schedule.
        return outcome
    return asyncio.create_task(outcome)
class BaseInterface:
    """Base class for a connectable interface with state-change callbacks.

    Subclasses provide the hardware specific parts by overriding
    ``_connect``, ``_disconnect`` and ``_isconnected`` (the versions here raise
    NotImplementedError). The public methods wrap those hooks and remember the
    last known connection state so that subscribers are only notified when
    that state actually changes.
    """

    def __init__(self, socket=None):
        # Error codes that are acceptable while waiting on a socket.
        # See uasyncio __init__.py open_connection().
        self.BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT]
        # A custom socket implementation may be supplied; usocket is the fallback.
        self.socket = socket or usocket
        # Callbacks / coroutine functions registered through subscribe().
        self._subs = []
        # Last known connection state; None means "not determined yet".
        self._state = None

    async def connect(self):
        """Handle a connect request.

        Returns True and records the connected state (notifying subscribers
        if that is a change) when ``_connect`` succeeds, otherwise False.
        """
        succeeded = await self._connect()
        if not succeeded:
            return False
        self._change_state(True)
        return True

    async def _connect(self):
        """Hardware specific connect hook; must be overridden.

        Implementations return True if the connection was established,
        otherwise False.
        """
        raise NotImplementedError()

    async def disconnect(self):
        """Handle a disconnect request.

        Returns True and records the disconnected state (notifying
        subscribers if that is a change) when ``_disconnect`` succeeds,
        otherwise False.
        """
        succeeded = await self._disconnect()
        if not succeeded:
            return False
        self._change_state(False)
        return True

    async def _disconnect(self):
        """Hardware specific disconnect hook; must be overridden.

        Implementations return True if the disconnect was successful,
        otherwise False.
        """
        raise NotImplementedError()

    async def reconnect(self):
        """Handle a reconnect request by delegating to ``_reconnect``.

        This path does not call ``_change_state`` itself; the recorded state
        is only compared again when ``connect``, ``disconnect`` or
        ``isconnected`` run.
        """
        outcome = await self._reconnect()
        return outcome

    async def _reconnect(self):
        """Hardware specific reconnect hook.

        The default implementation disconnects and, only if that succeeded,
        connects again and returns the result of ``_connect``. Returns False
        when the disconnect step fails.
        """
        if not await self._disconnect():
            return False
        return await self._connect()

    def isconnected(self):
        """Report whether the interface is connected.

        The value obtained from ``_isconnected`` is also fed into the state
        tracking, so subscribers are notified if it differs from the last
        known state.
        """
        current = self._isconnected()
        self._change_state(current)
        return current

    def _isconnected(self):
        """Hardware specific connection check; must be overridden."""
        raise NotImplementedError()

    def _change_state(self, state):
        """Record ``state`` and notify subscribers if it is a change.

        Every subscriber is passed to ``launch`` with ``(state,)`` as its
        arguments, so plain callbacks run immediately and coroutine functions
        become asyncio tasks.
        """
        previous = self._state
        if not (previous != state):
            # Nothing changed, so there is nothing to record or announce.
            return
        self._state = state
        if previous is None and state is False:
            # The state was unknown before (probably disconnected on startup),
            # so disconnect callbacks are not triggered for this transition.
            return
        for cb in self._subs:
            launch(cb, (state,))

    def subscribe(self, cb):
        """Register ``cb`` to be notified of connection state changes."""
        self._subs.append(cb)