Skip to content

Commit cda5dad

Browse files
authored
Merge pull request #227 V3 fix sync call timeout
2 parents 2eeb6d3 + 2ef0026 commit cda5dad

File tree

5 files changed

+361
-78
lines changed

5 files changed

+361
-78
lines changed

ydb/_topic_common/common.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from .. import operation, issues
88
from .._grpc.grpcwrapper.common_utils import IFromProtoWithProtoType
99

10-
TimeoutType = typing.Union[int, float]
10+
TimeoutType = typing.Union[int, float, None]
1111

1212

1313
def wrap_operation(rpc_state, response_pb, driver=None):
@@ -60,3 +60,88 @@ def start_event_loop():
6060

6161
_shared_event_loop = event_loop_set_done.result()
6262
return _shared_event_loop
63+
64+
65+
class CallFromSyncToAsync:
66+
_loop: asyncio.AbstractEventLoop
67+
68+
def __init__(self, loop: asyncio.AbstractEventLoop):
69+
self._loop = loop
70+
71+
def unsafe_call_with_future(
72+
self, coro: typing.Coroutine
73+
) -> concurrent.futures.Future:
74+
"""
75+
returned result from coro may be lost
76+
"""
77+
return asyncio.run_coroutine_threadsafe(coro, self._loop)
78+
79+
def unsafe_call_with_result(self, coro: typing.Coroutine, timeout: TimeoutType):
80+
"""
81+
returned result from coro may be lost by race future cancel by timeout and return value from coroutine
82+
"""
83+
f = self.unsafe_call_with_future(coro)
84+
try:
85+
return f.result(timeout)
86+
except concurrent.futures.TimeoutError:
87+
raise TimeoutError()
88+
finally:
89+
if not f.done():
90+
f.cancel()
91+
92+
def safe_call_with_result(self, coro: typing.Coroutine, timeout: TimeoutType):
93+
"""
94+
no lost returned value from coro, but may be slower especially timeout latency - it wait coroutine cancelation.
95+
"""
96+
97+
if timeout is not None and timeout <= 0:
98+
return self._safe_call_fast(coro)
99+
100+
async def call_coro():
101+
task = self._loop.create_task(coro)
102+
try:
103+
res = await asyncio.wait_for(task, timeout)
104+
return res
105+
except asyncio.TimeoutError:
106+
try:
107+
res = await task
108+
return res
109+
except asyncio.CancelledError:
110+
pass
111+
112+
# return builtin TimeoutError instead of asyncio.TimeoutError
113+
raise TimeoutError()
114+
115+
return asyncio.run_coroutine_threadsafe(call_coro(), self._loop).result()
116+
117+
def _safe_call_fast(self, coro: typing.Coroutine):
118+
"""
119+
no lost returned value from coro, but may be slower especially timeout latency - it wait coroutine cancelation.
120+
Wait coroutine result only one loop.
121+
"""
122+
res = concurrent.futures.Future()
123+
124+
async def call_coro():
125+
try:
126+
res.set_result(await coro)
127+
except asyncio.CancelledError:
128+
res.set_exception(TimeoutError())
129+
130+
coro_future = asyncio.run_coroutine_threadsafe(call_coro(), self._loop)
131+
asyncio.run_coroutine_threadsafe(asyncio.sleep(0), self._loop).result()
132+
coro_future.cancel()
133+
return res.result()
134+
135+
def call_sync(self, callback: typing.Callable[[], typing.Any]) -> typing.Any:
136+
result = concurrent.futures.Future()
137+
138+
def call_callback():
139+
try:
140+
res = callback()
141+
result.set_result(res)
142+
except BaseException as err:
143+
result.set_exception(err)
144+
145+
self._loop.call_soon_threadsafe(call_callback)
146+
147+
return result.result()

ydb/_topic_common/common_test.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import asyncio
2+
import threading
3+
import time
24
import typing
35

46
import grpc
57
import pytest
68

9+
from .common import CallFromSyncToAsync
710
from .._grpc.grpcwrapper.common_utils import (
811
GrpcWrapperAsyncIO,
912
ServerStatus,
@@ -25,6 +28,23 @@
2528
)
2629

2730

