Skip to content

Commit 498c124

Browse files
committed
another approach
1 parent cbc0352 commit 498c124

File tree

5 files changed

+200
-113
lines changed

5 files changed

+200
-113
lines changed

tests/topics/test_topic_reader.py

+81-63
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ async def wait(fut):
253253
await reader1.close()
254254

255255

256+
@pytest.fixture()
257+
def topic_selector(topic_with_messages):
258+
return ydb.TopicReaderSelector(path=topic_with_messages, partitions=[0])
259+
260+
256261
@pytest.mark.asyncio
257262
class TestTopicNoConsumerReaderAsyncIO:
258263
async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
@@ -262,57 +267,65 @@ async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_mess
262267
consumer=None,
263268
)
264269

265-
async def test_reader_with_default_lambda(self, driver, topic_with_messages):
266-
reader = driver.topic_client.reader(
267-
topic_with_messages,
268-
consumer=None,
269-
partition_ids=[0],
270-
)
270+
async def test_reader_with_no_partition_ids_selector_raises(self, driver, topic_selector):
271+
topic_selector.partitions = None
272+
273+
with pytest.raises(ydb.Error):
274+
driver.topic_client.reader(
275+
topic_selector,
276+
consumer=None,
277+
)
278+
279+
async def test_reader_with_default_lambda(self, driver, topic_selector):
280+
reader = driver.topic_client.reader(topic_selector, consumer=None)
271281
msg = await reader.receive_message()
272282

273283
assert msg.seqno == 1
274284

275285
await reader.close()
276286

277-
async def test_reader_with_sync_lambda(self, driver, topic_with_messages):
278-
def sync_lambda(partition_id: int):
279-
assert partition_id == 0
280-
return 1
287+
async def test_reader_with_sync_lambda(self, driver, topic_selector):
288+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
289+
def on_partition_get_start_offset(self, event):
290+
assert topic_selector.path.endswith(event.topic)
291+
assert event.partition_id == 0
292+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
281293

282294
reader = driver.topic_client.reader(
283-
topic_with_messages,
295+
topic_selector,
284296
consumer=None,
285-
partition_ids=[0],
286-
get_start_offset_lambda=sync_lambda,
297+
event_handler=CustomEventHandler(),
287298
)
299+
288300
msg = await reader.receive_message()
289301

290302
assert msg.seqno == 2
291303

292304
await reader.close()
293305

294-
async def test_reader_with_async_lambda(self, driver, topic_with_messages):
295-
async def async_lambda(partition_id: int) -> int:
296-
assert partition_id == 0
297-
return 1
306+
async def test_reader_with_async_lambda(self, driver, topic_selector):
307+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
308+
async def on_partition_get_start_offset(self, event):
309+
assert topic_selector.path.endswith(event.topic)
310+
assert event.partition_id == 0
311+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
298312

299313
reader = driver.topic_client.reader(
300-
topic_with_messages,
314+
topic_selector,
301315
consumer=None,
302-
partition_ids=[0],
303-
get_start_offset_lambda=async_lambda,
316+
event_handler=CustomEventHandler(),
304317
)
318+
305319
msg = await reader.receive_message()
306320

307321
assert msg.seqno == 2
308322

309323
await reader.close()
310324

311-
async def test_commit_not_allowed(self, driver, topic_with_messages):
325+
async def test_commit_not_allowed(self, driver, topic_selector):
312326
reader = driver.topic_client.reader(
313-
topic_with_messages,
327+
topic_selector,
314328
consumer=None,
315-
partition_ids=[0],
316329
)
317330
batch = await reader.receive_batch()
318331

@@ -324,18 +337,18 @@ async def test_commit_not_allowed(self, driver, topic_with_messages):
324337

325338
await reader.close()
326339

327-
async def test_offsets_updated_after_reconnect(self, driver, topic_with_messages):
340+
async def test_offsets_updated_after_reconnect(self, driver, topic_selector):
328341
current_offset = 0
329342

330-
def get_start_offset_lambda(partition_id: int) -> int:
331-
nonlocal current_offset
332-
return current_offset
343+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
344+
def on_partition_get_start_offset(self, event):
345+
nonlocal current_offset
346+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)
333347

334348
reader = driver.topic_client.reader(
335-
topic_with_messages,
349+
topic_selector,
336350
consumer=None,
337-
partition_ids=[0],
338-
get_start_offset_lambda=get_start_offset_lambda,
351+
event_handler=CustomEventHandler(),
339352
)
340353
msg = await reader.receive_message()
341354

