asyncio with improved I/O performance #18816
Replies: 4 comments 4 replies
-
Test script which shows the impact of the above:

```python
# import asyncio
import asyncio_fast as asyncio
import time
import io
from micropython import const  # MicroPython-specific

MP_STREAM_POLL_RD = const(1)
MP_STREAM_POLL = const(3)
MP_STREAM_ERROR = const(-1)


class MillisecTimer(io.IOBase):  # Timer using the I/O mechanism
    def __init__(self):
        self.end = 0
        self.sreader = asyncio.StreamReader(self)

    def __iter__(self):  # MicroPython allows await here (compiled as yield from)
        await self.sreader.read(1)

    def __call__(self, ms):
        self.end = time.ticks_add(time.ticks_ms(), ms)
        return self

    def read(self, _):
        return b"a"

    def ioctl(self, req, arg):
        ret = MP_STREAM_ERROR
        if req == MP_STREAM_POLL:
            ret = 0
            if arg & MP_STREAM_POLL_RD:
                if time.ticks_diff(time.ticks_ms(), self.end) >= 0:
                    ret |= MP_STREAM_POLL_RD
        return ret


async def block(n):  # Blocking task
    while True:
        time.sleep_ms(5)
        await asyncio.sleep_ms(0)


async def timer_test(m):
    timer = MillisecTimer()
    tasks = []
    for n in range(10):
        tasks.append(asyncio.create_task(block(n)))
    for x in range(m):
        t = time.ticks_ms()
        await timer(100)  # Pause, measure the actual duration
        print(x, time.ticks_diff(time.ticks_ms(), t))


asyncio.run(timer_test(20))
```

On a Pyboard 1.1 with standard
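For readers without a Pyboard to hand, a rough CPython analogue (my own sketch, not part of the original post) reproduces the effect being measured: ten tasks that each block for 5 ms before yielding make a nominal 100 ms `asyncio.sleep` overrun, because the sleeper is only rescheduled after the blockers have run.

```python
import asyncio
import time


async def block():
    while True:
        time.sleep(0.005)       # 5 ms of blocking "work"
        await asyncio.sleep(0)  # then yield to the scheduler


async def timer_test(m):
    tasks = [asyncio.create_task(block()) for _ in range(10)]
    worst = 0.0
    for _ in range(m):
        t = time.monotonic()
        await asyncio.sleep(0.1)                  # ask for a 100 ms pause
        worst = max(worst, time.monotonic() - t)  # measure what we actually got
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return worst


worst = asyncio.run(timer_test(5))
print(f"worst observed 100 ms sleep: {worst * 1000:.1f} ms")
```

The exact overshoot depends on the host, but the worst case is always at least the requested 100 ms plus some multiple of the blockers' 5 ms slices.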
-
I've run into the exact latency issue this solves multiple times, and I think it's important to fix. In principle, async is an excellent abstraction for the state-management challenges that come with interrupt-driven program flow, but MicroPython's current ThreadSafeFlag isn't even close to satisfactory on performance. As a specific example where it wasn't adequate, I once had several different chips that each required a firmware blob upload over I2C, following their own snowflake upload logic. With multiplexing, this task should have been able to fully saturate both I2C buses on the rp2040 chip in question, but with the tens of milliseconds it always took from each 'I2C complete' IRQ to the subsequent resume-from-await on the corresponding flag, I was only ever able to reach around 40% bus utilization. The advantage of async here, being able to implement the datasheet upload routines exactly as written without manual interleaving, was still useful, but it's a seriously hamstrung feature at present.

I am cautious about the risk of changing existing behavior, though, in a way that could lead to task starvation in existing codebases. As far as ways of breaking old code go, that's definitely one of the worst. As a way to add this in a non-breaking fashion, what about the already-standardized

Perhaps an implementation more like this?

```python
def wait_io_event(self, dt):
    for s, ev in self.poller.ipoll(dt):
        sm = self.map[id(s)]
        # print('poll', s, sm, ev)
        due = ticks()
        if ev & select.POLLPRI:
            ev &= ~select.POLLPRI
            due = ticks_add(due, -1000)  # schedule as 1 s overdue
        if ev & ~select.POLLOUT and sm[0] is not None:
            # POLLIN or error
            _task_queue.push(sm[0], due)
            sm[0] = None
        if ev & ~select.POLLIN and sm[1] is not None:
            # POLLOUT or error
            _task_queue.push(sm[1], due)
            sm[1] = None
        if sm[0] is None and sm[1] is None:
            self._dequeue(s)
        elif sm[0] is None:
            self.poller.modify(s, select.POLLOUT)
        else:
            self.poller.modify(s, select.POLLIN)
```

Would also need to update how the IOQueue sets the eventmask when registering; could potentially even extend to add a
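For anyone unfamiliar with how `poller.modify()` narrows what a registered stream is polled for, here is a small demonstration of the same mechanism under CPython's `select.poll` (a sketch for illustration only: the `socketpair` and 100 ms timeouts are mine, and `select.poll` is unavailable on Windows):

```python
import select
import socket

a, b = socket.socketpair()
poller = select.poll()
poller.register(a, select.POLLIN | select.POLLOUT)

b.send(b"x")  # make `a` readable; a fresh socket is already writable
first = dict(poller.poll(100))[a.fileno()]

a.recv(1)  # reader serviced: narrow the mask to write-only
poller.modify(a, select.POLLOUT)
b.send(b"y")  # `a` is readable again, but POLLIN is no longer watched
second = dict(poller.poll(100))[a.fileno()]

print(bool(first & select.POLLIN), bool(second & select.POLLIN))  # True False
a.close()
b.close()
```

This is exactly what the sketch above does once one direction of a stream has been serviced: it keeps the stream registered but stops asking the poller about the direction nobody is waiting on.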
-
Just submitted a PR for another possible way to improve async latency:
-
I have added asyncio_alt to the asyncio repo. This offers fast I/O scheduling and low power operation, and is mip-installable. Fast I/O needs some attention to detail in application design. The low power option is for experimenters and requires careful application design for best results; it is platform dependent and is incompatible with STM32. Power savings depend on the performance of
-
This proposes a minor (2 LOC) change to `asyncio/core.py` to give I/O tasks higher priority.

Currently, if an I/O event occurs - such as a `ThreadSafeFlag` being set - the task waiting on the TSF is queued behind other pending tasks. If there are ten tasks, each of which blocks for 5 ms before yielding, latency is 50 ms. This change causes the waiting task to be placed at the head of the queue, reducing TSF latency (to <= 5 ms in this example). In applications using fast I/O there is potential for using smaller buffers.

The modified version passes the test suite with the exception of `asyncio_threadsafeflag`: in this case the sequence of output print statements is changed as a result of the reduced latency.

When an I/O task is assigned to the task queue it is now assigned a time-to-run which is overdue by 1 s. In `core.py`, the two changed lines are commented as overdue. This ensures that I/O tasks retain appropriate relative order, while being scheduled ahead of normal due and pending tasks.
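The effect of pushing a task with an overdue due-time can be pictured with an ordinary min-heap keyed by due time (a plain-Python sketch with hypothetical names, not MicroPython's actual task queue implementation):

```python
import heapq


class TaskQueue:
    """Min-heap of (due_ms, seq, task); seq keeps FIFO order for equal due times."""

    def __init__(self):
        self._heap = []
        self._seq = 0

    def push(self, task, due_ms):
        heapq.heappush(self._heap, (due_ms, self._seq, task))
        self._seq += 1

    def pop(self):
        return heapq.heappop(self._heap)[2]


now = 10_000  # pretend current tick count, in ms
q = TaskQueue()
q.push("normal-1", now)        # ordinary task, due now
q.push("io-task", now - 1000)  # I/O-woken task, pushed as 1 s overdue
q.push("normal-2", now)        # another ordinary task, due now

order = [q.pop() for _ in range(3)]
print(order)  # ['io-task', 'normal-1', 'normal-2']
```

Because the I/O task's due time lies in the past, it sorts ahead of every task that is merely due now, yet two I/O tasks woken in succession still run in the order they were woken.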
Comments welcome!