Skip to content

Commit 93b0318

Browse files
authored
Merge pull request #110 initial topic writer
2 parents 7a2ece5 + 2bee743 commit 93b0318

30 files changed

+3352
-22
lines changed

.github/scripts/increment_version_test.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@
77
[
88
("0.0.0", 'patch', False, "0.0.1"),
99
("0.0.1", 'patch', False, "0.0.2"),
10-
("0.0.1a1", 'patch', False, "0.0.1"),
11-
("0.0.0", 'patch', True, "0.0.1a1"),
12-
("0.0.1", 'patch', True, "0.0.2a1"),
13-
("0.0.2a1", 'patch', True, "0.0.2a2"),
10+
("0.0.1b1", 'patch', False, "0.0.1"),
11+
("0.0.0", 'patch', True, "0.0.1b1"),
12+
("0.0.1", 'patch', True, "0.0.2b1"),
13+
("0.0.2b1", 'patch', True, "0.0.2b2"),
1414
("0.0.1", 'minor', False, "0.1.0"),
15-
("0.0.1a1", 'minor', False, "0.1.0"),
16-
("0.1.0a1", 'minor', False, "0.1.0"),
17-
("0.1.0", 'minor', True, "0.2.0a1"),
18-
("0.1.0a1", 'minor', True, "0.1.0a2"),
19-
("0.1.1a1", 'minor', True, "0.2.0a1"),
15+
("0.0.1b1", 'minor', False, "0.1.0"),
16+
("0.1.0b1", 'minor', False, "0.1.0"),
17+
("0.1.0", 'minor', True, "0.2.0b1"),
18+
("0.1.0b1", 'minor', True, "0.1.0b2"),
19+
("0.1.1b1", 'minor', True, "0.2.0b1"),
20+
("3.0.0b1", 'patch', True, "3.0.0b2"),
2021
]
2122
)
2223
def test_increment_version(source, inc_type, with_beta, result):

.github/workflows/python-publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ jobs:
9393
9494
git config --global user.email "robot@umbrella";
9595
git config --global user.name "robot";
96-
git commit -m "Release: $NEW_VERSION";
96+
git commit -m "Release: $TAG";
9797
9898
git tag "$TAG"
9999
git push && git push --tags

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ ydb.egg-info/
33
/.idea
44
/tox
55
/venv
6-
/ydb_certs
6+
/ydb_certs
7+
/tmp

AUTHORS

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
The following authors have created the source code of "YDB Python SDK"
1+
The following authors have created the source code of "Yandex Database Python SDK"
22
published and distributed by YANDEX LLC as the owner:
33

