Skip to content

Commit 934de40

Browse files
author
Marco Paolini
committed
Avoid sending empty messages to rabbitmq
1 parent b9f450f commit 934de40

File tree

2 files changed

+14
-6
lines changed

2 files changed

+14
-6
lines changed

pushpull/amqp/client.py

+10-5
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@
1212

1313
async def challenge(url, user_id, fd_in, fd_out):
1414
async with Exchanger(user_id, Exchanger.ROLE_APP, url=url) as (amqp_sender, amqp_receiver):
15-
sender = send_from_fd_to_amqp(fd_in, amqp_sender)
16-
receiver = send_from_amqp_to_fd(amqp_receiver, fd_out)
17-
_, pending = await asyncio.wait([sender, receiver], return_when=asyncio.FIRST_COMPLETED)
15+
pending = [
16+
asyncio.ensure_future(coro) for coro in [
17+
send_from_fd_to_amqp(fd_in, amqp_sender),
18+
send_from_amqp_to_fd(amqp_receiver, fd_out)
19+
]
20+
]
21+
_, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
1822
for task in pending:
1923
task.cancel()
2024

@@ -38,8 +42,9 @@ async def authenticate(url, authenticator):
3842

3943
async def send_from_fd_to_amqp(fd, sender):
4044
async for line in FdLineReader(fd):
41-
logger.debug('sending message %r', line)
42-
await sender.send(line)
45+
if line:
46+
logger.debug('sending message %r', line)
47+
await sender.send(line)
4348

4449

4550
async def send_from_amqp_to_fd(receiver, fd):

pushpull/websocket/gateway.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,10 @@ async def send_from_websocket_to_amqp(ws, sender):
7878
async for msg in ws:
7979
if msg.tp == aiohttp.MsgType.text:
8080
logger.debug('got data: %s', msg.data)
81-
await sender.send(msg.data)
81+
if msg.data:
82+
await sender.send(msg.data)
83+
else:
84+
logger.warning('refusing to send empty data')
8285
elif msg.tp == aiohttp.MsgType.error:
8386
logger.error('ws connection closed with exception %s', ws.exception())
8487
return

0 commit comments

Comments
 (0)