Skip to content

Commit 4e8ca96

Browse files
committed
micropython/streampair: Package to create bi-directional linked stream objects.
Signed-off-by: Andrew Leech <[email protected]>
1 parent 60d1370 commit 4e8ca96

File tree

3 files changed

+134
-0
lines changed

3 files changed

+134
-0
lines changed

micropython/streampair/manifest.py

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
metadata(
2+
description="Create a bi-directional linked pair of stream objects", version="0.0.1"
3+
)
4+
5+
module("streampair.py")

micropython/streampair/streampair.py

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import io
2+
3+
from collections import deque
4+
from micropython import const
5+
6+
try:
7+
from typing import Union, Tuple
8+
except:
9+
pass
10+
11+
# From micropython/py/stream.h
12+
_MP_STREAM_ERROR = const(-1)
13+
_MP_STREAM_FLUSH = const(1)
14+
_MP_STREAM_SEEK = const(2)
15+
_MP_STREAM_POLL = const(3)
16+
_MP_STREAM_CLOSE = const(4)
17+
_MP_STREAM_POLL_RD = const(0x0001)
18+
19+
20+
def streampair(buffer_size: Union[int, Tuple[int, int]]=256):
21+
"""
22+
Returns two bi-directional linked stream objects where writes to one can be read from the other and vice/versa.
23+
This can be used somewhat similarly to a socket.socketpair in python, like a pipe
24+
of data that can be used to connect stream consumers (eg. asyncio.StreamWriter, mock Uart)
25+
"""
26+
try:
27+
size_a, size_b = buffer_size
28+
except TypeError:
29+
size_a = size_b = buffer_size
30+
31+
a = deque(size_a)
32+
b = deque(size_b)
33+
return StreamPair(a, b), StreamPair(b, a)
34+
35+
36+
class StreamPair(io.IOBase):
37+
38+
def __init__(self, own: deque, other: deque):
39+
self.own = own
40+
self.other = other
41+
super().__init__()
42+
43+
def read(self, nbytes=-1):
44+
return self.own.read(nbytes)
45+
46+
def readline(self):
47+
return self.own.readline()
48+
49+
def readinto(self, buf, limit=-1):
50+
return self.own.readinto(buf, limit)
51+
52+
def write(self, data):
53+
return self.other.write(data)
54+
55+
def seek(self, offset, whence):
56+
return self.own.seek(offset, whence)
57+
58+
def flush(self):
59+
self.own.flush()
60+
self.other.flush()
61+
62+
def close(self):
63+
self.own.close()
64+
self.other.close()
65+
66+
def any(self):
67+
pos = self.own.tell()
68+
end = self.own.seek(0, 2)
69+
self.own.seek(pos, 0)
70+
return end - pos
71+
72+
def ioctl(self, op, arg):
73+
if op == _MP_STREAM_POLL:
74+
if self.any():
75+
return _MP_STREAM_POLL_RD
76+
77+
elif op ==_MP_STREAM_FLUSH:
78+
return self.flush()
79+
elif op ==_MP_STREAM_SEEK:
80+
return self.seek(arg[0], arg[1])
81+
elif op ==_MP_STREAM_CLOSE:
82+
return self.close()
83+
84+
else:
85+
return _MP_STREAM_ERROR
+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import asyncio
2+
import unittest
3+
from streampair import streampair
4+
5+
def async_test(f):
6+
"""
7+
Decorator to run an async test function
8+
"""
9+
def wrapper(*args, **kwargs):
10+
loop = asyncio.new_event_loop()
11+
# loop.set_exception_handler(_exception_handler)
12+
t = loop.create_task(f(*args, **kwargs))
13+
loop.run_until_complete(t)
14+
15+
return wrapper
16+
17+
class StreamPairTestCase(unittest.TestCase):
18+
19+
def test_streampair(self):
20+
a, b = streampair()
21+
assert a.write("foo") == 3
22+
assert b.write("bar") == 3
23+
24+
assert (r := a.read()) == "bar", r
25+
assert (r := b.read()) == "foo", r
26+
27+
@async_test
28+
async def test_async_streampair(self):
29+
a, b = streampair()
30+
ar = asyncio.StreamReader(a)
31+
bw = asyncio.StreamWriter(b)
32+
33+
br = asyncio.StreamReader(b)
34+
aw = asyncio.StreamWriter(a)
35+
36+
assert aw.write("foo") == 3
37+
assert await br.read() == "foo"
38+
39+
assert bw.write("bar") == 3
40+
assert await ar.read() == "bar"
41+
42+
43+
if __name__ == "__main__":
44+
unittest.main()

0 commit comments

Comments
 (0)