19
19
async def websocket_rabbitmq_gateway (request ):
20
20
authorization = decode_auth_querystring_param (request .GET )
21
21
try :
22
- # TODO: reuse amqp channel: here we open one, we close it and then we reopen another one with the
23
- # Exchanger a few lines down
22
+ # TODO: reuse amqp channel and maybe share it with the Exchanger
24
23
user_info = await auth .get_user_info (authorization )
25
24
except auth .NotAuthorized as exc :
26
25
raise aiohttp .web_exceptions .HTTPUnauthorized () from exc
@@ -29,8 +28,7 @@ async def websocket_rabbitmq_gateway(request):
29
28
except auth .AuthTimeout as exc :
30
29
logger .warning ('auth backend timeout' )
31
30
raise aiohttp .web_exceptions .HTTPInternalServerError (text = 'auth backend timeout' ) from exc
32
- else :
33
- name = user_info .id
31
+ name = user_info .id
34
32
ws = aiohttp .web .WebSocketResponse ()
35
33
client_id = request .GET .get ('client-id' )
36
34
if not client_id :
@@ -45,7 +43,8 @@ async def websocket_rabbitmq_gateway(request):
45
43
asyncio .ensure_future (coro ) for coro in [
46
44
send_from_amqp_to_websocket (amqp_receiver , ws ),
47
45
send_from_websocket_to_amqp (ws , amqp_sender ),
48
- send_ping_to_websocket (ws , config .get_ws_autoping_timeout ())
46
+ send_ping_to_websocket (ws , config .get_ws_autoping_timeout ()),
47
+ check_auth_periodic (authorization , delay = config .get_periodic_auth_check_timeout ())
49
48
]
50
49
]
51
50
try :
@@ -61,9 +60,9 @@ async def websocket_rabbitmq_gateway(request):
61
60
except Exception as exc :
62
61
logger .exception ('client id %s coroutine %r done due to exception: %r' , client_id , coro , exc )
63
62
else :
64
- logger .info ('client id %s coroutine %r done, result: %r' , client_id , coro , result )
63
+ logger .debug ('client id %s coroutine %r done, result: %r' , client_id , coro , result )
65
64
for coro in pending :
66
- logger .info ('client id %s cancelling pending coroutine %r' , client_id , coro )
65
+ logger .debug ('client id %s cancelling pending coroutine %r' , client_id , coro )
67
66
coro .cancel ()
68
67
await asyncio .sleep (0 )
69
68
except Exception :
@@ -100,6 +99,18 @@ async def send_ping_to_websocket(ws, timeout):
100
99
await asyncio .sleep (timeout )
101
100
102
101
102
+ async def check_auth_periodic (authorization , delay ):
103
+ while True :
104
+ await asyncio .sleep (delay )
105
+ logger .debug ('periodic auth check start for %s' , authorization )
106
+ try :
107
+ # TODO: reuse amqp channel and maybe share it with the Exchanger
108
+ await auth .get_user_info (authorization )
109
+ except auth .AuthTimeout as exc :
110
+ logger .warning ('periodic auth check timeout for %s' , authorization )
111
+ logger .debug ('periodic auth check OK for %s' , authorization )
112
+
113
+
103
114
async def echo_websocket (ws ):
104
115
"""For testing only """
105
116
async for msg in ws :
0 commit comments