Skip to content

Commit f68af4c

Browse files
committed
auto partitioning control plane
1 parent f765537 commit f68af4c

File tree

5 files changed

+49
-6
lines changed

5 files changed

+49
-6
lines changed

tests/topics/test_control_plane.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pytest
44

5+
import ydb
56
from ydb import issues
67

78

@@ -56,6 +57,39 @@ async def test_alter_existed_topic(self, driver, topic_path):
5657
topic_after = await client.describe_topic(topic_path)
5758
assert topic_after.min_active_partitions == target_min_active_partitions
5859

60+
async def test_alter_auto_partitioning_settings(self, driver, topic_path):
61+
client = driver.topic_client
62+
63+
topic_before = await client.describe_topic(topic_path)
64+
65+
expected = topic_before.auto_partitioning_settings
66+
67+
expected.strategy = ydb.TopicAutoPartitioningStrategy.SCALE_UP
68+
69+
await client.alter_topic(
70+
topic_path,
71+
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
72+
set_strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
73+
),
74+
)
75+
76+
topic_after = await client.describe_topic(topic_path)
77+
78+
assert topic_after.auto_partitioning_settings == expected
79+
80+
expected.up_utilization_percent = 88
81+
82+
await client.alter_topic(
83+
topic_path,
84+
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
85+
set_up_utilization_percent=88,
86+
),
87+
)
88+
89+
topic_after = await client.describe_topic(topic_path)
90+
91+
assert topic_after.auto_partitioning_settings == expected
92+
5993

6094
class TestTopicClientControlPlane:
6195
def test_create_topic(self, driver_sync, database):

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,7 @@ def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]:
10171017

10181018
def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy:
10191019
try:
1020-
ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self))
1020+
return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self))
10211021
except KeyError:
10221022
return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED
10231023

@@ -1183,7 +1183,7 @@ def from_proto(code: Optional[int]) -> Optional["MeteringMode"]:
11831183

11841184
def to_public(self) -> ydb_topic_public_types.PublicMeteringMode:
11851185
try:
1186-
ydb_topic_public_types.PublicMeteringMode(int(self))
1186+
return ydb_topic_public_types.PublicMeteringMode(int(self))
11871187
except KeyError:
11881188
return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED
11891189

ydb/_grpc/grpcwrapper/ydb_topic_public_types.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,18 +82,18 @@ class PublicAutoPartitioningStrategy(IntEnum):
8282

8383
@dataclass
8484
class PublicAutoPartitioningSettings:
85-
strategy: Optional["PublicAutoPartitioningStrategy"] = 0
85+
strategy: Optional["PublicAutoPartitioningStrategy"] = None
8686
stabilization_window: Optional[datetime.timedelta] = None
87-
up_utilization_percent: Optional[int] = None
8887
down_utilization_percent: Optional[int] = None
88+
up_utilization_percent: Optional[int] = None
8989

9090

9191
@dataclass
9292
class PublicAlterAutoPartitioningSettings:
93-
set_strategy: Optional["PublicAutoPartitioningStrategy"] = 0
93+
set_strategy: Optional["PublicAutoPartitioningStrategy"] = None
9494
set_stabilization_window: Optional[datetime.timedelta] = None
95-
set_up_utilization_percent: Optional[int] = None
9695
set_down_utilization_percent: Optional[int] = None
96+
set_up_utilization_percent: Optional[int] = None
9797

9898

9999
@dataclass

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
335335
self._started = True
336336
self._stream = stream
337337

338+
print(init_message)
339+
338340
stream.write(StreamReadMessage.FromClient(client_message=init_message))
339341
init_response = await stream.receive() # type: StreamReadMessage.FromServer
340342
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
@@ -586,6 +588,10 @@ def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSessi
586588
f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}"
587589
)
588590

591+
print(
592+
f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}"
593+
)
594+
589595
def _on_read_response(self, message: StreamReadMessage.ReadResponse):
590596
self._buffer_consume_bytes(message.bytes_size)
591597

ydb/topic.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
"TopicCodec",
88
"TopicConsumer",
99
"TopicAlterConsumer",
10+
"TopicAlterAutoPartitioningSettings",
11+
"TopicAutoPartitioningSettings",
12+
"TopicAutoPartitioningStrategy",
1013
"TopicDescription",
1114
"TopicError",
1215
"TopicMeteringMode",

0 commit comments

Comments
 (0)