Skip to content

Commit f6b93c9

Browse files
committed
Initial commit
0 parents  commit f6b93c9

File tree

8 files changed

+829
-0
lines changed

8 files changed

+829
-0
lines changed

__init__.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# MicroPython uasyncio module
2+
# MIT license; Copyright (c) 2019 Damien P. George
3+
4+
from .core import *
5+
6+
__version__ = (3, 0, 0)
7+
8+
_attrs = {
9+
"wait_for": "funcs",
10+
"wait_for_ms": "funcs",
11+
"gather": "funcs",
12+
"Event": "event",
13+
"Lock": "lock",
14+
"open_connection": "stream",
15+
"start_server": "stream",
16+
"StreamReader": "stream",
17+
"StreamWriter": "stream",
18+
}
19+
20+
# Lazy loader, effectively does:
21+
# global attr
22+
# from .mod import attr
23+
def __getattr__(attr):
24+
mod = _attrs.get(attr, None)
25+
if mod is None:
26+
raise AttributeError(attr)
27+
value = getattr(__import__(mod, None, None, True, 1), attr)
28+
globals()[attr] = value
29+
return value

core.py

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
# MicroPython uasyncio module
2+
# MIT license; Copyright (c) 2019 Damien P. George
3+
4+
from utime import ticks_ms as ticks, ticks_diff, ticks_add
5+
import usys as sys
6+
import uselect as select
7+
8+
# Import TaskQueue and Task, preferring built-in C code over Python code
9+
try:
10+
from _uasyncio import TaskQueue, Task
11+
except:
12+
from .task import TaskQueue, Task
13+
14+
15+
################################################################################
16+
# Exceptions
17+
18+
19+
class CancelledError(BaseException):
20+
pass
21+
22+
23+
class TimeoutError(Exception):
24+
pass
25+
26+
27+
# Used when calling Loop.call_exception_handler
28+
_exc_context = {"message": "Task exception wasn't retrieved", "exception": None, "future": None}
29+
30+
31+
################################################################################
32+
# Sleep functions
33+
34+
# "Yield" once, then raise StopIteration
35+
class SingletonGenerator:
36+
def __init__(self):
37+
self.state = None
38+
self.exc = StopIteration()
39+
40+
def __iter__(self):
41+
return self
42+
43+
def __next__(self):
44+
if self.state is not None:
45+
_task_queue.push_sorted(cur_task, self.state)
46+
self.state = None
47+
return None
48+
else:
49+
self.exc.__traceback__ = None
50+
raise self.exc
51+
52+
53+
# Pause task execution for the given time (integer in milliseconds, uPy extension)
54+
# Use a SingletonGenerator to do it without allocating on the heap
55+
def sleep_ms(t, sgen=SingletonGenerator()):
56+
assert sgen.state is None
57+
sgen.state = ticks_add(ticks(), max(0, t))
58+
return sgen
59+
60+
61+
# Pause task execution for the given time (in seconds)
62+
def sleep(t):
63+
return sleep_ms(int(t * 1000))
64+
65+
66+
################################################################################
67+
# Queue and poller for stream IO
68+
69+
70+
class IOQueue:
71+
def __init__(self):
72+
self.poller = select.poll()
73+
self.map = {} # maps id(stream) to [task_waiting_read, task_waiting_write, stream]
74+
75+
def _enqueue(self, s, idx):
76+
if id(s) not in self.map:
77+
entry = [None, None, s]
78+
entry[idx] = cur_task
79+
self.map[id(s)] = entry
80+
self.poller.register(s, select.POLLIN if idx == 0 else select.POLLOUT)
81+
else:
82+
sm = self.map[id(s)]
83+
assert sm[idx] is None
84+
assert sm[1 - idx] is not None
85+
sm[idx] = cur_task
86+
self.poller.modify(s, select.POLLIN | select.POLLOUT)
87+
# Link task to this IOQueue so it can be removed if needed
88+
cur_task.data = self
89+
90+
def _dequeue(self, s):
91+
del self.map[id(s)]
92+
self.poller.unregister(s)
93+
94+
def queue_read(self, s):
95+
self._enqueue(s, 0)
96+
97+
def queue_write(self, s):
98+
self._enqueue(s, 1)
99+
100+
def remove(self, task):
101+
while True:
102+
del_s = None
103+
for k in self.map: # Iterate without allocating on the heap
104+
q0, q1, s = self.map[k]
105+
if q0 is task or q1 is task:
106+
del_s = s
107+
break
108+
if del_s is not None:
109+
self._dequeue(s)
110+
else:
111+
break
112+
113+
def wait_io_event(self, dt):
114+
for s, ev in self.poller.ipoll(dt):
115+
sm = self.map[id(s)]
116+
# print('poll', s, sm, ev)
117+
if ev & ~select.POLLOUT and sm[0] is not None:
118+
# POLLIN or error
119+
_task_queue.push_head(sm[0])
120+
sm[0] = None
121+
if ev & ~select.POLLIN and sm[1] is not None:
122+
# POLLOUT or error
123+
_task_queue.push_head(sm[1])
124+
sm[1] = None
125+
if sm[0] is None and sm[1] is None:
126+
self._dequeue(s)
127+
elif sm[0] is None:
128+
self.poller.modify(s, select.POLLOUT)
129+
else:
130+
self.poller.modify(s, select.POLLIN)
131+
132+
133+
################################################################################
134+
# Main run loop
135+
136+
# Ensure the awaitable is a task
137+
def _promote_to_task(aw):
138+
return aw if isinstance(aw, Task) else create_task(aw)
139+
140+
141+
# Create and schedule a new task from a coroutine
142+
def create_task(coro):
143+
if not hasattr(coro, "send"):
144+
raise TypeError("coroutine expected")
145+
t = Task(coro, globals())
146+
_task_queue.push_head(t)
147+
return t
148+
149+
150+
# Keep scheduling tasks until there are none left to schedule
151+
def run_until_complete(main_task=None):
152+
global cur_task
153+
excs_all = (CancelledError, Exception) # To prevent heap allocation in loop
154+
excs_stop = (CancelledError, StopIteration) # To prevent heap allocation in loop
155+
while True:
156+
# Wait until the head of _task_queue is ready to run
157+
dt = 1
158+
while dt > 0:
159+
dt = -1
160+
t = _task_queue.peek()
161+
if t:
162+
# A task waiting on _task_queue; "ph_key" is time to schedule task at
163+
dt = max(0, ticks_diff(t.ph_key, ticks()))
164+
elif not _io_queue.map:
165+
# No tasks can be woken so finished running
166+
return
167+
# print('(poll {})'.format(dt), len(_io_queue.map))
168+
_io_queue.wait_io_event(dt)
169+
170+
# Get next task to run and continue it
171+
t = _task_queue.pop_head()
172+
cur_task = t
173+
try:
174+
# Continue running the coroutine, it's responsible for rescheduling itself
175+
exc = t.data
176+
if not exc:
177+
t.coro.send(None)
178+
else:
179+
t.data = None
180+
t.coro.throw(exc)
181+
except excs_all as er:
182+
# Check the task is not on any event queue
183+
assert t.data is None
184+
# This task is done, check if it's the main task and then loop should stop
185+
if t is main_task:
186+
if isinstance(er, StopIteration):
187+
return er.value
188+
raise er
189+
# Schedule any other tasks waiting on the completion of this task
190+
waiting = False
191+
if hasattr(t, "waiting"):
192+
while t.waiting.peek():
193+
_task_queue.push_head(t.waiting.pop_head())
194+
waiting = True
195+
t.waiting = None # Free waiting queue head
196+
if not waiting and not isinstance(er, excs_stop):
197+
# An exception ended this detached task, so queue it for later
198+
# execution to handle the uncaught exception if no other task retrieves
199+
# the exception in the meantime (this is handled by Task.throw).
200+
_task_queue.push_head(t)
201+
# Indicate task is done by setting coro to the task object itself
202+
t.coro = t
203+
# Save return value of coro to pass up to caller
204+
t.data = er
205+
206+
207+
# Create a new task from a coroutine and run it until it finishes
208+
def run(coro):
209+
return run_until_complete(create_task(coro))
210+
211+
212+
################################################################################
213+
# Event loop wrapper
214+
215+
216+
async def _stopper():
217+
pass
218+
219+
220+
_stop_task = None
221+
222+
223+
class Loop:
224+
_exc_handler = None
225+
226+
@staticmethod
227+
def create_task(coro):
228+
return create_task(coro)
229+
230+
@staticmethod
231+
def run_forever():
232+
global _stop_task
233+
_stop_task = Task(_stopper(), globals())
234+
run_until_complete(_stop_task)
235+
# TODO should keep running until .stop() is called, even if there're no tasks left
236+
237+
@staticmethod
238+
def run_until_complete(aw):
239+
return run_until_complete(_promote_to_task(aw))
240+
241+
@staticmethod
242+
def stop():
243+
global _stop_task
244+
if _stop_task is not None:
245+
_task_queue.push_head(_stop_task)
246+
# If stop() is called again, do nothing
247+
_stop_task = None
248+
249+
@staticmethod
250+
def close():
251+
pass
252+
253+
@staticmethod
254+
def set_exception_handler(handler):
255+
Loop._exc_handler = handler
256+
257+
@staticmethod
258+
def get_exception_handler():
259+
return Loop._exc_handler
260+
261+
@staticmethod
262+
def default_exception_handler(loop, context):
263+
print(context["message"])
264+
print("future:", context["future"], "coro=", context["future"].coro)
265+
sys.print_exception(context["exception"])
266+
267+
@staticmethod
268+
def call_exception_handler(context):
269+
(Loop._exc_handler or Loop.default_exception_handler)(Loop, context)
270+
271+
272+
# The runq_len and waitq_len arguments are for legacy uasyncio compatibility
273+
def get_event_loop(runq_len=0, waitq_len=0):
274+
return Loop
275+
276+
277+
def new_event_loop():
278+
global _task_queue, _io_queue
279+
# TaskQueue of Task instances
280+
_task_queue = TaskQueue()
281+
# Task queue and poller for stream IO
282+
_io_queue = IOQueue()
283+
return Loop
284+
285+
286+
# Initialise default event loop
287+
new_event_loop()

event.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# MicroPython uasyncio module
2+
# MIT license; Copyright (c) 2019-2020 Damien P. George
3+
4+
from . import core
5+
6+
# Event class for primitive events that can be waited on, set, and cleared
7+
class Event:
8+
def __init__(self):
9+
self.state = False # False=unset; True=set
10+
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
11+
12+
def is_set(self):
13+
return self.state
14+
15+
def set(self):
16+
# Event becomes set, schedule any tasks waiting on it
17+
while self.waiting.peek():
18+
core._task_queue.push_head(self.waiting.pop_head())
19+
self.state = True
20+
21+
def clear(self):
22+
self.state = False
23+
24+
async def wait(self):
25+
if not self.state:
26+
# Event not set, put the calling task on the event's waiting queue
27+
self.waiting.push_head(core.cur_task)
28+
# Set calling task's data to the event's queue so it can be removed if needed
29+
core.cur_task.data = self.waiting
30+
yield
31+
return True

0 commit comments

Comments
 (0)