@@ -361,57 +374,65 @@ def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messa
361374
consumer=None,
362375
)
363376

364-
def test_reader_with_default_lambda(self, driver_sync, topic_with_messages):
365-
reader = driver_sync.topic_client.reader(
366-
topic_with_messages,
367-
consumer=None,
368-
partition_ids=[0],
369-
)
377+
def test_reader_with_no_partition_ids_selector_raises(self, driver_sync, topic_selector):
378+
topic_selector.partitions = None
379+
380+
with pytest.raises(ydb.Error):
381+
driver_sync.topic_client.reader(
382+
topic_selector,
383+
consumer=None,
384+
)
385+
386+
def test_reader_with_default_lambda(self, driver_sync, topic_selector):
387+
reader = driver_sync.topic_client.reader(topic_selector, consumer=None)
370388
msg = reader.receive_message()
371389

372390
assert msg.seqno == 1
373391

374392
reader.close()
375393

376-
def test_reader_with_sync_lambda(self, driver_sync, topic_with_messages):
377-
def sync_lambda(partition_id: int):
378-
assert partition_id == 0
379-
return 1
394+
def test_reader_with_sync_lambda(self, driver_sync, topic_selector):
395+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
396+
def on_partition_get_start_offset(self, event):
397+
assert topic_selector.path.endswith(event.topic)
398+
assert event.partition_id == 0
399+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
380400

381401
reader = driver_sync.topic_client.reader(
382-
topic_with_messages,
402+
topic_selector,
383403
consumer=None,
384-
partition_ids=[0],
385-
get_start_offset_lambda=sync_lambda,
404+
event_handler=CustomEventHandler(),
386405
)
406+
387407
msg = reader.receive_message()
388408

389409
assert msg.seqno == 2
390410

391411
reader.close()
392412

393-
def test_reader_with_async_lambda(self, driver_sync, topic_with_messages):
394-
async def async_lambda(partition_id: int) -> int:
395-
assert partition_id == 0
396-
return 1
413+
def test_reader_with_async_lambda(self, driver_sync, topic_selector):
414+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
415+
async def on_partition_get_start_offset(self, event):
416+
assert topic_selector.path.endswith(event.topic)
417+
assert event.partition_id == 0
418+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
397419

398420
reader = driver_sync.topic_client.reader(
399-
topic_with_messages,
421+
topic_selector,
400422
consumer=None,
401-
partition_ids=[0],
402-
get_start_offset_lambda=async_lambda,
423+
event_handler=CustomEventHandler(),
403424
)
425+
404426
msg = reader.receive_message()
405427

406428
assert msg.seqno == 2
407429

408430
reader.close()
409431

410-
def test_commit_not_allowed(self, driver_sync, topic_with_messages):
432+
def test_commit_not_allowed(self, driver_sync, topic_selector):
411433
reader = driver_sync.topic_client.reader(
412-
topic_with_messages,
434+
topic_selector,
413435
consumer=None,
414-
partition_ids=[0],
415436
)
416437
batch = reader.receive_batch()
417438

@@ -421,23 +442,20 @@ def test_commit_not_allowed(self, driver_sync, topic_with_messages):
421442
with pytest.raises(ydb.Error):
422443
reader.commit_with_ack(batch)
423444

424-
with pytest.raises(ydb.Error):
425-
reader.async_commit_with_ack(batch)
426-
427445
reader.close()
428446

429-
def test_offsets_updated_after_reconnect(self, driver_sync, topic_with_messages):
447+
def test_offsets_updated_after_reconnect(self, driver_sync, topic_selector):
430448
current_offset = 0
431449

432-
def get_start_offset_lambda(partition_id: int) -> int:
433-
nonlocal current_offset
434-
return current_offset
450+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
451+
def on_partition_get_start_offset(self, event):
452+
nonlocal current_offset
453+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)
435454

436455
reader = driver_sync.topic_client.reader(
437-
topic_with_messages,
456+
topic_selector,
438457
consumer=None,
439-
partition_ids=[0],
440-
get_start_offset_lambda=get_start_offset_lambda,
458+
event_handler=CustomEventHandler(),
441459
)
442460
msg = reader.receive_message()
443461

