Skip to content

Commit 24badc7

Browse files
authored
Merge pull request #548 from ydb-platform/tx_retryer
Transactional retryer
2 parents 228bb52 + ba5c216 commit 24badc7

File tree

4 files changed

+150
-0
lines changed

4 files changed

+150
-0
lines changed

tests/aio/query/test_query_session_pool.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import asyncio
22
import pytest
33
import ydb
4+
5+
from typing import Optional
6+
47
from ydb.aio.query.pool import QuerySessionPool
58
from ydb.aio.query.session import QuerySession, QuerySessionStateEnum
9+
from ydb.aio.query.transaction import QueryTxContext
610

711

812
class TestQuerySessionPool:
@@ -55,6 +59,43 @@ async def callee(session: QuerySession):
5559
with pytest.raises(CustomException):
5660
await pool.retry_operation_async(callee)
5761

62+
@pytest.mark.parametrize(
63+
"tx_mode",
64+
[
65+
(None),
66+
(ydb.QuerySerializableReadWrite()),
67+
(ydb.QuerySnapshotReadOnly()),
68+
(ydb.QueryOnlineReadOnly()),
69+
(ydb.QueryStaleReadOnly()),
70+
],
71+
)
72+
@pytest.mark.asyncio
73+
async def test_retry_tx_normal(self, pool: QuerySessionPool, tx_mode: Optional[ydb.BaseQueryTxMode]):
74+
retry_no = 0
75+
76+
async def callee(tx: QueryTxContext):
77+
nonlocal retry_no
78+
if retry_no < 2:
79+
retry_no += 1
80+
raise ydb.Unavailable("Fake fast backoff error")
81+
result_stream = await tx.execute("SELECT 1")
82+
return [result_set async for result_set in result_stream]
83+
84+
result = await pool.retry_tx_async(callee=callee, tx_mode=tx_mode)
85+
assert len(result) == 1
86+
assert retry_no == 2
87+
88+
@pytest.mark.asyncio
89+
async def test_retry_tx_raises(self, pool: QuerySessionPool):
90+
class CustomException(Exception):
91+
pass
92+
93+
async def callee(tx: QueryTxContext):
94+
raise CustomException()
95+
96+
with pytest.raises(CustomException):
97+
await pool.retry_tx_async(callee)
98+
5899
@pytest.mark.asyncio
59100
async def test_pool_size_limit_logic(self, pool: QuerySessionPool):
60101
target_size = 5

tests/query/test_query_session_pool.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import pytest
22
import ydb
3+
4+
from typing import Optional
5+
36
from ydb.query.pool import QuerySessionPool
47
from ydb.query.session import QuerySession, QuerySessionStateEnum
8+
from ydb.query.transaction import QueryTxContext
59

610

711
class TestQuerySessionPool:
@@ -46,6 +50,41 @@ def callee(session: QuerySession):
4650
with pytest.raises(CustomException):
4751
pool.retry_operation_sync(callee)
4852

53+
@pytest.mark.parametrize(
54+
"tx_mode",
55+
[
56+
(None),
57+
(ydb.QuerySerializableReadWrite()),
58+
(ydb.QuerySnapshotReadOnly()),
59+
(ydb.QueryOnlineReadOnly()),
60+
(ydb.QueryStaleReadOnly()),
61+
],
62+
)
63+
def test_retry_tx_normal(self, pool: QuerySessionPool, tx_mode: Optional[ydb.BaseQueryTxMode]):
64+
retry_no = 0
65+
66+
def callee(tx: QueryTxContext):
67+
nonlocal retry_no
68+
if retry_no < 2:
69+
retry_no += 1
70+
raise ydb.Unavailable("Fake fast backoff error")
71+
result_stream = tx.execute("SELECT 1")
72+
return [result_set for result_set in result_stream]
73+
74+
result = pool.retry_tx_sync(callee=callee, tx_mode=tx_mode)
75+
assert len(result) == 1
76+
assert retry_no == 2
77+
78+
def test_retry_tx_raises(self, pool: QuerySessionPool):
79+
class CustomException(Exception):
80+
pass
81+
82+
def callee(tx: QueryTxContext):
83+
raise CustomException()
84+
85+
with pytest.raises(CustomException):
86+
pool.retry_tx_sync(callee)
87+
4988
def test_pool_size_limit_logic(self, pool: QuerySessionPool):
5089
target_size = 5
5190
pool._size = target_size

