Skip to content

Commit 9d69aa2

Browse files
committed
update asyncio from MicroPython v1.19.1
1 parent 18a411e commit 9d69aa2

File tree

6 files changed

+125
-73
lines changed

6 files changed

+125
-73
lines changed

asyncio/core.py

+14-8
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,14 @@ def __await__(self):
6262

6363
def __next__(self):
6464
if self.state is not None:
65-
_task_queue.push_sorted(cur_task, self.state)
65+
_task_queue.push(cur_task, self.state)
6666
self.state = None
6767
return None
6868
else:
6969
self.exc.__traceback__ = None
7070
raise self.exc
7171

72+
7273
# Pause task execution for the given time (integer in milliseconds, uPy extension)
7374
# Use a SingletonGenerator to do it without allocating on the heap
7475
def sleep_ms(t, sgen=SingletonGenerator()):
@@ -178,11 +179,11 @@ def wait_io_event(self, dt):
178179
# print('poll', s, sm, ev)
179180
if ev & ~select.POLLOUT and sm[0] is not None:
180181
# POLLIN or error
181-
_task_queue.push_head(sm[0])
182+
_task_queue.push(sm[0])
182183
sm[0] = None
183184
if ev & ~select.POLLIN and sm[1] is not None:
184185
# POLLOUT or error
185-
_task_queue.push_head(sm[1])
186+
_task_queue.push(sm[1])
186187
sm[1] = None
187188
if sm[0] is None and sm[1] is None:
188189
self._dequeue(s)
@@ -210,7 +211,7 @@ def create_task(coro):
210211
if not hasattr(coro, "send"):
211212
raise TypeError("coroutine expected")
212213
t = Task(coro, globals())
213-
_task_queue.push_head(t)
214+
_task_queue.push(t)
214215
return t
215216

216217

@@ -237,7 +238,7 @@ def run_until_complete(main_task=None):
237238
_io_queue.wait_io_event(dt)
238239

239240
# Get next task to run and continue it
240-
t = _task_queue.pop_head()
241+
t = _task_queue.pop()
241242
cur_task = t
242243
try:
243244
# Continue running the coroutine, it's responsible for rescheduling itself
@@ -265,18 +266,23 @@ def run_until_complete(main_task=None):
265266
if t.state is True:
266267
# "None" indicates that the task is complete and not await'ed on (yet).
267268
t.state = None
269+
elif callable(t.state):
270+
# The task has a callback registered to be called on completion.
271+
t.state(t, er)
272+
t.state = False
273+
waiting = True
268274
else:
269275
# Schedule any other tasks waiting on the completion of this task.
270276
while t.state.peek():
271-
_task_queue.push_head(t.state.pop_head())
277+
_task_queue.push(t.state.pop())
272278
waiting = True
273279
# "False" indicates that the task is complete and has been await'ed on.
274280
t.state = False
275281
if not waiting and not isinstance(er, excs_stop):
276282
# An exception ended this detached task, so queue it for later
277283
# execution to handle the uncaught exception if no other task retrieves
278284
# the exception in the meantime (this is handled by Task.throw).
279-
_task_queue.push_head(t)
285+
_task_queue.push(t)
280286
# Save return value of coro to pass up to caller.
281287
t.data = er
282288
elif t.state is None:
@@ -338,7 +344,7 @@ def stop():
338344

339345
global _stop_task
340346
if _stop_task is not None:
341-
_task_queue.push_head(_stop_task)
347+
_task_queue.push(_stop_task)
342348
# If stop() is called again, do nothing
343349
_stop_task = None
344350

asyncio/event.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ class Event:
2525

2626
def __init__(self):
2727
self.state = False # False=unset; True=set
28-
self.waiting = (
29-
core.TaskQueue()
30-
) # Queue of Tasks waiting on completion of this event
28+
self.waiting = core.TaskQueue() # Queue of Tasks waiting on completion of this event
3129

3230
def is_set(self):
3331
"""Returns ``True`` if the event is set, ``False`` otherwise."""
@@ -42,7 +40,7 @@ def set(self):
4240
# Note: This must not be called from anything except the thread running
4341
# the asyncio loop (i.e. neither hard or soft IRQ, or a different thread).
4442
while self.waiting.peek():
45-
core._task_queue.push_head(self.waiting.pop_head())
43+
core._task_queue.push(self.waiting.pop())
4644
self.state = True
4745

4846
def clear(self):
@@ -59,7 +57,7 @@ async def wait(self):
5957

6058
if not self.state:
6159
# Event not set, put the calling task on the event's waiting queue
62-
self.waiting.push_head(core.cur_task)
60+
self.waiting.push(core.cur_task)
6361
# Set calling task's data to the event's queue so it can be removed if needed
6462
core.cur_task.data = self.waiting
6563
await core._never()
@@ -90,6 +88,5 @@ async def wait(self):
9088
yield core._io_queue.queue_read(self)
9189
self._flag = 0
9290

93-
9491
except ImportError:
9592
pass

asyncio/funcs.py

