@@ -53,7 +53,8 @@ public class YdbTopicsIntegrationTest {
53
53
@ ClassRule
54
54
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule ();
55
55
56
- private final static String TEST_TOPIC = "integration_test_topic" ;
56
+ private final static String TEST_TOPIC1 = "integration_test_topic" ;
57
+ private final static String TEST_TOPIC2 = "integration_test_other_topic" ;
57
58
private final static String TEST_CONSUMER1 = "consumer" ;
58
59
private final static String TEST_CONSUMER2 = "other_consumer" ;
59
60
@@ -69,10 +70,10 @@ public class YdbTopicsIntegrationTest {
69
70
70
71
@ BeforeClass
71
72
public static void initTopic () {
72
- logger .info ("Create test topic {} ..." , TEST_TOPIC );
73
+ logger .info ("Create test topic {} ..." , TEST_TOPIC1 );
73
74
74
75
client = TopicClient .newClient (ydbTransport ).build ();
75
- client .createTopic (TEST_TOPIC , CreateTopicSettings .newBuilder ()
76
+ client .createTopic (TEST_TOPIC1 , CreateTopicSettings .newBuilder ()
76
77
.addConsumer (Consumer .newBuilder ().setName (TEST_CONSUMER1 ).build ())
77
78
.addConsumer (Consumer .newBuilder ().setName (TEST_CONSUMER2 ).build ())
78
79
.build ()
@@ -81,16 +82,16 @@ public static void initTopic() {
81
82
82
83
@ AfterClass
83
84
public static void dropTopic () {
84
- logger .info ("Drop test topic {} ..." , TEST_TOPIC );
85
- Status dropStatus = client .dropTopic (TEST_TOPIC ).join ();
85
+ logger .info ("Drop test topic {} ..." , TEST_TOPIC1 );
86
+ Status dropStatus = client .dropTopic (TEST_TOPIC1 ).join ();
86
87
client .close ();
87
88
dropStatus .expectSuccess ("can't drop test topic" );
88
89
}
89
90
90
91
@ Test
91
92
public void step01_writeWithoutDeduplication () throws InterruptedException , ExecutionException , TimeoutException {
92
93
WriterSettings settings = WriterSettings .newBuilder ()
93
- .setTopicPath (TEST_TOPIC )
94
+ .setTopicPath (TEST_TOPIC1 )
94
95
.build ();
95
96
SyncWriter writer = client .createSyncWriter (settings );
96
97
writer .init ();
@@ -110,7 +111,7 @@ public void step01_writeWithoutDeduplication() throws InterruptedException, Exec
110
111
@ Test
111
112
public void step02_readHalfWithoutCommit () throws InterruptedException {
112
113
ReaderSettings settings = ReaderSettings .newBuilder ()
113
- .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC ).build ())
114
+ .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC1 ).build ())
114
115
.setConsumerName (TEST_CONSUMER1 )
115
116
.build ();
116
117
@@ -128,7 +129,7 @@ public void step02_readHalfWithoutCommit() throws InterruptedException {
128
129
@ Test
129
130
public void step03_readHalfWithCommit () throws InterruptedException {
130
131
ReaderSettings settings = ReaderSettings .newBuilder ()
131
- .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC ).build ())
132
+ .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC1 ).build ())
132
133
.setConsumerName (TEST_CONSUMER1 )
133
134
.build ();
134
135
@@ -147,7 +148,7 @@ public void step03_readHalfWithCommit() throws InterruptedException {
147
148
@ Test
148
149
public void step03_readNextHalfWithoutCommit () throws InterruptedException {
149
150
ReaderSettings settings = ReaderSettings .newBuilder ()
150
- .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC ).build ())
151
+ .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC1 ).build ())
151
152
.setConsumerName (TEST_CONSUMER1 )
152
153
.build ();
153
154
@@ -169,7 +170,7 @@ public void step03_readNextHalfWithoutCommit() throws InterruptedException {
169
170
@ Test
170
171
public void step04_readNextHalfWithCommit () throws InterruptedException {
171
172
ReaderSettings settings = ReaderSettings .newBuilder ()
172
- .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC ).build ())
173
+ .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC1 ).build ())
173
174
.setConsumerName (TEST_CONSUMER1 )
174
175
.build ();
175
176
@@ -193,7 +194,7 @@ public void step04_readNextHalfWithCommit() throws InterruptedException {
193
194
194
195
@ Test
195
196
public void step05_describeTopic () {
196
- TopicDescription description = client .describeTopic (TEST_TOPIC ).join ().getValue ();
197
+ TopicDescription description = client .describeTopic (TEST_TOPIC1 ).join ().getValue ();
197
198
198
199
Assert .assertNull (description .getTopicStats ());
199
200
List <Consumer > consumers = description .getConsumers ();
@@ -206,7 +207,7 @@ public void step05_describeTopic() {
206
207
@ Test
207
208
public void step06_readAllByAsyncReader () throws InterruptedException {
208
209
ReaderSettings settings = ReaderSettings .newBuilder ()
209
- .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC ).build ())
210
+ .addTopic (TopicReadSettings .newBuilder ().setPath (TEST_TOPIC1 ).build ())
210
211
.setConsumerName (TEST_CONSUMER2 )
211
212
.build ();
212
213
@@ -245,7 +246,7 @@ public void onMessages(DataReceivedEvent dre) {
245
246
246
247
@ Test
247
248
public void step07_alterTopicWithAutoPartitioning () {
248
- client .alterTopic (TEST_TOPIC , AlterTopicSettings .newBuilder ()
249
+ client .alterTopic (TEST_TOPIC1 , AlterTopicSettings .newBuilder ()
249
250
.setAlterPartitioningSettings (AlterPartitioningSettings .newBuilder ()
250
251
.setAutoPartitioningStrategy (AutoPartitioningStrategy .SCALE_UP )
251
252
.setMaxActivePartitions (10 )
@@ -257,7 +258,7 @@ public void step07_alterTopicWithAutoPartitioning() {
257
258
.build ())
258
259
.build ()).join ().expectSuccess ("can't alter the topic" );
259
260
260
- TopicDescription description = client .describeTopic (TEST_TOPIC ).join ().getValue ();
261
+ TopicDescription description = client .describeTopic (TEST_TOPIC1 ).join ().getValue ();
261
262
262
263
PartitioningSettings actualPartitioningSettings = description .getPartitioningSettings ();
263
264
PartitioningSettings expectedPartitioningSettings = PartitioningSettings .newBuilder ()
@@ -273,4 +274,28 @@ public void step07_alterTopicWithAutoPartitioning() {
273
274
274
275
Assert .assertEquals (expectedPartitioningSettings , actualPartitioningSettings );
275
276
}
277
+
278
+ @ Test
279
+ public void step08_createTopicWithAutoPartitioning () {
280
+ PartitioningSettings expectedPartitioningSettings = PartitioningSettings .newBuilder ()
281
+ .setMaxActivePartitions (8 )
282
+ .setMinActivePartitions (4 )
283
+ .setAutoPartitioningStrategy (AutoPartitioningStrategy .SCALE_UP )
284
+ .setWriteStrategySettings (AutoPartitioningWriteStrategySettings .newBuilder ()
285
+ .setDownUtilizationPercent (5 )
286
+ .setUpUtilizationPercent (75 )
287
+ .setStabilizationWindow (Duration .ofMinutes (2 ))
288
+ .build ())
289
+ .build ();
290
+
291
+ CompletableFuture <Status > secondaryTopicCreated = client .createTopic (TEST_TOPIC2 , CreateTopicSettings .newBuilder ()
292
+ .setPartitioningSettings (expectedPartitioningSettings )
293
+ .build ());
294
+
295
+ secondaryTopicCreated .join ().expectSuccess ("can't create the topic" );
296
+
297
+ TopicDescription description = client .describeTopic (TEST_TOPIC2 ).join ().getValue ();
298
+
299
+ Assert .assertEquals (expectedPartitioningSettings , description .getPartitioningSettings ());
300
+ }
276
301
}
0 commit comments