ydb/aio/query/pool.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
RetrySettings,
1414
retry_operation_async,
1515
)
16+
from ...query.base import BaseQueryTxMode
1617
from ...query.base import QueryClientSettings
1718
from ... import convert
1819
from ..._grpc.grpcwrapper import common_utils
20+
from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
1921

2022
logger = logging.getLogger(__name__)
2123

@@ -122,6 +124,39 @@ async def wrapped_callee():
122124

123125
return await retry_operation_async(wrapped_callee, retry_settings)
124126

127+
async def retry_tx_async(
128+
self,
129+
callee: Callable,
130+
tx_mode: Optional[BaseQueryTxMode] = None,
131+
retry_settings: Optional[RetrySettings] = None,
132+
*args,
133+
**kwargs,
134+
):
135+
"""Special interface to execute a bunch of commands with transaction in a safe, retriable way.
136+
137+
:param callee: A function, that works with session.
138+
:param tx_mode: Transaction mode, which is a one from the following choises:
139+
1) QuerySerializableReadWrite() which is default mode;
140+
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
141+
3) QuerySnapshotReadOnly();
142+
4) QueryStaleReadOnly().
143+
:param retry_settings: RetrySettings object.
144+
145+
:return: Result sets or exception in case of execution errors.
146+
"""
147+
148+
tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
149+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
150+
151+
async def wrapped_callee():
152+
async with self.checkout() as session:
153+
async with session.transaction(tx_mode=tx_mode) as tx:
154+
result = await callee(tx, *args, **kwargs)
155+
await tx.commit()
156+
return result
157+
158+
return await retry_operation_async(wrapped_callee, retry_settings)
159+
125160
async def execute_with_retries(
126161
self,
127162
query: str,

ydb/query/pool.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import threading
99
import queue
1010

11+
from .base import BaseQueryTxMode
1112
from .base import QueryClientSettings
1213
from .session import (
1314
QuerySession,
@@ -20,6 +21,7 @@
2021
from .. import convert
2122
from ..settings import BaseRequestSettings
2223
from .._grpc.grpcwrapper import common_utils
24+
from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
2325

2426

2527
logger = logging.getLogger(__name__)
@@ -138,6 +140,39 @@ def wrapped_callee():
138140

139141
return retry_operation_sync(wrapped_callee, retry_settings)
140142

143+
def retry_tx_sync(
144+
self,
145+
callee: Callable,
146+
tx_mode: Optional[BaseQueryTxMode] = None,
147+
retry_settings: Optional[RetrySettings] = None,
148+
*args,
149+
**kwargs,
150+
):
151+
"""Special interface to execute a bunch of commands with transaction in a safe, retriable way.
152+
153+
:param callee: A function, that works with session.
154+
:param tx_mode: Transaction mode, which is a one from the following choises:
155+
1) QuerySerializableReadWrite() which is default mode;
156+
2) QueryOnlineReadOnly(allow_inconsistent_reads=False);
157+
3) QuerySnapshotReadOnly();
158+
4) QueryStaleReadOnly().
159+
:param retry_settings: RetrySettings object.
160+
161+
:return: Result sets or exception in case of execution errors.
162+
"""
163+
164+
tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite()
165+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
166+
167+
def wrapped_callee():
168+
with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session:
169+
with session.transaction(tx_mode=tx_mode) as tx:
170+
result = callee(tx, *args, **kwargs)
171+
tx.commit()
172+
return result
173+
174+
return retry_operation_sync(wrapped_callee, retry_settings)
175+
141176
def execute_with_retries(
142177
self,
143178
query: str,

0 commit comments

Comments
 (0)