Skip to content

Commit a1eae60

Browse files
committed
Using TransportConnectionFailed instead of TransportClosed
Now the reconnecting session will reconnect as soon as it detects that the connection failed. Better log messages
1 parent ed18fd4 commit a1eae60

12 files changed

+204
-118
lines changed

gql/client.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
from .graphql_request import GraphQLRequest
3737
from .transport.async_transport import AsyncTransport
38-
from .transport.exceptions import TransportClosed, TransportQueryError
38+
from .transport.exceptions import TransportConnectionFailed, TransportQueryError
3939
from .transport.local_schema import LocalSchemaTransport
4040
from .transport.transport import Transport
4141
from .utilities import build_client_schema, get_introspection_query_ast
@@ -1730,6 +1730,7 @@ async def _connection_loop(self):
17301730
# Then wait for the reconnect event
17311731
self._reconnect_request_event.clear()
17321732
await self._reconnect_request_event.wait()
1733+
await self.transport.close()
17331734

17341735
async def start_connecting_task(self):
17351736
"""Start the task responsible to restart the connection
@@ -1758,7 +1759,7 @@ async def _execute_once(
17581759
**kwargs: Any,
17591760
) -> ExecutionResult:
17601761
"""Same Coroutine as parent method _execute but requesting a
1761-
reconnection if we receive a TransportClosed exception.
1762+
reconnection if we receive a TransportConnectionFailed exception.
17621763
"""
17631764

17641765
try:
@@ -1770,7 +1771,7 @@ async def _execute_once(
17701771
parse_result=parse_result,
17711772
**kwargs,
17721773
)
1773-
except TransportClosed:
1774+
except TransportConnectionFailed:
17741775
self._reconnect_request_event.set()
17751776
raise
17761777

@@ -1786,7 +1787,8 @@ async def _execute(
17861787
**kwargs: Any,
17871788
) -> ExecutionResult:
17881789
"""Same Coroutine as parent, but with optional retries
1789-
and requesting a reconnection if we receive a TransportClosed exception.
1790+
and requesting a reconnection if we receive a
1791+
TransportConnectionFailed exception.
17901792
"""
17911793

17921794
return await self._execute_with_retries(
@@ -1808,7 +1810,7 @@ async def _subscribe(
18081810
**kwargs: Any,
18091811
) -> AsyncGenerator[ExecutionResult, None]:
18101812
"""Same Async generator as parent method _subscribe but requesting a
1811-
reconnection if we receive a TransportClosed exception.
1813+
reconnection if we receive a TransportConnectionFailed exception.
18121814
"""
18131815

18141816
inner_generator: AsyncGenerator[ExecutionResult, None] = super()._subscribe(
@@ -1824,7 +1826,7 @@ async def _subscribe(
18241826
async for result in inner_generator:
18251827
yield result
18261828

1827-
except TransportClosed:
1829+
except TransportConnectionFailed:
18281830
self._reconnect_request_event.set()
18291831
raise
18301832

gql/transport/common/adapters/aiohttp.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,14 @@ async def send(self, message: str) -> None:
178178
TransportConnectionFailed: If connection closed
179179
"""
180180
if self.websocket is None:
181-
raise TransportConnectionFailed("Connection is already closed")
181+
raise TransportConnectionFailed("WebSocket connection is already closed")
182182

183183
try:
184184
await self.websocket.send_str(message)
185-
except ConnectionResetError as e:
186-
raise TransportConnectionFailed("Connection was closed") from e
185+
except Exception as e:
186+
raise TransportConnectionFailed(
187+
f"Error trying to send data: {type(e).__name__}"
188+
) from e
187189

188190
async def receive(self) -> str:
189191
"""Receive message from the WebSocket server.
@@ -200,6 +202,9 @@ async def receive(self) -> str:
200202
raise TransportConnectionFailed("Connection is already closed")
201203

202204
while True:
205+
# Should not raise any exception:
206+
# https://docs.aiohttp.org/en/stable/_modules/aiohttp/client_ws.html
207+
# #ClientWebSocketResponse.receive
203208
ws_message = await self.websocket.receive()
204209

205210
# Ignore low-level ping and pong received

gql/transport/common/adapters/websockets.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,14 @@ async def send(self, message: str) -> None:
8686
TransportConnectionFailed: If connection closed
8787
"""
8888
if self.websocket is None:
89-
raise TransportConnectionFailed("Connection is already closed")
89+
raise TransportConnectionFailed("WebSocket connection is already closed")
9090

9191
try:
9292
await self.websocket.send(message)
9393
except Exception as e:
94-
raise TransportConnectionFailed("Connection was closed") from e
94+
raise TransportConnectionFailed(
95+
f"Error trying to send data: {type(e).__name__}"
96+
) from e
9597