31+
@pytest.fixture()
32+
def separate_loop():
33+
loop = asyncio.new_event_loop()
34+
35+
def run_loop():
36+
loop.run_forever()
37+
pass
38+
39+
t = threading.Thread(target=run_loop, name="test separate loop")
40+
t.start()
41+
42+
yield loop
43+
44+
loop.call_soon_threadsafe(lambda: loop.stop())
45+
t.join()
46+
47+
2848
@pytest.mark.asyncio
2949
class Test:
3050
async def test_callback_from_asyncio(self):
@@ -111,3 +131,160 @@ def test_failed(self):
111131
assert not status.is_success()
112132
with pytest.raises(issues.Overloaded):
113133
issues._process_response(status)
134+
135+
136+
@pytest.mark.asyncio
137+
class TestCallFromSyncToAsync:
138+
@pytest.fixture()
139+
def caller(self, separate_loop):
140+
return CallFromSyncToAsync(separate_loop)
141+
142+
def test_unsafe_call_with_future(self, separate_loop, caller):
143+
callback_loop = None
144+
145+
async def callback():
146+
nonlocal callback_loop
147+
callback_loop = asyncio.get_running_loop()
148+
return 1
149+
150+
f = caller.unsafe_call_with_future(callback())
151+
152+
assert f.result() == 1
153+
assert callback_loop is separate_loop
154+
155+
def test_unsafe_call_with_result_ok(self, separate_loop, caller):
156+
callback_loop = None
157+
158+
async def callback():
159+
nonlocal callback_loop
160+
callback_loop = asyncio.get_running_loop()
161+
return 1
162+
163+
res = caller.unsafe_call_with_result(callback(), None)
164+
165+
assert res == 1
166+
assert callback_loop is separate_loop
167+
168+
def test_unsafe_call_with_result_timeout(self, separate_loop, caller):
169+
timeout = 0.01
170+
callback_loop = None
171+
172+
async def callback():
173+
nonlocal callback_loop
174+
callback_loop = asyncio.get_running_loop()
175+
await asyncio.sleep(1)
176+
return 1
177+
178+
start = time.monotonic()
179+
with pytest.raises(TimeoutError):
180+
caller.unsafe_call_with_result(callback(), timeout)
181+
finished = time.monotonic()
182+
183+
assert callback_loop is separate_loop
184+
assert finished - start > timeout
185+
186+
def test_safe_call_with_result_ok(self, separate_loop, caller):
187+
callback_loop = None
188+
189+
async def callback():
190+
nonlocal callback_loop
191+
callback_loop = asyncio.get_running_loop()
192+
return 1
193+
194+
res = caller.safe_call_with_result(callback(), 1)
195+
196+
assert res == 1
197+
assert callback_loop is separate_loop
198+
199+
def test_safe_call_with_result_timeout(self, separate_loop, caller):
200+
timeout = 0.01
201+
callback_loop = None
202+
cancelled = False
203+
204+
async def callback():
205+
nonlocal callback_loop, cancelled
206+
callback_loop = asyncio.get_running_loop()
207+
try:
208+
await asyncio.sleep(1)
209+
except asyncio.CancelledError:
210+
cancelled = True
211+
raise
212+
213+
return 1
214+
215+
start = time.monotonic()
216+
with pytest.raises(TimeoutError):
217+
caller.safe_call_with_result(callback(), timeout)
218+
finished = time.monotonic()
219+
220+
# wait one loop for handle task cancelation
221+
asyncio.run_coroutine_threadsafe(asyncio.sleep(0), separate_loop)
222+
223+
assert callback_loop is separate_loop
224+
assert finished - start > timeout
225+
assert cancelled
226+
227+
def test_safe_callback_with_0_timeout_ok(self, separate_loop, caller):
228+
callback_loop = None
229+
230+
async def f1():
231+
return 1
232+
233+
async def f2():
234+
return await f1()
235+
236+
async def callback():
237+
nonlocal callback_loop
238+
callback_loop = asyncio.get_running_loop()
239+
return await f2()
240+
241+
res = caller.safe_call_with_result(callback(), 0)
242+
assert callback_loop is separate_loop
243+
assert res == 1
244+
245+
def test_safe_callback_with_0_timeout_timeout(self, separate_loop, caller):
246+
callback_loop = None
247+
cancelled = False
248+
249+
async def callback():
250+
try:
251+
nonlocal callback_loop, cancelled
252+
253+
callback_loop = asyncio.get_running_loop()
254+
await asyncio.sleep(1)
255+
except asyncio.CancelledError:
256+
cancelled = True
257+
raise
258+
259+
with pytest.raises(TimeoutError):
260+
caller.safe_call_with_result(callback(), 0)
261+
262+
assert callback_loop is separate_loop
263+
assert cancelled
264+
265+
def test_call_sync_ok(self, separate_loop, caller):
266+
callback_eventloop = None
267+
268+
def callback():
269+
nonlocal callback_eventloop
270+
callback_eventloop = asyncio.get_running_loop()
271+
return 1
272+
273+
res = caller.call_sync(callback)
274+
assert callback_eventloop is separate_loop
275+
assert res == 1
276+
277+
def test_call_sync_error(self, separate_loop, caller):
278+
callback_eventloop = None
279+
280+
class TestError(RuntimeError):
281+
pass
282+
283+
def callback():
284+
nonlocal callback_eventloop
285+
callback_eventloop = asyncio.get_running_loop()
286+
raise TestError
287+
288+
with pytest.raises(TestError):
289+
caller.call_sync(callback)
290+
assert callback_eventloop is separate_loop

0 commit comments

Comments
 (0)