Skip to content

Commit 839bb35

Browse files
committed
fix: handle asyncio.CancelledError as retriable in topic writer and reader
1 parent 3f6394a commit 839bb35

File tree

5 files changed

+135
-4
lines changed

5 files changed

+135
-4
lines changed

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,9 @@ async def get_response():
234234

235235
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
236236
raise connection._rpc_error_handler(self._connection_state, e)
237+
except asyncio.CancelledError:
238+
# gRPC CancelledError - convert to YDB error for retry logic
239+
raise issues.ConnectionLost("gRPC stream cancelled")
237240

238241
if not is_coordination_calls:
239242
issues._process_response(grpc_message)

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ class ReaderReconnector:
208208
_static_reader_reconnector_counter = AtomicCounter()
209209

210210
_id: int
211+
_closed: bool
211212
_settings: topic_reader.PublicReaderSettings
212213
_driver: Driver
213214
_background_tasks: Set[Task]
@@ -224,6 +225,7 @@ def __init__(
224225
loop: Optional[asyncio.AbstractEventLoop] = None,
225226
):
226227
self._id = ReaderReconnector._static_reader_reconnector_counter.inc_and_get()
228+
self._closed = False
227229
self._settings = settings
228230
self._driver = driver
229231
self._loop = loop if loop is not None else asyncio.get_running_loop()
@@ -239,6 +241,7 @@ def __init__(
239241

240242
async def _connection_loop(self):
241243
attempt = 0
244+
retry_settings = self._settings._retry_settings()
242245
while True:
243246
try:
244247
logger.debug("reader %s connect attempt %s", self._id, attempt)
@@ -247,9 +250,17 @@ async def _connection_loop(self):
247250
attempt = 0
248251
self._state_changed.set()
249252
await self._stream_reader.wait_error()
253+
except asyncio.CancelledError:
254+
# CancelledError from close() - exit cleanly
255+
if self._closed:
256+
return
257+
# gRPC wrapper converts gRPC CancelledError to ConnectionLost (retriable).
258+
# In Python 3.11+, external task.cancel() is detected in wrapper and re-raised.
259+
# Any CancelledError reaching here is external cancellation - propagate it.
260+
raise
250261
except BaseException as err:
251262
logger.debug("reader %s, attempt %s connection loop error %s", self._id, attempt, err)
252-
retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
263+
retry_info = check_retriable_error(err, retry_settings, attempt)
253264
if not retry_info.is_retriable:
254265
logger.debug("reader %s stop connection loop due to %s", self._id, err)
255266
self._set_first_error(err)
@@ -374,6 +385,10 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
374385
return self._stream_reader.commit(batch)
375386

376387
async def close(self, flush: bool):
388+
if self._closed:
389+
return
390+
self._closed = True
391+
377392
logger.debug("reader reconnector %s close", self._id)
378393
if self._stream_reader:
379394
await self._stream_reader.close(flush)

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,3 +1566,60 @@ async def stream_create(
15661566

15671567
reader_stream_mock_with_error.wait_error.assert_any_await()
15681568
reader_stream_mock_with_error.wait_messages.assert_any_await()
1569+
1570+
async def test_reconnect_on_connection_lost(self, monkeypatch):
1571+
"""Test that ConnectionLost (from gRPC CancelledError) is treated as retriable.
1572+
1573+
This tests the fix for issue #735 - gRPC wrapper converts CancelledError to
1574+
ConnectionLost, which should cause reconnection, not permanent failure.
1575+
"""
1576+
1577+
async def wait_error_with_connection_lost():
1578+
raise issues.ConnectionLost("gRPC stream cancelled")
1579+
1580+
reader_stream_mock_with_error = mock.Mock(ReaderStream)
1581+
reader_stream_mock_with_error._id = 0
1582+
reader_stream_mock_with_error.wait_error = mock.AsyncMock(side_effect=wait_error_with_connection_lost)
1583+
reader_stream_mock_with_error.close = mock.AsyncMock()
1584+
1585+
# First stream's wait_messages should also fail (simulating connection issue)
1586+
async def wait_messages_with_error():
1587+
raise issues.ConnectionLost("connection lost")
1588+
1589+
reader_stream_mock_with_error.wait_messages = mock.AsyncMock(side_effect=wait_messages_with_error)
1590+
1591+
async def wait_forever():
1592+
f = asyncio.Future()
1593+
await f
1594+
1595+
reader_stream_with_messages = mock.Mock(ReaderStream)
1596+
reader_stream_with_messages._id = 1
1597+
reader_stream_with_messages.wait_error = mock.AsyncMock(side_effect=wait_forever)
1598+
reader_stream_with_messages.wait_messages.return_value = None
1599+
reader_stream_with_messages.close = mock.AsyncMock()
1600+
1601+
stream_index = 0
1602+
1603+
async def stream_create(
1604+
reader_reconnector_id: int,
1605+
driver: SupportedDriverType,
1606+
settings: PublicReaderSettings,
1607+
):
1608+
nonlocal stream_index
1609+
stream_index += 1
1610+
if stream_index == 1:
1611+
return reader_stream_mock_with_error
1612+
elif stream_index == 2:
1613+
return reader_stream_with_messages
1614+
else:
1615+
raise Exception("unexpected create stream")
1616+
1617+
with mock.patch.object(ReaderStream, "create", stream_create):
1618+
reconnector = ReaderReconnector(mock.Mock(), PublicReaderSettings("", ""))
1619+
# This would hang/fail before the fix because gRPC errors weren't retriable
1620+
await asyncio.wait_for(reconnector.wait_message(), timeout=5)
1621+
await reconnector.close(flush=False)
1622+
1623+
# Verify that reconnection happened (ConnectionLost from wait_error triggered reconnect)
1624+
reader_stream_mock_with_error.wait_error.assert_any_await()
1625+
assert stream_index == 2, "Should have created second stream after ConnectionLost"

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,9 +435,9 @@ def _check_stop(self):
435435

436436
async def _connection_loop(self):
437437
retry_settings = RetrySettings(retry_cancelled=True) # todo
438+
attempt = 0
438439

439440
while True:
440-
attempt = 0 # todo calc and reset
441441
tasks = []
442442

443443
# noinspection PyBroadException
@@ -456,6 +456,7 @@ async def _connection_loop(self):
456456
self._id,
457457
stream_writer._id,
458458
)
459+
attempt = 0 # Reset after successful connect
459460
try:
460461
if self._init_info is None:
461462
self._last_known_seq_no = stream_writer.last_seqno
@@ -495,8 +496,18 @@ async def _connection_loop(self):
495496
err_info.sleep_timeout_seconds,
496497
)
497498
await asyncio.sleep(err_info.sleep_timeout_seconds)
499+
attempt += 1
498500

