Skip to content

Commit 6e749ed

Browse files
committed
linter
1 parent 387c17c commit 6e749ed

File tree

3 files changed

+66
-38
lines changed

3 files changed

+66
-38
lines changed

ydb/_topic_common/common.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,17 @@ class CallFromSyncToAsync:
6868
def __init__(self, loop: asyncio.AbstractEventLoop):
6969
self._loop = loop
7070

71-
def unsafe_call_with_future(self, coro: typing.Coroutine) -> concurrent.futures.Future:
71+
def unsafe_call_with_future(
72+
self, coro: typing.Coroutine
73+
) -> concurrent.futures.Future:
7274
"""
7375
returned result from coro may be lost
7476
"""
7577
return asyncio.run_coroutine_threadsafe(coro, self._loop)
7678

77-
def unsafe_call_with_result(self, coro: typing.Coroutine, timeout: typing.Union[int, float, None]):
79+
def unsafe_call_with_result(
80+
self, coro: typing.Coroutine, timeout: typing.Union[int, float, None]
81+
):
7882
"""
7983
returned result from coro may be lost by race future cancel by timeout and return value from coroutine
8084
"""
@@ -99,7 +103,7 @@ async def call_coro():
99103
try:
100104
res = await asyncio.wait_for(task, timeout)
101105
return res
102-
except BaseException as err:
106+
except asyncio.TimeoutError:
103107
try:
104108
res = await task
105109
return res
@@ -109,7 +113,6 @@ async def call_coro():
109113
# return builtin TimeoutError instead of asyncio.TimeoutError
110114
raise TimeoutError()
111115

112-
113116
return asyncio.run_coroutine_threadsafe(call_coro(), self._loop).result()
114117

115118
def _safe_call_fast(self, coro: typing.Coroutine):

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import concurrent.futures
33
import typing
4-
from typing import List, Union, Iterable, Optional, Coroutine
4+
from typing import List, Union, Iterable, Optional
55

66
from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType
77
from ydb._topic_common.common import _get_shared_event_loop, CallFromSyncToAsync
@@ -24,11 +24,11 @@ class TopicReaderSync:
2424
_closed: bool
2525

2626
def __init__(
27-
self,
28-
driver: SupportedDriverType,
29-
settings: PublicReaderSettings,
30-
*,
31-
eventloop: Optional[asyncio.AbstractEventLoop] = None,
27+
self,
28+
driver: SupportedDriverType,
29+
settings: PublicReaderSettings,
30+
*,
31+
eventloop: Optional[asyncio.AbstractEventLoop] = None,
3232
):
3333
self._closed = False
3434

@@ -70,7 +70,7 @@ async def sessions_stat(self) -> List[SessionStat]:
7070
raise NotImplementedError()
7171

7272
def messages(
73-
self, *, timeout: Union[float, None] = None
73+
self, *, timeout: Union[float, None] = None
7474
) -> Iterable[PublicMessage]:
7575
"""
7676
todo?
@@ -104,11 +104,11 @@ def async_wait_message(self) -> concurrent.futures.Future:
104104
raise NotImplementedError()
105105

106106
def batches(
107-
self,
108-
*,
109-
max_messages: Union[int, None] = None,
110-
max_bytes: Union[int, None] = None,
111-
timeout: Union[float, None] = None,
107+
self,
108+
*,
109+
max_messages: Union[int, None] = None,
110+
max_bytes: Union[int, None] = None,
111+
timeout: Union[float, None] = None,
112112
) -> Iterable[PublicBatch]:
113113
"""
114114
Block until receive new batch.
@@ -120,11 +120,11 @@ def batches(
120120
raise NotImplementedError()
121121

122122
def receive_batch(
123-
self,
124-
*,
125-
max_messages: typing.Union[int, None] = None,
126-
max_bytes: typing.Union[int, None] = None,
127-
timeout: Union[float, None] = None,
123+
self,
124+
*,
125+
max_messages: typing.Union[int, None] = None,
126+
max_bytes: typing.Union[int, None] = None,
127+
timeout: Union[float, None] = None,
128128
) -> Union[PublicBatch, None]:
129129
"""
130130
Get one messages batch from reader
@@ -136,10 +136,15 @@ def receive_batch(
136136
self._check_closed()
137137

138138
return self._caller.safe_call_with_result(
139-
self._async_reader.receive_batch(max_messages=max_messages, max_bytes=max_bytes),
140-
timeout)
141-
142-
def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
139+
self._async_reader.receive_batch(
140+
max_messages=max_messages, max_bytes=max_bytes
141+
),
142+
timeout,
143+
)
144+
145+
def commit(
146+
self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
147+
):
143148
"""
144149
Put commit message to internal buffer.
145150
@@ -151,7 +156,7 @@ def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBat
151156
self._caller.call_sync(self._async_reader.commit(mess))
152157

153158
def commit_with_ack(
154-
self, mess: ICommittable, timeout: typing.Union[int, float, None] = None
159+
self, mess: ICommittable, timeout: typing.Union[int, float, None] = None
155160
) -> Union[CommitResult, List[CommitResult]]:
156161
"""
157162
write commit message to a buffer and wait ack from the server.
@@ -160,15 +165,21 @@ def commit_with_ack(
160165
"""
161166
self._check_closed()
162167

163-
return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout)
168+
return self._caller.unsafe_call_with_result(
169+
self._async_reader.commit_with_ack(mess), timeout
170+
)
164171