+88-40
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# SPDX-License-Identifier: MIT
44
#
55
# MicroPython uasyncio module
6-
# MIT license; Copyright (c) 2019-2020 Damien P. George
6+
# MIT license; Copyright (c) 2019-2022 Damien P. George
77
#
88
# This code comes from MicroPython, and has not been run through black or pylint there.
99
# Altering these files significantly would make merging difficult, so we will not use
@@ -19,6 +19,22 @@
1919
from . import core
2020

2121

22+
async def _run(waiter, aw):
23+
try:
24+
result = await aw
25+
status = True
26+
except BaseException as er:
27+
result = None
28+
status = er
29+
if waiter.data is None:
30+
# The waiter is still waiting, cancel it.
31+
if waiter.cancel():
32+
# Waiter was cancelled by us, change its CancelledError to an instance of
33+
# CancelledError that contains the status and result of waiting on aw.
34+
# If the wait_for task subsequently gets cancelled externally then this
35+
# instance will be reset to a CancelledError instance without arguments.
36+
waiter.data = core.CancelledError(status, result)
37+
2238
async def wait_for(aw, timeout, sleep=core.sleep):
2339
"""Wait for the *aw* awaitable to complete, but cancel if it takes longer
2440
than *timeout* seconds. If *aw* is not a task then a task will be created
@@ -36,41 +52,26 @@ async def wait_for(aw, timeout, sleep=core.sleep):
3652
if timeout is None:
3753
return await aw
3854

39-
async def runner(waiter, aw):
40-
nonlocal status, result
41-
try:
42-
result = await aw
43-
s = True
44-
except BaseException as er:
45-
s = er
46-
if status is None:
47-
# The waiter is still waiting, set status for it and cancel it.
48-
status = s
49-
waiter.cancel()
50-
5155
# Run aw in a separate runner task that manages its exceptions.
52-
status = None
53-
result = None
54-
runner_task = core.create_task(runner(core.cur_task, aw))
56+
runner_task = core.create_task(_run(core.cur_task, aw))
5557

5658
try:
5759
# Wait for the timeout to elapse.
5860
await sleep(timeout)
5961
except core.CancelledError as er:
60-
if status is True:
61-
# aw completed successfully and cancelled the sleep, so return aw's result.
62-
return result
63-
elif status is None:
62+
status = er.value
63+
if status is None:
6464
# This wait_for was cancelled externally, so cancel aw and re-raise.
65-
status = True
6665
runner_task.cancel()
6766
raise er
67+
elif status is True:
68+
# aw completed successfully and cancelled the sleep, so return aw's result.
69+
return er.args[1]
6870
else:
6971
# aw raised an exception, propagate it out to the caller.
7072
raise status
7173

7274
# The sleep finished before aw, so cancel aw and raise TimeoutError.
73-
status = True
7475
runner_task.cancel()
7576
await runner_task
7677
raise core.TimeoutError
@@ -85,30 +86,77 @@ def wait_for_ms(aw, timeout):
8586
return wait_for(aw, timeout, core.sleep_ms)
8687

8788

88-
async def gather(*aws, return_exceptions=False):
89+
class _Remove:
90+
@staticmethod
91+
def remove(t):
92+
pass
93+
94+
95+
def gather(*aws, return_exceptions=False):
8996
"""Run all *aws* awaitables concurrently. Any *aws* that are not tasks
9097
are promoted to tasks.
9198
9299
Returns a list of return values of all *aws*
93-
94-
This is a coroutine.
95100
"""
101+
def done(t, er):
102+
# Sub-task "t" has finished, with exception "er".
103+
nonlocal state
104+
if gather_task.data is not _Remove:
105+
# The main gather task has already been scheduled, so do nothing.
106+
# This happens if another sub-task already raised an exception and
107+
# woke the main gather task (via this done function), or if the main
108+
# gather task was cancelled externally.
109+
return
110+
elif not return_exceptions and not isinstance(er, StopIteration):
111+
# A sub-task raised an exception, indicate that to the gather task.
112+
state = er
113+
else:
114+
state -= 1
115+
if state:
116+
# Still some sub-tasks running.
117+
return
118+
# Gather waiting is done, schedule the main gather task.
119+
core._task_queue.push(gather_task)
96120

97121
ts = [core._promote_to_task(aw) for aw in aws]
98122
for i in range(len(ts)):
99-
try:
100-
# TODO handle cancel of gather itself
101-
# if ts[i].coro:
102-
# iter(ts[i]).waiting.push_head(cur_task)
103-
# try:
104-
# yield
105-
# except CancelledError as er:
106-
# # cancel all waiting tasks
107-
# raise er
108-
ts[i] = await ts[i]
109-
except (core.CancelledError, Exception) as er:
110-
if return_exceptions:
111-
ts[i] = er
112-
else:
113-
raise er
123+
if ts[i].state is not True:
124+
# Task is not running, gather not currently supported for this case.
125+
raise RuntimeError("can't gather")
126+
# Register the callback to call when the task is done.
127+
ts[i].state = done
128+
129+
# Set the state for execution of the gather.
130+
gather_task = core.cur_task
131+
state = len(ts)
132+
cancel_all = False
133+
134+
# Wait for the a sub-task to need attention.
135+
gather_task.data = _Remove
136+
try:
137+
yield
138+
except core.CancelledError as er:
139+
cancel_all = True
140+
state = er
141+
142+
# Clean up tasks.
143+
for i in range(len(ts)):
144+
if ts[i].state is done:
145+
# Sub-task is still running, deregister the callback and cancel if needed.
146+
ts[i].state = True
147+
if cancel_all:
148+
ts[i].cancel()
149+
elif isinstance(ts[i].data, StopIteration):
150+
# Sub-task ran to completion, get its return value.
151+
ts[i] = ts[i].data.value
152+
else:
153+
# Sub-task had an exception with return_exceptions==True, so get its exception.
154+
ts[i] = ts[i].data
155+
156+
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
157+
# return_exceptions==False, so reraise the exception here.
158+
if state is not 0:
159+
raise state
160+
161+
# Return the list of return values of each sub-task.
114162
return ts

asyncio/lock.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ def release(self):
5050
raise RuntimeError("Lock not acquired")
5151
if self.waiting.peek():
5252
# Task(s) waiting on lock, schedule next Task
53-
self.state = self.waiting.pop_head()
54-
core._task_queue.push_head(self.state)
53+
self.state = self.waiting.pop()
54+
core._task_queue.push(self.state)
5555
else:
5656
# No Task waiting so unlock
5757
self.state = 0
@@ -65,7 +65,7 @@ async def acquire(self):
6565

6666
if self.state != 0:
6767
# Lock unavailable, put the calling Task on the waiting queue
68-
self.waiting.push_head(core.cur_task)
68+
self.waiting.push(core.cur_task)
6969
# Set calling task's data to the lock's queue so it can be removed if needed
7070
core.cur_task.data = self.waiting
7171
try:

asyncio/stream.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,7 @@ async def open_connection(host, port):
151151
from uerrno import EINPROGRESS
152152
import usocket as socket
153153

154-
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[
155-
0
156-
] # TODO this is blocking!
154+
ai = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0] # TODO this is blocking!
157155
s = socket.socket(ai[0], ai[1], ai[2])
158156
s.setblocking(False)
159157
ss = Stream(s)

asyncio/task.py

+16-13
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,18 @@ def __init__(self):
113113
def peek(self):
114114
return self.heap
115115

116-
def push_sorted(self, v, key):
116+
def push(self, v, key=None):
117+
assert v.ph_child is None
118+
assert v.ph_next is None
117119
v.data = None
118-
v.ph_key = key
119-
v.ph_child = None
120-
v.ph_next = None
120+
v.ph_key = key if key is not None else core.ticks()
121121
self.heap = ph_meld(v, self.heap)
122122

123-
def push_head(self, v):
124-
self.push_sorted(v, core.ticks())
125-
126-
def pop_head(self):
123+
def pop(self):
127124
v = self.heap
128-
self.heap = ph_pairing(self.heap.ph_child)
125+
assert v.ph_next is None
126+
self.heap = ph_pairing(v.ph_child)
127+
v.ph_child = None
129128
return v
130129

131130
def remove(self, v):
@@ -144,7 +143,7 @@ class Task:
144143
def __init__(self, coro, globals=None):
145144
self.coro = coro # Coroutine of this Task
146145
self.data = None # General data for queue it is waiting on
147-
self.state = True # None, False, True or a TaskQueue instance
146+
self.state = True # None, False, True, a callable, or a TaskQueue instance
148147
self.ph_key = 0 # Pairing heap
149148
self.ph_child = None # Paring heap
150149
self.ph_child_last = None # Paring heap
@@ -158,8 +157,12 @@ def __iter__(self):
158157
elif self.state is True:
159158
# Allocated head of linked list of Tasks waiting on completion of this task.
160159
self.state = TaskQueue()
160+
elif type(self.state) is not TaskQueue:
161+
# Task has state used for another purpose, so can't also wait on it.
162+
raise RuntimeError("can't wait")
161163
return self
162164

165+
# CircuitPython needs __await()__.
163166
__await__ = __iter__
164167

165168
def __next__(self):
@@ -168,7 +171,7 @@ def __next__(self):
168171
raise self.data
169172
else:
170173
# Put calling task on waiting queue.
171-
self.state.push_head(core.cur_task)
174+
self.state.push(core.cur_task)
172175
# Set calling task's data to this task that it waits on, to double-link it.
173176
core.cur_task.data = self
174177

@@ -195,10 +198,10 @@ def cancel(self):
195198
if hasattr(self.data, "remove"):
196199
# Not on the main running queue, remove the task from the queue it's on.
197200
self.data.remove(self)
198-
core._task_queue.push_head(self)
201+
core._task_queue.push(self)
199202
elif core.ticks_diff(self.ph_key, core.ticks()) > 0:
200203
# On the main running queue but scheduled in the future, so bring it forward to now.
201204
core._task_queue.remove(self)
202-
core._task_queue.push_head(self)
205+
core._task_queue.push(self)
203206
self.data = core.CancelledError
204207
return True

0 commit comments

Comments
 (0)