Skip to content

Commit

Permalink
auto partitioning control plane
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Feb 4, 2025
1 parent f765537 commit ffc0319
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 6 deletions.
34 changes: 34 additions & 0 deletions tests/topics/test_control_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

import ydb
from ydb import issues


Expand Down Expand Up @@ -56,6 +57,39 @@ async def test_alter_existed_topic(self, driver, topic_path):
topic_after = await client.describe_topic(topic_path)
assert topic_after.min_active_partitions == target_min_active_partitions

async def test_alter_auto_partitioning_settings(self, driver, topic_path):
client = driver.topic_client

topic_before = await client.describe_topic(topic_path)

expected = topic_before.auto_partitioning_settings

expected.strategy = ydb.TopicAutoPartitioningStrategy.SCALE_UP

await client.alter_topic(
topic_path,
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
set_strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
)
)

topic_after = await client.describe_topic(topic_path)

assert topic_after.auto_partitioning_settings == expected

expected.up_utilization_percent = 88

await client.alter_topic(
topic_path,
alter_auto_partitioning_settings=ydb.TopicAlterAutoPartitioningSettings(
set_up_utilization_percent=88,
)
)

topic_after = await client.describe_topic(topic_path)

assert topic_after.auto_partitioning_settings == expected


class TestTopicClientControlPlane:
def test_create_topic(self, driver_sync, database):
Expand Down
4 changes: 2 additions & 2 deletions ydb/_grpc/grpcwrapper/ydb_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ def from_proto(code: Optional[int]) -> Optional["AutoPartitioningStrategy"]:

def to_public(self) -> ydb_topic_public_types.PublicAutoPartitioningStrategy:
try:
ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self))
return ydb_topic_public_types.PublicAutoPartitioningStrategy(int(self))
except KeyError:
return ydb_topic_public_types.PublicAutoPartitioningStrategy.UNSPECIFIED

Expand Down Expand Up @@ -1183,7 +1183,7 @@ def from_proto(code: Optional[int]) -> Optional["MeteringMode"]:

def to_public(self) -> ydb_topic_public_types.PublicMeteringMode:
try:
ydb_topic_public_types.PublicMeteringMode(int(self))
return ydb_topic_public_types.PublicMeteringMode(int(self))
except KeyError:
return ydb_topic_public_types.PublicMeteringMode.UNSPECIFIED

Expand Down
8 changes: 4 additions & 4 deletions ydb/_grpc/grpcwrapper/ydb_topic_public_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,18 @@ class PublicAutoPartitioningStrategy(IntEnum):

@dataclass
class PublicAutoPartitioningSettings:
strategy: Optional["PublicAutoPartitioningStrategy"] = 0
strategy: Optional["PublicAutoPartitioningStrategy"] = None
stabilization_window: Optional[datetime.timedelta] = None
up_utilization_percent: Optional[int] = None
down_utilization_percent: Optional[int] = None
up_utilization_percent: Optional[int] = None


@dataclass
class PublicAlterAutoPartitioningSettings:
set_strategy: Optional["PublicAutoPartitioningStrategy"] = 0
set_strategy: Optional["PublicAutoPartitioningStrategy"] = None
set_stabilization_window: Optional[datetime.timedelta] = None
set_up_utilization_percent: Optional[int] = None
set_down_utilization_percent: Optional[int] = None
set_up_utilization_percent: Optional[int] = None


@dataclass
Expand Down
6 changes: 6 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
self._started = True
self._stream = stream

print(init_message)

stream.write(StreamReadMessage.FromClient(client_message=init_message))
init_response = await stream.receive() # type: StreamReadMessage.FromServer
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
Expand Down Expand Up @@ -586,6 +588,10 @@ def _on_end_partition_session(self, message: StreamReadMessage.EndPartitionSessi
f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}"
)

print(
f"End partition session with id: {message.partition_session_id}, child partitions: {message.child_partition_ids}"
)

def _on_read_response(self, message: StreamReadMessage.ReadResponse):
self._buffer_consume_bytes(message.bytes_size)

Expand Down
3 changes: 3 additions & 0 deletions ydb/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"TopicCodec",
"TopicConsumer",
"TopicAlterConsumer",
"TopicAlterAutoPartitioningSettings",
"TopicAutoPartitioningSettings",
"TopicAutoPartitioningStrategy",
"TopicDescription",
"TopicError",
"TopicMeteringMode",
Expand Down

0 comments on commit ffc0319

Please sign in to comment.