Skip to content

Commit f877c39

Browse files
committed
Configure message buffer (#82)
1 parent d2ce97b commit f877c39

File tree

5 files changed

+212
-39
lines changed

5 files changed

+212
-39
lines changed

docs/backpressure.rst

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
Message Queues
2+
==============
3+
4+
.. currentmodule:: trio_websocket
5+
6+
.. TODO This file will grow into a "backpressure" document once #65 is complete.
7+
For now it is just deals with userspace buffers, since this is a related
8+
topic.
9+
10+
When a connection is open, it runs a background task that reads network data and
11+
automatically handles certain types of events for you. For example, if the
12+
background task receives a ping event, then it will automatically send back a
13+
pong event. When the background task receives a message, it places that message
14+
into an internal queue. When you call ``get_message()``, it returns the first
15+
item from this queue.
16+
17+
If this internal message queue does not have any size limits, then a remote
18+
endpoint could rapidly send large messages and use up all of the memory on the
19+
local machine! In almost all situations, the message queue needs to have size
20+
limits, both in terms of the number of items and the size per message. These
21+
limits create an upper bound for the amount of memory that can be used by a
22+
single WebSocket connection. For example, if the queue size is 10 and the
23+
maximum message size is 1 megabyte, then the connection will use at most 10
24+
megabytes of memory.
25+
26+
When the message queue is full, the background task pauses and waits for the
27+
user to remove a message, i.e. call ``get_message()``. When the background task
28+
is paused, it stops processing background events like replying to ping events.
29+
If a message is received that is larger than the maximum message size, then the
30+
connection is automatically closed with code 1009 and the message is discarded.
31+
32+
The library APIs each take arguments to configure the mesage buffer:
33+
``message_queue_size`` and ``max_message_size``. By default the queue size is
34+
one and the maximum message size is 1 MiB. If you set queue size to zero, then
35+
the background task will block every time it receives a message until somebody
36+
calls ``get_message()``. For an unbounded queue—which is strongly
37+
discouraged—set the queue size to ``math.inf``. Likewise, the maximum message
38+
size may also be disabled by setting it to ``math.inf``.

docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Autobahn Test Suite <https://github.com/crossbario/autobahn-testsuite>`__.
3232
getting_started
3333
clients
3434
servers
35+
backpressure
3536
timeouts
3637
api
3738
recipes

docs/servers.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ host/port to bind to. The handler function receives a
4141
:class:`WebSocketRequest` object, and it calls the request's
4242
:func:`~WebSocketRequest.accept` method to finish the handshake and obtain a
4343
:class:`WebSocketConnection` object. When the handler function exits, the
44-
connection is automatically closed.
44+
connection is automatically closed. If the handler function raises an
45+
exception, the server will silently close the connection and cancel the
46+
tasks belonging to it.
4547

4648
.. autofunction:: serve_websocket
4749

tests/test_connection.py

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,16 @@
5353

5454
HOST = '127.0.0.1'
5555
RESOURCE = '/resource'
56+
DEFAULT_TEST_MAX_DURATION = 1
5657

5758
# Timeout tests follow a general pattern: one side waits TIMEOUT seconds for an
5859
# event. The other side delays for FORCE_TIMEOUT seconds to force the timeout
5960
# to trigger. Each test also has maximum runtime (measure by Trio's clock) to
6061
# prevent a faulty test from hanging the entire suite.
6162
TIMEOUT = 1
6263
FORCE_TIMEOUT = 2
63-
MAX_TIMEOUT_TEST_DURATION = 3
64+
TIMEOUT_TEST_MAX_DURATION = 3
65+
import logging
6466

6567

6668
@pytest.fixture
@@ -89,12 +91,6 @@ async def echo_request_handler(request):
8991
Accept incoming request and then pass off to echo connection handler.
9092
'''
9193
conn = await request.accept()
92-
await echo_conn_handler(conn)
93-
94-
95-
async def echo_conn_handler(conn):
96-
''' A connection handler that reads one message, sends back the same
97-
message, then exits. '''
9894
try:
9995
msg = await conn.get_message()
10096
await conn.send_message(msg)
@@ -391,7 +387,7 @@ async def handler(stream):
391387
await client.send_message('Hello from client!')
392388

393389

394-
@fail_after(MAX_TIMEOUT_TEST_DURATION)
390+
@fail_after(TIMEOUT_TEST_MAX_DURATION)
395391
async def test_client_open_timeout(nursery, autojump_clock):
396392
'''
397393
The client times out waiting for the server to complete the opening
@@ -411,7 +407,7 @@ async def handler(request):
411407
pass
412408

413409

414-
@fail_after(MAX_TIMEOUT_TEST_DURATION)
410+
@fail_after(TIMEOUT_TEST_MAX_DURATION)
415411
async def test_client_close_timeout(nursery, autojump_clock):
416412
'''
417413
This client times out waiting for the server to complete the closing
@@ -430,15 +426,16 @@ async def handler(request):
430426
pytest.fail('Should not reach this line.')
431427

432428
server = await nursery.start(
433-
partial(serve_websocket, handler, HOST, 0, ssl_context=None))
429+
partial(serve_websocket, handler, HOST, 0, ssl_context=None,
430+
message_queue_size=0))
434431

435432
with pytest.raises(trio.TooSlowError):
436433
async with open_websocket(HOST, server.port, RESOURCE, use_ssl=False,
437434
disconnect_timeout=TIMEOUT) as client_ws:
438435
await client_ws.send_message('test')
439436

440437

441-
@fail_after(MAX_TIMEOUT_TEST_DURATION)
438+
@fail_after(TIMEOUT_TEST_MAX_DURATION)
442439
async def test_server_open_timeout(autojump_clock):
443440
'''
444441
The server times out waiting for the client to complete the opening
@@ -470,7 +467,7 @@ async def handler(request):
470467
nursery.cancel_scope.cancel()
471468

472469

473-
@fail_after(MAX_TIMEOUT_TEST_DURATION)
470+
@fail_after(TIMEOUT_TEST_MAX_DURATION)
474471
async def test_server_close_timeout(autojump_clock):
475472
'''
476473
The server times out waiting for the client to complete the closing
@@ -523,7 +520,6 @@ async def handler(request):
523520
with pytest.raises(ConnectionClosed):
524521
await server_ws.get_message()
525522
server = await nursery.start(serve_websocket, handler, HOST, 0, None)
526-
port = server.port
527523
stream = await trio.open_tcp_stream(HOST, server.port)
528524
client_ws = await wrap_client_stream(nursery, stream, HOST, RESOURCE)
529525
async with client_ws:
@@ -566,12 +562,14 @@ async def handler(request):
566562
assert exc.reason.name == 'NORMAL_CLOSURE'
567563

568564

569-
@pytest.mark.skip(reason='Hangs because channel size is hard coded to 0')
565+
@fail_after(DEFAULT_TEST_MAX_DURATION)
570566
async def test_read_messages_after_remote_close(nursery):
571567
'''
572568
When the remote endpoint closes, the local endpoint can still read all
573569
of the messages sent prior to closing. Any attempt to read beyond that will
574570
raise ConnectionClosed.
571+
572+
This test also exercises the configuration of the queue size.
575573
'''
576574
server_closed = trio.Event()
577575

@@ -585,7 +583,10 @@ async def handler(request):
585583
server = await nursery.start(
586584
partial(serve_websocket, handler, HOST, 0, ssl_context=None))
587585

588-
async with open_websocket(HOST, server.port, '/', use_ssl=False) as client:
586+
# The client needs a message queue of size 2 so that it can buffer both
587+
# incoming messages without blocking the reader task.
588+
async with open_websocket(HOST, server.port, '/', use_ssl=False,
589+
message_queue_size=2) as client:
589590
await server_closed.wait()
590591
assert await client.get_message() == '1'
591592
assert await client.get_message() == '2'
@@ -618,12 +619,53 @@ async def handler(request):
618619
client_closed.set()
619620

620621

621-
async def test_client_cm_exit_with_pending_messages(echo_server, autojump_clock):
622+
async def test_cm_exit_with_pending_messages(echo_server, autojump_clock):
623+
'''
624+
Regression test for #74, where a context manager was not able to exit when
625+
there were pending messages in the receive queue.
626+
'''
622627
with trio.fail_after(1):
623628
async with open_websocket(HOST, echo_server.port, RESOURCE,
624629
use_ssl=False) as ws:
625630
await ws.send_message('hello')
626631
# allow time for the server to respond
627632
await trio.sleep(.1)
628-
# bug: context manager exit is blocked on unconsumed message
629-
#await ws.get_message()
633+
634+
635+
@fail_after(DEFAULT_TEST_MAX_DURATION)
636+
async def test_max_message_size(nursery):
637+
'''
638+
Set the client's max message size to 100 bytes. The client can send a
639+
message larger than 100 bytes, but when it receives a message larger than
640+
100 bytes, it closes the connection with code 1009.
641+
'''
642+
import logging
643+
async def handler(request):
644+
''' Similar to the echo_request_handler fixture except it runs in a
645+
loop. '''
646+
conn = await request.accept()
647+
while True:
648+
try:
649+
msg = await conn.get_message()
650+
await conn.send_message(msg)
651+
except ConnectionClosed:
652+
break
653+
654+
server = await nursery.start(
655+
partial(serve_websocket, handler, HOST, 0, ssl_context=None))
656+
657+
async with open_websocket(HOST, server.port, RESOURCE, use_ssl=False,
658+
max_message_size=100) as client:
659+
# We can send and receive 100 bytes:
660+
await client.send_message(b'A' * 100)
661+
msg = await client.get_message()
662+
assert len(msg) == 100
663+
# We can send 101 bytes but cannot receive 101 bytes:
664+
await client.send_message(b'B' * 101)
665+
with pytest.raises(ConnectionClosed):
666+
await client.get_message()
667+
logging.debug('TEST 0')
668+
assert client.closed
669+
assert client.closed.code == 1009
670+
logging.debug('TEST 1')
671+
logging.debug('TEST 2')

0 commit comments

Comments
 (0)