Skip to content

Commit b9f450f

Browse files
author
Marco Paolini
committed
Improve handling of cancelled coroutines on connection close
1 parent eb2ec35 commit b9f450f

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed

pushpull/websocket/gateway.py

+26-14
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,32 @@ async def websocket_rabbitmq_gateway(request):
4040
try:
4141
await ws.prepare(request)
4242
async with Exchanger(name, Exchanger.ROLE_WS, client_id=client_id) as (amqp_sender, amqp_receiver):
43-
send_coro = send_from_amqp_to_websocket(amqp_receiver, ws)
44-
receive_coro = send_from_websocket_to_amqp(ws, amqp_sender)
45-
ping_coro = send_ping_to_websocket(ws, config.get_ws_autoping_timeout())
46-
done, pending = await asyncio.wait(
47-
[receive_coro, send_coro, ping_coro],
48-
return_when=asyncio.FIRST_COMPLETED
49-
)
50-
logger.info('client id %s exiting due to done coroutines %r', client_id, done)
51-
for coro in pending:
52-
logger.warning('client id %s cancelling pending coroutine %r', client_id, coro)
53-
coro.cancel()
54-
for coro in done:
55-
result = coro.result()
56-
logger.info('client id %s coroutine %r done, result: %r', client_id, coro, result)
43+
done = []
44+
pending = [
45+
asyncio.ensure_future(coro) for coro in [
46+
send_from_amqp_to_websocket(amqp_receiver, ws),
47+
send_from_websocket_to_amqp(ws, amqp_sender),
48+
send_ping_to_websocket(ws, config.get_ws_autoping_timeout())
49+
]
50+
]
51+
try:
52+
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
53+
except asyncio.CancelledError:
54+
logger.warning('client id %s exiging due to cancelling', client_id)
55+
else:
56+
logger.info('client id %s exiting due to done coroutines %r', client_id, done)
57+
finally:
58+
for coro in done:
59+
try:
60+
result = coro.result()
61+
except Exception as exc:
62+
logger.exception('client id %s coroutine %r done due to exception: %r', client_id, coro, exc)
63+
else:
64+
logger.info('client id %s coroutine %r done, result: %r', client_id, coro, result)
65+
for coro in pending:
66+
logger.info('client id %s cancelling pending coroutine %r', client_id, coro)
67+
coro.cancel()
68+
await asyncio.sleep(0)
5769
except Exception:
5870
logger.exception('client id %s exception while handling request', client_id)
5971
finally:

0 commit comments

Comments
 (0)