Skip to content

Commit 9d3e99d

Browse files
committed
rewrite reader to sync caller
1 parent d0d409f commit 9d3e99d

File tree

1 file changed

+48
-58
lines changed

1 file changed

+48
-58
lines changed

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from typing import List, Union, Iterable, Optional, Coroutine
55

66
from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType
7-
from ydb._topic_common.common import _get_shared_event_loop
7+
from ydb._topic_common.common import _get_shared_event_loop, CallFromSyncToAsync
8+
from ydb._topic_reader import datatypes
89
from ydb._topic_reader.datatypes import PublicMessage, PublicBatch, ICommittable
910
from ydb._topic_reader.topic_reader import (
1011
PublicReaderSettings,
@@ -18,29 +19,31 @@
1819

1920

2021
class TopicReaderSync:
21-
_loop: asyncio.AbstractEventLoop
22+
_caller: CallFromSyncToAsync
2223
_async_reader: PublicAsyncIOReader
2324
_closed: bool
2425

2526
def __init__(
26-
self,
27-
driver: SupportedDriverType,
28-
settings: PublicReaderSettings,
29-
*,
30-
eventloop: Optional[asyncio.AbstractEventLoop] = None,
27+
self,
28+
driver: SupportedDriverType,
29+
settings: PublicReaderSettings,
30+
*,
31+
eventloop: Optional[asyncio.AbstractEventLoop] = None,
3132
):
3233
self._closed = False
3334

3435
if eventloop:
35-
self._loop = eventloop
36+
loop = eventloop
3637
else:
37-
self._loop = _get_shared_event_loop()
38+
loop = _get_shared_event_loop()
39+
40+
self._caller = CallFromSyncToAsync(loop)
3841

3942
async def create_reader():
4043
return PublicAsyncIOReader(driver, settings)
4144

4245
self._async_reader = asyncio.run_coroutine_threadsafe(
43-
create_reader(), self._loop
46+
create_reader(), loop
4447
).result()
4548

4649
def __del__(self):
@@ -52,26 +55,6 @@ def __enter__(self):
5255
def __exit__(self, exc_type, exc_val, exc_tb):
5356
self.close()
5457

55-
def _call(self, coro) -> concurrent.futures.Future:
56-
"""
57-
Call async function and return future fow wait result
58-
"""
59-
if self._closed:
60-
raise TopicReaderClosedError()
61-
62-
return asyncio.run_coroutine_threadsafe(coro, self._loop)
63-
64-
def _call_sync(self, coro: Coroutine, timeout):
65-
"""
66-
Call async function, wait and return result
67-
"""
68-
f = self._call(coro)
69-
try:
70-
return f.result(timeout)
71-
except TimeoutError:
72-
f.cancel()
73-
raise
74-
7558
def async_sessions_stat(self) -> concurrent.futures.Future:
7659
"""
7760
Receive stat from the server, return feature.
@@ -87,7 +70,7 @@ async def sessions_stat(self) -> List[SessionStat]:
8770
raise NotImplementedError()
8871

8972
def messages(
90-
self, *, timeout: Union[float, None] = None
73+
self, *, timeout: Union[float, None] = None
9174
) -> Iterable[PublicMessage]:
9275
"""
9376
todo?
@@ -121,11 +104,11 @@ def async_wait_message(self) -> concurrent.futures.Future:
121104
raise NotImplementedError()
122105

123106
def batches(
124-
self,
125-
*,
126-
max_messages: Union[int, None] = None,
127-
max_bytes: Union[int, None] = None,
128-
timeout: Union[float, None] = None,
107+
self,
108+
*,
109+
max_messages: Union[int, None] = None,
110+
max_bytes: Union[int, None] = None,
111+
timeout: Union[float, None] = None,
129112
) -> Iterable[PublicBatch]:
130113
"""
131114
Block until receive new batch.
@@ -137,11 +120,11 @@ def batches(
137120
raise NotImplementedError()
138121

139122
def receive_batch(
140-
self,
141-
*,
142-
max_messages: typing.Union[int, None] = None,
143-
max_bytes: typing.Union[int, None] = None,
144-
timeout: Union[float, None] = None,
123+
self,
124+
*,
125+
max_messages: typing.Union[int, None] = None,
126+
max_bytes: typing.Union[int, None] = None,
127+
timeout: Union[float, None] = None,
145128
) -> Union[PublicBatch, None]:
146129
"""
147130
Get one messages batch from reader
@@ -150,37 +133,42 @@ def receive_batch(
150133
if no new message in timeout seconds (default - infinite): raise TimeoutError()
151134
if timeout <= 0 - it will fast non block method, get messages from internal buffer only.
152135
"""
153-
return self._call_sync(
154-
self._async_reader.receive_batch(
155-
max_messages=max_messages, max_bytes=max_bytes
156-
),
157-
timeout,
158-
)
136+
self._check_closed()
159137

160-
def commit(self, mess: ICommittable):
138+
return self._caller.safe_call_with_result(
139+
self._async_reader.receive_batch(max_messages=max_messages, max_bytes=max_bytes),
140+
timeout)
141+
142+
def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
161143
"""
162144
Put commit message to internal buffer.
163145
164146
For the method no way check the commit result
165147
(for example if lost connection - commits will not re-send and committed messages will receive again)
166148
"""
167-
self._call_sync(self._async_reader.commit(mess), None)
149+
self._check_closed()
150+
151+
self._caller.call_sync(self._async_reader.commit(mess))
168152

169153
def commit_with_ack(
170-
self, mess: ICommittable
154+
self, mess: ICommittable, timeout: typing.Union[int, float, None] = None
171155
) -> Union[CommitResult, List[CommitResult]]:
172156
"""
173157
write commit message to a buffer and wait ack from the server.
174158
175159
if receive in timeout seconds (default - infinite): raise TimeoutError()
176160
"""
177-
return self._call_sync(self._async_reader.commit_with_ack(mess), None)
161+
self._check_closed()
162+
163+
return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout)
178164

179-
def async_commit_with_ack(self, mess: ICommittable) -> concurrent.futures.Future:
165+
def async_commit_with_ack(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]) -> concurrent.futures.Future:
180166
"""
181167
write commit message to a buffer and return Future for wait result.
182168
"""
183-
return self._call(self._async_reader.commit_with_ack(mess), None)
169+
self._check_closed()
170+
171+
return self._caller.unsafe_call_with_future(self._async_reader.commit_with_ack(mess))
184172

185173
def async_flush(self) -> concurrent.futures.Future:
186174
"""
@@ -194,12 +182,14 @@ def flush(self):
194182
"""
195183
raise NotImplementedError()
196184

197-
def close(self):
185+
def close(self, *, timeout: typing.Union[int, float, None] = None):
198186
if self._closed:
199187
return
188+
200189
self._closed = True
201190

202-
# for no call self._call_sync on closed object
203-
asyncio.run_coroutine_threadsafe(
204-
self._async_reader.close(), self._loop
205-
).result()
191+
self._caller.safe_call_with_result(self._async_reader.close(), timeout)
192+
193+
def _check_closed(self):
194+
if self._closed:
195+
raise TopicReaderClosedError()

0 commit comments

Comments
 (0)