Skip to content

Commit 93b0ca7

Browse files
author
Marco Paolini
committed
Enable two way communication
1 parent a9af6f2 commit 93b0ca7

File tree

5 files changed

+71
-42
lines changed

5 files changed

+71
-42
lines changed

pushpull/amqp/client.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import logging
23

34
from .gateway import Exchanger
@@ -6,8 +7,21 @@
67
logger = logging.getLogger(__name__)
78

89

9-
async def challenge(fd):
10-
async with Exchanger('1') as (sender, _):
11-
async for line in FdLineReader(fd):
12-
logger.debug('sending message %r', line)
13-
await sender.send(line)
10+
async def challenge(name, fd_in, fd_out):
11+
async with Exchanger(name, Exchanger.ROLE_APP) as (amqp_sender, amqp_receiver):
12+
sender = send_from_fd_to_amqp(fd_in, amqp_sender)
13+
receiver = send_from_amqp_to_fd(amqp_receiver, fd_out)
14+
_, pending = await asyncio.wait([sender, receiver], return_when=asyncio.FIRST_COMPLETED)
15+
for task in pending:
16+
task.cancel()
17+
18+
19+
async def send_from_fd_to_amqp(fd, sender):
20+
async for line in FdLineReader(fd):
21+
logger.debug('sending message %r', line)
22+
await sender.send(line)
23+
24+
25+
async def send_from_amqp_to_fd(receiver, fd):
26+
async for msg in receiver:
27+
fd.write(msg)

pushpull/amqp/gateway.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,31 @@
99

1010
class Exchanger:
1111

12-
def __init__(self, name, **connection_params):
13-
self.exchange_name = name
12+
ROLE_WS = 1
13+
ROLE_APP = 2
14+
15+
def __init__(self, name, role, **connection_params):
1416
self._conn_params = connection_params
17+
if role not in [self.ROLE_WS, self.ROLE_APP]:
18+
raise ValueError('bad role {}'.format(role))
19+
self.role = role
20+
self.q_names = [x.format(name) for x in ('{}.from_ws', '{}.to_ws')]
21+
if role == self.ROLE_APP:
22+
self.q_names.reverse()
1523

1624
async def __aenter__(self):
1725
logger.debug('connecting sender and receiver')
1826
self._conn = await asynqp.connect(**self._conn_params)
1927
self._chan = await self._conn.open_channel()
20-
exchange = await self._chan.declare_exchange(self.exchange_name, 'direct')
21-
queue = await self._chan.declare_queue(self.exchange_name)
22-
await queue.bind(exchange, self.exchange_name)
28+
queues, exchanges = [], []
29+
for name in self.q_names:
30+
exchange = await self._chan.declare_exchange(name, 'direct')
31+
exchanges.append(exchange)
32+
queue = await self._chan.declare_queue(name)
33+
await queue.bind(exchange, name)
34+
queues.append(queue)
2335
logger.debug('connected sender and receiver')
24-
return Sender(exchange, self.exchange_name), Receiver(queue)
36+
return Sender(exchanges[0], self.q_names[0]), Receiver(queues[1])
2537

2638
async def __aexit__(self, exc_type, exc_value, traceback):
2739
logger.debug('closing connection and channel %r %r', exc_type, exc_value)
@@ -56,6 +68,8 @@ def __call__(self, amqp_message):
5668
self._fifo.put_nowait(amqp_message.body.decode(amqp_message.content_encoding))
5769
except asyncio.QueueFull:
5870
logger.warning('queue full')
71+
else:
72+
amqp_message.ack()
5973

6074
def on_error(self, exc):
6175
logger.error('error received: %r', exc)

pushpull/cli/client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,18 @@ def client():
1515

1616
@click.command()
1717
@click.argument('url')
18-
def challenge_websocket(url):
18+
@click.argument('name')
19+
def challenge_websocket(url, name):
1920
logging.basicConfig(level=logging.DEBUG)
20-
click.echo(asyncio.get_event_loop().run_until_complete(websocket_client.challenge(url, sys.stdin, sys.stdout)))
21+
click.echo(asyncio.get_event_loop().run_until_complete(websocket_client.challenge(url, name, sys.stdin, sys.stdout)))
2122
client.add_command(challenge_websocket)
2223

2324

2425
@click.command()
25-
def challenge_amqp():
26+
@click.argument('name')
27+
def challenge_amqp(name):
2628
logging.basicConfig(level=logging.DEBUG)
27-
click.echo(asyncio.get_event_loop().run_until_complete(amqp_client.challenge(sys.stdin)))
29+
click.echo(asyncio.get_event_loop().run_until_complete(amqp_client.challenge(name, sys.stdin, sys.stdout)))
2830
client.add_command(challenge_amqp)
2931

