Skip to content

Commit 6f9abf8

Browse files
author
Marco Paolini
committed
Declare one fanout queue for each websocket client
- Add license - Make package installable - Add one single test
1 parent 93b0ca7 commit 6f9abf8

File tree

14 files changed

+187
-24
lines changed

14 files changed

+187
-24
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@ build
2222
*.egg
2323
*.manifest
2424
*.spec
25+
/.tox
26+
/.cache

LICENSE.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2016 Marco Paolini
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
6+
7+
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
8+
9+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

MANIFEST.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
include README.md
2+
include requirements.txt

pushpull/__init__.py

Whitespace-only changes.

pushpull/amqp/gateway.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,34 @@ class Exchanger:
1212
ROLE_WS = 1
1313
ROLE_APP = 2
1414

15-
def __init__(self, name, role, **connection_params):
15+
def __init__(self, name, role, client_id=0, **connection_params):
1616
self._conn_params = connection_params
1717
if role not in [self.ROLE_WS, self.ROLE_APP]:
1818
raise ValueError('bad role {}'.format(role))
1919
self.role = role
20-
self.q_names = [x.format(name) for x in ('{}.from_ws', '{}.to_ws')]
21-
if role == self.ROLE_APP:
22-
self.q_names.reverse()
20+
self.client_id = client_id
21+
self.name = name
2322

2423
async def __aenter__(self):
25-
logger.debug('connecting sender and receiver')
24+
logger.debug('connecting with role {}'.format(self.role))
2625
self._conn = await asynqp.connect(**self._conn_params)
2726
self._chan = await self._conn.open_channel()
28-
queues, exchanges = [], []
29-
for name in self.q_names:
30-
exchange = await self._chan.declare_exchange(name, 'direct')
31-
exchanges.append(exchange)
32-
queue = await self._chan.declare_queue(name)
33-
await queue.bind(exchange, name)
34-
queues.append(queue)
35-
logger.debug('connected sender and receiver')
36-
return Sender(exchanges[0], self.q_names[0]), Receiver(queues[1])
27+
app_routing_key = '{}.app'.format(self.name)
28+
app_exchange = await self._chan.declare_exchange(app_routing_key, 'fanout')
29+
ws_routing_key = '{}.ws'.format(self.name)
30+
ws_exchange = await self._chan.declare_exchange(ws_routing_key, 'direct')
31+
if self.role == self.ROLE_WS:
32+
receive_queue = await self._chan.declare_queue('{}.ws.{}'.format(self.name, self.client_id))
33+
await receive_queue.bind(app_exchange, app_routing_key)
34+
send_exchange = ws_exchange
35+
send_routing_key = ws_routing_key
36+
if self.role == self.ROLE_APP:
37+
receive_queue = await self._chan.declare_queue('{}.app'.format(self.name))
38+
await receive_queue.bind(ws_exchange, ws_routing_key)
39+
send_exchange = app_exchange
40+
send_routing_key = app_routing_key
41+
logger.debug('connected ok')
42+
return Sender(send_exchange, send_routing_key), Receiver(receive_queue)
3743

3844
async def __aexit__(self, exc_type, exc_value, traceback):
3945
logger.debug('closing connection and channel %r %r', exc_type, exc_value)

pushpull/cli/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ def client():
1818
@click.argument('name')
1919
def challenge_websocket(url, name):
2020
logging.basicConfig(level=logging.DEBUG)
21-
click.echo(asyncio.get_event_loop().run_until_complete(websocket_client.challenge(url, name, sys.stdin, sys.stdout)))
21+
loop = asyncio.get_event_loop()
22+
click.echo(loop.run_until_complete(websocket_client.challenge(url, name, sys.stdin, sys.stdout, loop)))
2223
client.add_command(challenge_websocket)
2324

2425

pushpull/websocket/__init__.py

Whitespace-only changes.

pushpull/websocket/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,21 @@
99
logger = logging.getLogger(__name__)
1010

1111

12-
async def challenge(url, name, fd_in, fd_out):
13-
with aiohttp.ClientSession() as session:
12+
async def challenge(url, name, fd_in, fd_out, loop=None):
13+
with aiohttp.ClientSession(loop=loop) as session:
1414
async with session.ws_connect('{}?name={}'.format(url, name)) as ws:
1515
logger.debug('opening websocket')
16-
sender = send_from_fd_to_ws(fd_in, ws)
16+
sender = send_from_fd_to_ws(fd_in, ws, loop=loop)
1717
receiver = send_from_ws_to_fd(ws, fd_out)
18-
_, pending = await asyncio.wait([sender, receiver], return_when=asyncio.FIRST_COMPLETED)
18+
done, pending = await asyncio.wait([sender, receiver], return_when=asyncio.FIRST_COMPLETED)
1919
for task in pending:
2020
task.cancel()
2121
logger.debug('closing websocket')
2222
# await ws.close()
2323

2424

25-
async def send_from_fd_to_ws(fd, ws):
26-
async for line in FdLineReader(fd):
25+
async def send_from_fd_to_ws(fd, ws, loop=None):
26+
async for line in FdLineReader(fd, loop=loop):
2727
logger.debug('sending line from fd to ws %r', line)
2828
ws.send_str(line)
2929
await ws._writer.writer.drain()

pushpull/websocket/gateway.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
import logging
55
import asyncio
6+
import random
67

78
import aiohttp
89
import aiohttp.web
@@ -14,17 +15,22 @@
1415

1516

1617
async def websocket_rabbitmq_gateway(request):
18+
# TODO: AUTHENTICATION
1719
name = request.GET.get('name')
1820
if not name:
1921
return aiohttp.web.Response(body=b'name is required', status=400)
2022
ws = aiohttp.web.WebSocketResponse()
2123
logger.debug('websocket connection open')
2224
try:
2325
await ws.prepare(request)
24-
async with Exchanger(name, Exchanger.ROLE_WS) as (amqp_sender, amqp_receiver):
25-
send_coro = asyncio.ensure_future(send_from_amqp_to_websocket(amqp_receiver, ws))
26-
receive_coro = asyncio.ensure_future(send_from_websocket_to_amqp(ws, amqp_sender))
26+
# TODO: get client_id from request
27+
async with Exchanger(name, Exchanger.ROLE_WS, client_id=random.randint(1, 100)) as (amqp_sender, amqp_receiver):
28+
send_coro = send_from_amqp_to_websocket(amqp_receiver, ws)
29+
receive_coro = send_from_websocket_to_amqp(ws, amqp_sender)
2730
await asyncio.gather(receive_coro, send_coro)
31+
except Exception as exc:
32+
logger.exception('exception while handling request')
33+
raise
2834
finally:
2935
logger.debug('websocket connection closing')
3036
await ws.close()

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[flake8]
2+
max-line-length = 119
3+
exclude = migrations,tests,tests.py,.svn,CVS,.bzr,.hg,.git,__pycache__

0 commit comments

Comments
 (0)