ydb/_topic_reader/events.py

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Awaitable, Union
4+
5+
from ..issues import ClientInternalError
6+
7+
__all__ = [
8+
"OnCommit",
9+
"OnPartitionGetStartOffsetRequest",
10+
"OnPartitionGetStartOffsetResponse",
11+
"OnInitPartition",
12+
"OnShutdownPartition",
13+
"EventHandler",
14+
]
15+
16+
17+
class BaseReaderEvent:
18+
pass
19+
20+
21+
@dataclass
22+
class OnCommit(BaseReaderEvent):
23+
topic: str
24+
offset: int
25+
26+
27+
@dataclass
28+
class OnPartitionGetStartOffsetRequest(BaseReaderEvent):
29+
topic: str
30+
partition_id: int
31+
32+
33+
@dataclass
34+
class OnPartitionGetStartOffsetResponse:
35+
start_offset: int
36+
37+
38+
class OnInitPartition(BaseReaderEvent):
39+
pass
40+
41+
42+
class OnShutdownPartition:
43+
pass
44+
45+
46+
TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]
47+
48+
49+
class EventHandler:
50+
def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
51+
pass
52+
53+
def on_partition_get_start_offset(
54+
self,
55+
event: OnPartitionGetStartOffsetRequest,
56+
) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
57+
pass
58+
59+
def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
60+
pass
61+
62+
def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
63+
pass
64+
65+
async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]:
66+
f = None
67+
if isinstance(event, OnCommit):
68+
f = self.on_commit
69+
elif isinstance(event, OnPartitionGetStartOffsetRequest):
70+
f = self.on_partition_get_start_offset
71+
elif isinstance(event, OnInitPartition):
72+
f = self.on_init_partition
73+
elif isinstance(event, OnShutdownPartition):
74+
f = self.on_shutdown_partition
75+
else:
76+
raise ClientInternalError("Unsupported topic reader event")
77+
78+
if asyncio.iscoroutinefunction(f):
79+
return await f(event)
80+
81+
return f(event)

ydb/_topic_reader/topic_reader.py

+4-24
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
import datetime
44
from dataclasses import dataclass
55
from typing import (
6-
Awaitable,
76
Union,
87
Optional,
98
List,
109
Mapping,
1110
Callable,
1211
)
1312

13+
from .events import EventHandler
1414
from ..retries import RetrySettings
1515
from .._grpc.grpcwrapper.ydb_topic import StreamReadMessage, OffsetsRange
1616

@@ -21,6 +21,7 @@ class PublicTopicSelector:
2121
partitions: Optional[Union[int, List[int]]] = None
2222
read_from: Optional[datetime.datetime] = None
2323
max_lag: Optional[datetime.timedelta] = None
24+
read_offset: Optional[int] = None
2425

2526
def _to_topic_read_settings(self) -> StreamReadMessage.InitRequest.TopicReadSettings:
2627
partitions = self.partitions
@@ -54,9 +55,7 @@ class PublicReaderSettings:
5455
# decoder_executor, must be set for handle non raw messages
5556
decoder_executor: Optional[concurrent.futures.Executor] = None
5657
update_token_interval: Union[int, float] = 3600
57-
58-
partition_ids: Optional[List[int]] = None
59-
get_start_offset_lambda: Optional[Union[Callable[[int], int], Callable[[int], Awaitable[int]]]] = None
58+
event_handler: Optional[EventHandler] = None
6059

6160
def __post_init__(self):
6261
# check possible create init message
@@ -73,7 +72,7 @@ def _init_message(self) -> StreamReadMessage.InitRequest:
7372

7473
for index, selector in enumerate(selectors):
7574
if isinstance(selector, str):
76-
selectors[index] = PublicTopicSelector(path=selector, partitions=self.partition_ids)
75+
selectors[index] = PublicTopicSelector(path=selector)
7776
elif isinstance(selector, PublicTopicSelector):
7877
pass
7978
else:
@@ -89,25 +88,6 @@ def _retry_settings(self) -> RetrySettings:
8988
return RetrySettings(idempotent=True)
9089

9190

92-
class Events:
93-
class OnCommit:
94-
topic: str
95-
offset: int
96-
97-
class OnPartitionGetStartOffsetRequest:
98-
topic: str
99-
partition_id: int
100-
101-
class OnPartitionGetStartOffsetResponse:
102-
start_offset: int
103-
104-
class OnInitPartition:
105-
pass
106-
107-
class OnShutdownPatition:
108-
pass
109-
110-
11191
class RetryPolicy:
11292
connection_timeout_sec: float
11393
overload_timeout_sec: float

0 commit comments

Comments
 (0)