|
4 | 4 | import aioamqp
|
5 | 5 |
|
6 | 6 | from ... import config
|
| 7 | +from .driver_base import ExchangerBase |
7 | 8 |
|
8 | 9 | logger = logging.getLogger(__name__)
|
9 | 10 |
|
10 | 11 |
|
11 |
| -class Exchanger: |
12 |
| - |
13 |
| - ROLE_WS = 1 |
14 |
| - ROLE_APP = 2 |
15 |
| - |
16 |
| - def __init__(self, name, role, client_id=0): |
17 |
| - if role not in [self.ROLE_WS, self.ROLE_APP]: |
18 |
| - raise ValueError('bad role {}'.format(role)) |
19 |
| - self.role = role |
20 |
| - self.client_id = client_id |
21 |
| - self.name = name |
| 12 | +class Exchanger(ExchangerBase): |
22 | 13 |
|
23 | 14 | async def __aenter__(self):
|
24 | 15 | logger.debug('connecting with role {}'.format(self.role))
|
25 |
| - params = config.get_amqp_conn_params() |
| 16 | + params = config.get_amqp_conn_params(self.url) |
26 | 17 | params['login'] = params.pop('username')
|
27 | 18 | params['virtualhost'] = params.pop('virtual_host')
|
28 | 19 | self._transport, self._protocol = await aioamqp.connect(**params)
|
29 | 20 | # TODO: handle reconnect awaiting from self._conn
|
30 | 21 | self._chan = await self._protocol.channel()
|
31 |
| - app_exchange_name = 'pushpull.app' |
32 |
| - app_routing_key = '' |
33 |
| - ws_exchange_name = 'pushpull.ws' |
34 |
| - ws_routing_key = 'pushpull.ws.{}'.format(self.name) |
| 22 | + app_exchange_name = self.get_app_exchange_name() |
| 23 | + app_routing_key = self.get_app_routing_key() |
| 24 | + ws_exchange_name = self.get_ws_exchange_name() |
| 25 | + ws_routing_key = self.get_ws_routing_key() |
35 | 26 | await self._chan.exchange(app_exchange_name, 'fanout', durable=True)
|
36 | 27 | await self._chan.exchange(ws_exchange_name, 'direct', durable=True)
|
37 | 28 | if self.role == self.ROLE_WS:
|
|
0 commit comments