8
8
9
9
import asyncio
10
10
import unittest
11
+ import weakref
11
12
12
13
from uvloop import _testbase as tb
13
14
@@ -25,12 +26,11 @@ async def on_request(request):
25
26
app = aiohttp .web .Application ()
26
27
app .router .add_get ('/' , on_request )
27
28
28
- f = self .loop .create_server (
29
- app .make_handler (),
30
- '0.0.0.0' , '0' )
31
- srv = self .loop .run_until_complete (f )
32
-
33
- port = srv .sockets [0 ].getsockname ()[1 ]
29
+ runner = aiohttp .web .AppRunner (app )
30
+ self .loop .run_until_complete (runner .setup ())
31
+ site = aiohttp .web .TCPSite (runner , '0.0.0.0' , '0' )
32
+ self .loop .run_until_complete (site .start ())
33
+ port = site ._server .sockets [0 ].getsockname ()[1 ]
34
34
35
35
async def test ():
36
36
# Make sure we're using the correct event loop.
@@ -45,11 +45,61 @@ async def test():
45
45
self .assertEqual (result , PAYLOAD )
46
46
47
47
self .loop .run_until_complete (test ())
48
- self .loop .run_until_complete (app .shutdown ())
49
- self .loop .run_until_complete (app .cleanup ())
48
+ self .loop .run_until_complete (runner .cleanup ())
49
+
50
+ def test_aiohttp_graceful_shutdown (self ):
51
+ async def websocket_handler (request ):
52
+ ws = aiohttp .web .WebSocketResponse ()
53
+ await ws .prepare (request )
54
+ request .app ['websockets' ].add (ws )
55
+ try :
56
+ async for msg in ws :
57
+ await ws .send_str (msg .data )
58
+ finally :
59
+ request .app ['websockets' ].discard (ws )
60
+ return ws
61
+
62
+ async def on_shutdown (app ):
63
+ for ws in set (app ['websockets' ]):
64
+ await ws .close (
65
+ code = aiohttp .WSCloseCode .GOING_AWAY ,
66
+ message = 'Server shutdown' )
67
+
68
+ asyncio .set_event_loop (self .loop )
69
+ app = aiohttp .web .Application ()
70
+ app .router .add_get ('/' , websocket_handler )
71
+ app .on_shutdown .append (on_shutdown )
72
+ app ['websockets' ] = weakref .WeakSet ()
73
+
74
+ runner = aiohttp .web .AppRunner (app )
75
+ self .loop .run_until_complete (runner .setup ())
76
+ site = aiohttp .web .TCPSite (runner , '0.0.0.0' , '0' )
77
+ self .loop .run_until_complete (site .start ())
78
+ port = site ._server .sockets [0 ].getsockname ()[1 ]
79
+
80
+ async def client ():
81
+ async with aiohttp .ClientSession () as client :
82
+ async with client .ws_connect (
83
+ 'http://127.0.0.1:{}' .format (port )) as ws :
84
+ await ws .send_str ("hello" )
85
+ async for msg in ws :
86
+ assert msg .data == "hello"
87
+
88
+ client_task = asyncio .ensure_future (client ())
89
+
90
+ async def stop ():
91
+ await asyncio .sleep (0.1 )
92
+ try :
93
+ await asyncio .wait_for (runner .cleanup (), timeout = 0.1 )
94
+ finally :
95
+ try :
96
+ client_task .cancel ()
97
+ await client_task
98
+ except asyncio .CancelledError :
99
+ pass
100
+
101
+ self .loop .run_until_complete (stop ())
50
102
51
- srv .close ()
52
- self .loop .run_until_complete (srv .wait_closed ())
53
103
54
104
55
105
@unittest .skipIf (skip_tests , "no aiohttp module" )
0 commit comments