499-
except (asyncio.CancelledError, Exception) as err:
501+
except asyncio.CancelledError:
502+
# CancelledError from close() - exit cleanly
503+
if self._closed:
504+
return
505+
# gRPC wrapper converts gRPC CancelledError to ConnectionLost (retriable).
506+
# In Python 3.11+, external task.cancel() is detected in wrapper and re-raised.
507+
# Any CancelledError reaching here is external cancellation - propagate it.
508+
raise
509+
510+
except Exception as err:
500511
self._stop(err)
501512
return
502513
finally:

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ async def receive(self) -> StreamWriteMessage.WriteResponse:
320320
raise Exception("read from closed StreamWriterMock")
321321

322322
item = await self.from_server.get()
323-
if isinstance(item, Exception):
323+
if isinstance(item, BaseException):
324324
raise item
325325
return item
326326

@@ -457,6 +457,51 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error(
457457
second_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=2))
458458
await reconnector.close(flush=True)
459459

460+
async def test_reconnect_on_connection_lost(
461+
self,
462+
reconnector: WriterAsyncIOReconnector,
463+
get_stream_writer,
464+
):
465+
"""Test that ConnectionLost (from gRPC CancelledError) is treated as retriable."""
466+
now = datetime.datetime.now(datetime.timezone.utc)
467+
data = "123".encode()
468+
469+
message1 = PublicMessage(
470+
data=data,
471+
seqno=1,
472+
created_at=now,
473+
)
474+
message2 = PublicMessage(
475+
data=data,
476+
seqno=2,
477+
created_at=now,
478+
)
479+
await reconnector.write_with_ack_future([message1, message2])
480+
481+
# sent to first stream
482+
stream_writer = get_stream_writer()
483+
484+
messages = await stream_writer.from_client.get()
485+
assert [InternalMessage(message1)] == messages
486+
messages = await stream_writer.from_client.get()
487+
assert [InternalMessage(message2)] == messages
488+
489+
# ack first message
490+
stream_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=1))
491+
492+
# simulate gRPC connection lost (gRPC wrapper converts CancelledError to ConnectionLost)
493+
stream_writer.from_server.put_nowait(issues.ConnectionLost("gRPC stream cancelled"))
494+
495+
# writer should reconnect and resend non-acked message
496+
second_writer = get_stream_writer()
497+
second_sent_msg = await asyncio.wait_for(second_writer.from_client.get(), timeout=5)
498+
499+
expected_messages = [InternalMessage(message2)]
500+
assert second_sent_msg == expected_messages
501+
502+
second_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=2))
503+
await reconnector.close(flush=True)
504+
460505
async def test_stop_on_unexpected_exception(self, reconnector: WriterAsyncIOReconnector, get_stream_writer):
461506
class TestException(Exception):
462507
pass

0 commit comments

Comments
 (0)