9698
async def receive(self) -> str:
9799
"""Receive message from the WebSocket server.
@@ -111,7 +113,9 @@ async def receive(self) -> str:
111113
try:
112114
data = await self.websocket.recv()
113115
except Exception as e:
114-
raise TransportConnectionFailed("Connection was closed") from e
116+
raise TransportConnectionFailed(
117+
f"Error trying to receive data: {type(e).__name__}"
118+
) from e
115119

116120
# websocket.recv() can return either str or bytes
117121
# In our case, we should receive only str here

gql/transport/common/base.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,13 @@ async def _send(self, message: str) -> None:
127127
"""Send the provided message to the adapter connection and log the message"""
128128

129129
if not self._connected:
130-
raise TransportClosed(
131-
"Transport is not connected"
132-
) from self.close_exception
130+
if isinstance(self.close_exception, TransportConnectionFailed):
131+
raise self.close_exception
132+
else:
133+
raise TransportConnectionFailed() from self.close_exception
133134

134135
try:
136+
# Can raise TransportConnectionFailed
135137
await self.adapter.send(message)
136138
log.info(">>> %s", message)
137139
except TransportConnectionFailed as e:
@@ -143,7 +145,7 @@ async def _receive(self) -> str:
143145

144146
# It is possible that the connection has been already closed in another task
145147
if not self._connected:
146-
raise TransportClosed("Transport is already closed")
148+
raise TransportConnectionFailed() from self.close_exception
147149

148150
# Wait for the next frame.
149151
# Can raise TransportConnectionFailed or TransportProtocolError
@@ -214,8 +216,6 @@ async def _receive_data_loop(self) -> None:
214216
except (TransportConnectionFailed, TransportProtocolError) as e:
215217
await self._fail(e, clean_close=False)
216218
break
217-
except TransportClosed:
218-
break
219219

220220
# Parse the answer
221221
try:
@@ -503,9 +503,10 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
503503
except Exception as exc: # pragma: no cover
504504
log.warning("Ignoring exception in _clean_close: " + repr(exc))
505505

506-
log.debug(
507-
f"_close_coro: sending exception to {len(self.listeners)} listeners"
508-
)
506+
if log.isEnabledFor(logging.DEBUG):
507+
log.debug(
508+
f"_close_coro: sending exception to {len(self.listeners)} listeners"
509+
)
509510

510511
# Send an exception to all remaining listeners
511512
for query_id, listener in self.listeners.items():
@@ -532,7 +533,15 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
532533
log.debug("_close_coro: exiting")
533534

534535
async def _fail(self, e: Exception, clean_close: bool = True) -> None:
535-
log.debug("_fail: starting with exception: " + repr(e))
536+
if log.isEnabledFor(logging.DEBUG):
537+
import inspect
538+
539+
current_frame = inspect.currentframe()
540+
assert current_frame is not None
541+
caller_frame = current_frame.f_back
542+
assert caller_frame is not None
543+
caller_name = inspect.getframeinfo(caller_frame).function
544+
log.debug(f"_fail from {caller_name}: " + repr(e))
536545

537546
if self.close_task is None:
538547

tests/test_aiohttp_websocket_exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,15 @@ async def test_aiohttp_websocket_server_closing_after_ack(aiohttp_client_and_ser
301301

302302
query = gql("query { hello }")
303303

304+
print("\n Trying to execute first query.\n")
305+
306+
with pytest.raises(TransportConnectionFailed):
307+
await session.execute(query)
308+
309+
await session.transport.wait_closed()
310+
311+
print("\n Trying to execute second query.\n")
312+
304313
with pytest.raises(TransportConnectionFailed):
305314
await session.execute(query)
306315

tests/test_aiohttp_websocket_graphqlws_exceptions.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from gql import Client, gql
77
from gql.transport.exceptions import (
8-
TransportClosed,
98
TransportConnectionFailed,
109
TransportProtocolError,
1110
TransportQueryError,
@@ -264,10 +263,14 @@ async def test_aiohttp_websocket_graphqlws_server_closing_after_ack(
264263

265264
query = gql("query { hello }")
266265

266+
print("\n Trying to execute first query.\n")
267+
267268
with pytest.raises(TransportConnectionFailed):
268269
await session.execute(query)
269270

270271
await session.transport.wait_closed()
271272

272-
with pytest.raises(TransportClosed):
273+
print("\n Trying to execute second query.\n")
274+
275+
with pytest.raises(TransportConnectionFailed):
273276
await session.execute(query)

tests/test_aiohttp_websocket_graphqlws_subscription.py

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from gql.client import AsyncClientSession
1212
from gql.transport.exceptions import TransportConnectionFailed, TransportServerError
1313

14-
from .conftest import MS, PyPy, WebSocketServerHelper
14+
from .conftest import MS, WebSocketServerHelper
1515

1616
# Marking all tests in this file with the aiohttp AND websockets marker
1717
pytestmark = [pytest.mark.aiohttp, pytest.mark.websockets]
@@ -821,7 +821,6 @@ async def test_aiohttp_websocket_graphqlws_subscription_reconnecting_session(
821821
):
822822

823823
from gql.transport.aiohttp_websockets import AIOHTTPWebsocketsTransport
824-
from gql.transport.exceptions import TransportClosed
825824

826825
path = "/graphql"
827826
url = f"ws://{graphqlws_server.hostname}:{graphqlws_server.port}{path}"
@@ -839,56 +838,64 @@ async def test_aiohttp_websocket_graphqlws_subscription_reconnecting_session(
839838
reconnecting=True, retry_connect=False, retry_execute=False
840839
)
841840

842-
# First we make a subscription which will cause a disconnect in the backend
843-
# (count=8)
844-
try:
845-
print("\nSUBSCRIPTION_1_WITH_DISCONNECT\n")
846-
async for result in session.subscribe(subscription_with_disconnect):
847-
pass
848-
except TransportConnectionFailed:
849-
pass
850-
851-
await asyncio.sleep(50 * MS)
852-
853-
# Then with the same session handle, we make a subscription or an execute
854-
# which will detect that the transport is closed so that the client could
855-
# try to reconnect
856-
generator = None
841+
# First we make a query or subscription which will cause a disconnect
842+
# in the backend (count=8)
857843
try:
858844
if execute_instead_of_subscribe:
859-
print("\nEXECUTION_2\n")
860-
await session.execute(subscription)
845+
print("\nEXECUTION_1\n")
846+
await session.execute(subscription_with_disconnect)
861847
else:
862-
print("\nSUBSCRIPTION_2\n")
863-
generator = session.subscribe(subscription)
864-
async for result in generator:
848+
print("\nSUBSCRIPTION_1_WITH_DISCONNECT\n")
849+
async for result in session.subscribe(subscription_with_disconnect):
865850
pass
866-
except (TransportClosed, TransportConnectionFailed):
867-
if generator:
868-
await generator.aclose()
851+
except TransportConnectionFailed:
869852
pass
870853

871-
timeout = 50
872-
873-
if PyPy:
874-
timeout = 500
854+
# Wait for disconnect
855+
for i in range(200):
856+
await asyncio.sleep(1 * MS)
857+
if not transport._connected:
858+
print(f"\nDisconnected in {i+1} MS")
859+
break
875860

876-
await asyncio.sleep(timeout * MS)
861+
assert transport._connected is False
877862

878-
# And finally with the same session handle, we make a subscription
879-
# which works correctly
880-
print("\nSUBSCRIPTION_3\n")
881-
generator = session.subscribe(subscription)
882-
async for result in generator:
863+
# Wait for reconnect
864+
for i in range(200):
865+
await asyncio.sleep(1 * MS)
866+
if transport._connected:
867+
print(f"\nConnected again in {i+1} MS")
868+
break
883869

884-
number = result["number"]
885-
print(f"Number received: {number}")
870+
assert transport._connected is True
871+
872+
# Then after the reconnection, we make a query or a subscription
873+
if execute_instead_of_subscribe:
874+
print("\nEXECUTION_2\n")
875+
result = await session.execute(subscription)
876+
assert result["number"] == 10
877+
else:
878+
print("\nSUBSCRIPTION_2\n")
879+
generator = session.subscribe(subscription)
880+
async for result in generator:
881+
number = result["number"]
882+
print(f"Number received: {number}")
886883

887-
assert number == count
888-
count -= 1
884+
assert number == count
885+
count -= 1
889886

890-
await generator.aclose()
887+
await generator.aclose()
891888

892-
assert count == -1
889+
assert count == -1
893890

891+
# Close the reconnecting session
894892
await client.close_async()
893+
894+
# Wait for disconnect
895+
for i in range(200):
896+
await asyncio.sleep(1 * MS)
897+
if not transport._connected:
898+
print(f"\nDisconnected in {i+1} MS")
899+
break
900+
901+
assert transport._connected is False

tests/test_aiohttp_websocket_query.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from gql import Client, gql
99
from gql.transport.exceptions import (
1010
TransportAlreadyConnected,
11-
TransportClosed,
1211
TransportConnectionFailed,
1312
TransportQueryError,
1413
TransportServerError,
@@ -323,7 +322,7 @@ async def test_aiohttp_websocket_server_closing_after_first_query(
323322

324323
# Now the server is closed but we don't know it yet, we have to send a query
325324
# to notice it and to receive the exception
326-
with pytest.raises(TransportClosed):
325+
with pytest.raises(TransportConnectionFailed):
327326
await session.execute(query)
328327

329328

0 commit comments

Comments
 (0)