44
Vitalii Gridnev <[email protected]>
5+
Timofey Koolin <[email protected]>
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import asyncio
2+
import json
3+
import time
4+
5+
import ydb
6+
7+
8+
async def connect():
9+
db = ydb.aio.Driver(
10+
connection_string="grpc://localhost:2135?database=/local",
11+
credentials=ydb.credentials.AnonymousCredentials(),
12+
)
13+
reader = ydb.TopicClientAsyncIO(db).topic_reader(
14+
"/local/topic", consumer="consumer"
15+
)
16+
return reader
17+
18+
19+
async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
20+
with ydb.TopicClientAsyncIO(db).topic_reader(
21+
"/database/topic/path", consumer="consumer"
22+
) as reader:
23+
async for message in reader.messages():
24+
pass
25+
26+
27+
async def print_message_content(reader: ydb.TopicReaderAsyncIO):
28+
async for message in reader.messages():
29+
print("text", message.data.read().decode("utf-8"))
30+
# await and async_commit need only for sync commit mode - for wait ack from servr
31+
await reader.commit(message)
32+
33+
34+
async def process_messages_batch_explicit_commit(reader: ydb.TopicReaderAsyncIO):
35+
# Explicit commit example
36+
async for batch in reader.batches(max_messages=100, timeout=2):
37+
async with asyncio.TaskGroup() as tg:
38+
for message in batch.messages:
39+
tg.create_task(_process(message))
40+
41+
# wait complete of process all messages from batch be taskgroup context manager
42+
# and commit complete batch
43+
await reader.commit(batch)
44+
45+
46+
async def process_messages_batch_context_manager_commit(reader: ydb.TopicReaderAsyncIO):
47+
# Commit with context manager
48+
async for batch in reader.batches():
49+
async with reader.commit_on_exit(batch), asyncio.TaskGroup() as tg:
50+
for message in batch.messages:
51+
tg.create_task(_process(message))
52+
53+
54+
async def get_message_with_timeout(reader: ydb.TopicReaderAsyncIO):
55+
try:
56+
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
57+
except TimeoutError:
58+
print("Have no new messages in a second")
59+
return
60+
61+
print("mess", message.data)
62+
63+
64+
async def get_all_messages_with_small_wait(reader: ydb.TopicReaderAsyncIO):
65+
async for message in reader.messages(timeout=1):
66+
await _process(message)
67+
print("Have no new messages in a second")
68+
69+
70+
async def get_a_message_from_external_loop(reader: ydb.TopicReaderAsyncIO):
71+
for i in range(10):
72+
try:
73+
message = await asyncio.wait_for(reader.receive_message(), timeout=1)
74+
except TimeoutError:
75+
return
76+
await _process(message)
77+
78+
79+
async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO):
80+
for i in range(10):
81+
try:
82+
batch = await asyncio.wait_for(reader.receive_batch(), timeout=2)
83+
except TimeoutError:
84+
return
85+
86+
for message in batch.messages:
87+
await _process(message)
88+
await reader.commit(batch)
89+
90+
91+
async def auto_deserialize_message(db: ydb.aio.Driver):
92+
# async, batch work similar to this
93+
94+
async with ydb.TopicClientAsyncIO(db).topic_reader(
95+
"/database/topic/path", consumer="asd", deserializer=json.loads
96+
) as reader:
97+
async for message in reader.messages():
98+
print(
99+
message.data.Name
100+
) # message.data replaces by json.loads(message.data) of raw message
101+
reader.commit(message)
102+
103+
104+
async def commit_batch_with_context(reader: ydb.TopicReaderAsyncIO):
105+
async for batch in reader.batches():
106+
async with reader.commit_on_exit(batch):
107+
for message in batch.messages:
108+
if not batch.is_alive:
109+
break
110+
await _process(message)
111+
112+
113+
async def handle_partition_stop(reader: ydb.TopicReaderAsyncIO):
114+
async for message in reader.messages():
115+
time.sleep(1) # some work
116+
if message.is_alive:
117+
time.sleep(123) # some other work
118+
await reader.commit(message)
119+
120+
121+
async def handle_partition_stop_batch(reader: ydb.TopicReaderAsyncIO):
122+
def process_batch(batch):
123+
for message in batch.messages:
124+
if not batch.is_alive:
125+
# no reason work with expired batch
126+
# go read next - good batch
127+
return
128+
await _process(message)
129+
await reader.commit(batch)
130+
131+
async for batch in reader.batches():
132+
process_batch(batch)
133+
134+
135+
async def connect_and_read_few_topics(db: ydb.aio.Driver):
136+
with ydb.TopicClientAsyncIO(db).topic_reader(
137+
[
138+
"/database/topic/path",
139+
ydb.TopicSelector("/database/second-topic", partitions=3),
140+
]
141+
) as reader:
142+
async for message in reader.messages():
143+
await _process(message)
144+
await reader.commit(message)
145+
146+
147+
async def handle_partition_graceful_stop_batch(reader: ydb.TopicReaderAsyncIO):
148+
# no special handle, but batch will contain less than prefer count messages
149+
async for batch in reader.batches():
150+
await _process(batch)
151+
reader.commit(batch)
152+
153+
154+
async def advanced_commit_notify(db: ydb.aio.Driver):
155+
def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
156+
print(event.topic)
157+
print(event.offset)
158+
159+
async with ydb.TopicClientAsyncIO(db).topic_reader(
160+
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
161+
) as reader:
162+
async for message in reader.messages():
163+
await _process(message)
164+
await reader.commit(message)
165+
166+
167+
async def advanced_read_with_own_progress_storage(db: ydb.TopicReaderAsyncIO):
168+
async def on_get_partition_start_offset(
169+
req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest,
170+
) -> ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
171+
# read current progress from database
172+
resp = ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse()
173+
resp.start_offset = 123
174+
return resp
175+
176+
async with ydb.TopicClient(db).topic_reader(
177+
"/local/test",
178+
consumer="consumer",
179+
on_get_partition_start_offset=on_get_partition_start_offset,
180+
) as reader:
181+
async for mess in reader.messages():
182+
await _process(mess)
183+
# save progress to own database
184+
185+
# no commit progress to topic service
186+
# reader.commit(mess)
187+
188+
189+
async def _process(msg):
190+
raise NotImplementedError()

0 commit comments

Comments
 (0)