3032

pushpull/websocket/client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,30 @@
99
logger = logging.getLogger(__name__)
1010

1111

12-
async def challenge(url, fd_in, fd_out):
12+
async def challenge(url, name, fd_in, fd_out):
1313
with aiohttp.ClientSession() as session:
14-
async with session.ws_connect(url) as ws:
14+
async with session.ws_connect('{}?name={}'.format(url, name)) as ws:
1515
logger.debug('opening websocket')
16-
await send_from_fd_to_ws(fd_in, ws)
17-
return
18-
sender = asyncio.ensure_future(send_from_fd_to_ws(fd_in, ws))
19-
receiver = asyncio.ensure_future(send_from_ws_to_fd(fd_out, ws))
20-
asyncio.gather(sender, receiver)
16+
sender = send_from_fd_to_ws(fd_in, ws)
17+
receiver = send_from_ws_to_fd(ws, fd_out)
18+
_, pending = await asyncio.wait([sender, receiver], return_when=asyncio.FIRST_COMPLETED)
19+
for task in pending:
20+
task.cancel()
2121
logger.debug('closing websocket')
2222
# await ws.close()
2323

2424

2525
async def send_from_fd_to_ws(fd, ws):
2626
async for line in FdLineReader(fd):
27-
logger.debug('sending message %r', line)
27+
logger.debug('sending line from fd to ws %r', line)
2828
ws.send_str(line)
2929
await ws._writer.writer.drain()
3030

3131

3232
async def send_from_ws_to_fd(ws, fd):
3333
async for msg in ws:
3434
if msg.tp == aiohttp.MsgType.text:
35-
logger.debug('got data: %s', msg.data)
35+
logger.debug('sending data from ws to fd: %s', msg.data)
3636
fd.write(msg.data)
3737
elif msg.tp == aiohttp.MsgType.error:
3838
logger.error('ws connection closed with exception %s', ws.exception())

pushpull/websocket/gateway.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,23 @@
1414

1515

1616
async def websocket_rabbitmq_gateway(request):
17-
17+
name = request.GET.get('name')
18+
if not name:
19+
return aiohttp.web.Response(body=b'name is required', status=400)
1820
ws = aiohttp.web.WebSocketResponse()
19-
await ws.prepare(request)
2021
logger.debug('websocket connection open')
2122
try:
22-
# await echo_websocket(ws)
23-
async with Exchanger('2') as (amqp_sender, amqp_receiver):
23+
await ws.prepare(request)
24+
async with Exchanger(name, Exchanger.ROLE_WS) as (amqp_sender, amqp_receiver):
2425
send_coro = asyncio.ensure_future(send_from_amqp_to_websocket(amqp_receiver, ws))
2526
receive_coro = asyncio.ensure_future(send_from_websocket_to_amqp(ws, amqp_sender))
2627
await asyncio.gather(receive_coro, send_coro)
27-
except Exception as exc:
28-
logger.exception('fatal error')
2928
finally:
3029
logger.debug('websocket connection closing')
3130
await ws.close()
3231
return ws
3332

3433

35-
async def echo_websocket(ws):
36-
"""For testing only """
37-
async for msg in ws:
38-
if msg.tp == aiohttp.MsgType.text:
39-
logger.debug('echoing data: %s', msg.data)
40-
ws.send_str(msg.data)
41-
elif msg.tp == aiohttp.MsgType.error:
42-
logger.error('ws connection closed with exception %s', ws.exception())
43-
return
44-
45-
4634
async def send_from_websocket_to_amqp(ws, sender):
4735
async for msg in ws:
4836
if msg.tp == aiohttp.MsgType.text:
@@ -56,4 +44,15 @@ async def send_from_websocket_to_amqp(ws, sender):
5644
async def send_from_amqp_to_websocket(receiver, ws):
5745
async for data in receiver:
5846
ws.send_str(data)
59-
#await ws._writer.writer.drain()
47+
# await ws._writer.writer.drain()
48+
49+
50+
async def echo_websocket(ws):
51+
"""For testing only """
52+
async for msg in ws:
53+
if msg.tp == aiohttp.MsgType.text:
54+
logger.debug('echoing data: %s', msg.data)
55+
ws.send_str(msg.data)
56+
elif msg.tp == aiohttp.MsgType.error:
57+
logger.error('ws connection closed with exception %s', ws.exception())
58+
return

0 commit comments

Comments
 (0)