165-
def async_commit_with_ack(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]) -> concurrent.futures.Future:
172+
def async_commit_with_ack(
173+
self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
174+
) -> concurrent.futures.Future:
166175
"""
167176
write commit message to a buffer and return Future for wait result.
168177
"""
169178
self._check_closed()
170179

171-
return self._caller.unsafe_call_with_future(self._async_reader.commit_with_ack(mess))
180+
return self._caller.unsafe_call_with_future(
181+
self._async_reader.commit_with_ack(mess)
182+
)
172183

173184
def async_flush(self) -> concurrent.futures.Future:
174185
"""

ydb/_topic_writer/topic_writer_sync.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,23 @@
33
import asyncio
44
import typing
55
from concurrent.futures import Future
6-
from typing import Union, List, Optional, Coroutine
6+
from typing import Union, List, Optional
77

88
from .._grpc.grpcwrapper.common_utils import SupportedDriverType
99
from .topic_writer import (
1010
PublicWriterSettings,
11-
TopicWriterError,
1211
PublicWriterInitInfo,
1312
PublicWriteResult,
14-
Message, TopicWriterClosedError,
13+
Message,
14+
TopicWriterClosedError,
1515
)
1616

1717
from .topic_writer_asyncio import WriterAsyncIO
18-
from .._topic_common.common import _get_shared_event_loop, TimeoutType, CallFromSyncToAsync
18+
from .._topic_common.common import (
19+
_get_shared_event_loop,
20+
TimeoutType,
21+
CallFromSyncToAsync,
22+
)
1923

2024

2125
class WriterSync:
@@ -43,21 +47,27 @@ def __init__(
4347
async def create_async_writer():
4448
return WriterAsyncIO(driver, settings)
4549

46-
self._async_writer = self._caller.safe_call_with_result(create_async_writer(), None)
50+
self._async_writer = self._caller.safe_call_with_result(
51+
create_async_writer(), None
52+
)
4753

4854
def __enter__(self):
4955
return self
5056

5157
def __exit__(self, exc_type, exc_val, exc_tb):
5258
self.close()
5359

54-
def close(self, *, flush: bool = True, timeout: typing.Union[int, float, None] = None):
60+
def close(
61+
self, *, flush: bool = True, timeout: typing.Union[int, float, None] = None
62+
):
5563
if self._closed:
5664
return
5765

5866
self._closed = True
5967

60-
self._caller.safe_call_with_result(self._async_writer.close(flush=flush), timeout)
68+
self._caller.safe_call_with_result(
69+
self._async_writer.close(flush=flush), timeout
70+
)
6171

6272
def _check_closed(self):
6373
if self._closed:
@@ -81,7 +91,9 @@ def async_wait_init(self) -> Future[PublicWriterInitInfo]:
8191
def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo:
8292
self._check_closed()
8393

84-
return self._caller.unsafe_call_with_result(self._async_writer.wait_init(), timeout)
94+
return self._caller.unsafe_call_with_result(
95+
self._async_writer.wait_init(), timeout
96+
)
8597

8698
def write(
8799
self,
@@ -98,7 +110,9 @@ def async_write_with_ack(
98110
) -> Future[Union[PublicWriteResult, List[PublicWriteResult]]]:
99111
self._check_closed()
100112

101-
return self._caller.unsafe_call_with_future(self._async_writer.write_with_ack(messages))
113+
return self._caller.unsafe_call_with_future(
114+
self._async_writer.write_with_ack(messages)
115+
)
102116

103117
def write_with_ack(
104118
self,

0 commit comments

Comments
 (0)