Skip to content

Commit 387c17c

Browse files
committed
impl writer
1 parent 99e3ab9 commit 387c17c

File tree

3 files changed

+45
-40
lines changed

3 files changed

+45
-40
lines changed

ydb/_topic_common/common.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from .. import operation, issues
88
from .._grpc.grpcwrapper.common_utils import IFromProtoWithProtoType
99

10-
TimeoutType = typing.Union[int, float]
10+
TimeoutType = typing.Union[int, float, None]
1111

1212

1313
def wrap_operation(rpc_state, response_pb, driver=None):
@@ -86,7 +86,7 @@ def unsafe_call_with_result(self, coro: typing.Coroutine, timeout: typing.Union[
8686
finally:
8787
f.cancel()
8888

89-
def safe_call_with_result(self, coro: typing.Coroutine, timeout: typing.Union[int, float]):
89+
def safe_call_with_result(self, coro: typing.Coroutine, timeout: TimeoutType):
9090
"""
9191
no lost returned value from coro, but may be slower especially timeout latency - it wait coroutine cancelation.
9292
"""

ydb/_topic_writer/topic_writer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,11 @@ def __init__(self, message: str):
176176
super(TopicWriterError, self).__init__(message)
177177

178178

179+
class TopicWriterClosedError(ydb.Error):
180+
def __init__(self):
181+
super(TopicWriterClosedError, self).__init__("Topic writer already closed")
182+
183+
179184
class TopicWriterRepeatableError(TopicWriterError):
180185
pass
181186

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import typing
45
from concurrent.futures import Future
56
from typing import Union, List, Optional, Coroutine
67

@@ -10,15 +11,15 @@
1011
TopicWriterError,
1112
PublicWriterInitInfo,
1213
PublicWriteResult,
13-
Message,
14+
Message, TopicWriterClosedError,
1415
)
1516

1617
from .topic_writer_asyncio import WriterAsyncIO
17-
from .._topic_common.common import _get_shared_event_loop, TimeoutType
18+
from .._topic_common.common import _get_shared_event_loop, TimeoutType, CallFromSyncToAsync
1819

1920

2021
class WriterSync:
21-
_loop: asyncio.AbstractEventLoop
22+
_caller: CallFromSyncToAsync
2223
_async_writer: WriterAsyncIO
2324
_closed: bool
2425

@@ -33,80 +34,79 @@ def __init__(
3334
self._closed = False
3435

3536
if eventloop:
36-
self._loop = eventloop
37+
loop = eventloop
3738
else:
38-
self._loop = _get_shared_event_loop()
39+
loop = _get_shared_event_loop()
40+
41+
self._caller = CallFromSyncToAsync(loop)
3942

4043
async def create_async_writer():
4144
return WriterAsyncIO(driver, settings)
4245

43-
self._async_writer = asyncio.run_coroutine_threadsafe(
44-
create_async_writer(), self._loop
45-
).result()
46+
self._async_writer = self._caller.safe_call_with_result(create_async_writer(), None)
4647

4748
def __enter__(self):
4849
return self
4950

5051
def __exit__(self, exc_type, exc_val, exc_tb):
5152
self.close()
5253

53-
def _call(self, coro):
54+
def close(self, *, flush: bool = True, timeout: typing.Union[int, float, None] = None):
5455
if self._closed:
55-
raise TopicWriterError("writer is closed")
56+
return
5657

57-
return asyncio.run_coroutine_threadsafe(coro, self._loop)
58+
self._closed = True
5859

59-
def _call_sync(self, coro: Coroutine, timeout):
60-
f = self._call(coro)
61-
try:
62-
return f.result(timeout)
63-
except TimeoutError:
64-
f.cancel()
65-
raise
60+
self._caller.safe_call_with_result(self._async_writer.close(flush=flush), timeout)
6661

67-
def close(self, flush: bool = True):
62+
def _check_closed(self):
6863
if self._closed:
69-
return
64+
raise TopicWriterClosedError()
7065

71-
self._closed = True
66+
def async_flush(self) -> Future:
67+
self._check_closed()
7268

73-
# for no call self._call_sync on closed object
74-
asyncio.run_coroutine_threadsafe(
75-
self._async_writer.close(flush=flush), self._loop
76-
).result()
69+
return self._caller.unsafe_call_with_future(self._async_writer.flush())
7770

78-
def async_flush(self) -> Future:
79-
if self._closed:
80-
raise TopicWriterError("writer is closed")
81-
return self._call(self._async_writer.flush())
71+
def flush(self, *, timeout=None):
72+
self._check_closed()
8273

83-
def flush(self, timeout=None):
84-
self._call_sync(self._async_writer.flush(), timeout)
74+
return self._caller.unsafe_call_with_result(self._async_writer.flush(), timeout)
8575

8676
def async_wait_init(self) -> Future[PublicWriterInitInfo]:
87-
return self._call(self._async_writer.wait_init())
77+
self._check_closed()
78+
79+
return self._caller.unsafe_call_with_future(self._async_writer.wait_init())
8880

89-
def wait_init(self, timeout: Optional[TimeoutType] = None) -> PublicWriterInitInfo:
90-
return self._call_sync(self._async_writer.wait_init(), timeout)
81+
def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo:
82+
self._check_closed()
83+
84+
return self._caller.unsafe_call_with_result(self._async_writer.wait_init(), timeout)
9185

9286
def write(
9387
self,
9488
messages: Union[Message, List[Message]],
95-
timeout: Union[float, None] = None,
89+
timeout: TimeoutType = None,
9690
):
97-
self._call_sync(self._async_writer.write(messages), timeout=timeout)
91+
self._check_closed()
92+
93+
self._caller.safe_call_with_result(self._async_writer.write(messages), timeout)
9894

9995
def async_write_with_ack(
10096
self,
10197
messages: Union[Message, List[Message]],
10298
) -> Future[Union[PublicWriteResult, List[PublicWriteResult]]]:
103-
return self._call(self._async_writer.write_with_ack(messages))
99+
self._check_closed()
100+
101+
return self._caller.unsafe_call_with_future(self._async_writer.write_with_ack(messages))
104102

105103
def write_with_ack(
106104
self,
107105
messages: Union[Message, List[Message]],
108106
timeout: Union[float, None] = None,
109107
) -> Union[PublicWriteResult, List[PublicWriteResult]]:
110-
return self._call_sync(
108+
self._check_closed()
109+
110+
return self._caller.unsafe_call_with_result(
111111
self._async_writer.write_with_ack(messages), timeout=timeout
112112
)

0 commit comments

Comments
 (0)