@@ -253,20 +253,15 @@ public void testKafkaSourceSinkWithTopicList() throws Exception {
253
253
tEnv .fromValues (values ).insertInto ("kafka" ).execute ().await ();
254
254
255
255
// ---------- Consume stream from Kafka -------------------
256
-
257
256
List <Row > results = collectAllRows (tEnv .sqlQuery ("SELECT * from kafka" ));
258
257
List <Row > topic1Results = collectAllRows (tEnv .sqlQuery ("SELECT * from topic1" ));
259
258
List <Row > topic2Results = collectAllRows (tEnv .sqlQuery ("SELECT * from topic2" ));
260
259
assertThat (results )
261
260
.containsExactlyInAnyOrder (
262
261
Row .of (topic1 , 1 , 1102 , "behavior 1" ),
263
262
Row .of (topic2 , 2 , 1103 , "behavior 2" ));
264
- assertThat (topic1Results )
265
- .containsExactly (
266
- Row .of (topic1 , 1 , 1102 , "behavior 1" ));
267
- assertThat (topic2Results )
268
- .containsExactly (
269
- Row .of (topic2 , 2 , 1103 , "behavior 2" ));
263
+ assertThat (topic1Results ).containsExactly (Row .of (topic1 , 1 , 1102 , "behavior 1" ));
264
+ assertThat (topic2Results ).containsExactly (Row .of (topic2 , 2 , 1103 , "behavior 2" ));
270
265
271
266
// ------------- cleanup -------------------
272
267
deleteTestTopic (topic1 );
@@ -286,28 +281,51 @@ public void testKafkaSourceSinkWithTopicPattern() throws Exception {
286
281
// ---------- Produce an event time stream into Kafka -------------------
287
282
String groupId = getStandardProps ().getProperty ("group.id" );
288
283
String bootstraps = getBootstrapServers ();
289
- final String createTable =
284
+ final String createTableTemplate =
285
+ "CREATE TABLE %s (\n "
286
+ + " `topic` STRING METADATA,\n "
287
+ + " `user_id` INT,\n "
288
+ + " `item_id` INT,\n "
289
+ + " `behavior` STRING\n "
290
+ + ") WITH (\n "
291
+ + " 'connector' = '%s',\n "
292
+ + " 'topic-pattern' = '%s',\n "
293
+ + " 'properties.bootstrap.servers' = '%s',\n "
294
+ + " 'properties.group.id' = '%s',\n "
295
+ + " 'scan.startup.mode' = 'earliest-offset',\n "
296
+ + " %s\n "
297
+ + ")\n " ;
298
+ final String createTopicPatternTable =
290
299
String .format (
291
- "CREATE TABLE kafka (\n "
292
- + " `topic` STRING METADATA,\n "
293
- + " `user_id` INT,\n "
294
- + " `item_id` INT,\n "
295
- + " `behavior` STRING\n "
296
- + ") WITH (\n "
297
- + " 'connector' = '%s',\n "
298
- + " 'topic-pattern' = '%s',\n "
299
- + " 'properties.bootstrap.servers' = '%s',\n "
300
- + " 'properties.group.id' = '%s',\n "
301
- + " 'scan.startup.mode' = 'earliest-offset',\n "
302
- + " %s\n "
303
- + ")\n " ,
300
+ createTableTemplate ,
301
+ "kafka" ,
304
302
KafkaDynamicTableFactory .IDENTIFIER ,
305
303
topicPattern ,
306
304
bootstraps ,
307
305
groupId ,
308
306
formatOptions ());
307
+ final String createTopic1Table =
308
+ String .format (
309
+ createTableTemplate ,
310
+ "topic1" ,
311
+ KafkaDynamicTableFactory .IDENTIFIER ,
312
+ topic1 ,
313
+ bootstraps ,
314
+ groupId ,
315
+ formatOptions ());
316
+ final String createTopic2Table =
317
+ String .format (
318
+ createTableTemplate ,
319
+ "topic2" ,
320
+ KafkaDynamicTableFactory .IDENTIFIER ,
321
+ topic2 ,
322
+ bootstraps ,
323
+ groupId ,
324
+ formatOptions ());
309
325
310
- tEnv .executeSql (createTable );
326
+ tEnv .executeSql (createTopicPatternTable );
327
+ tEnv .executeSql (createTopic1Table );
328
+ tEnv .executeSql (createTopic2Table );
311
329
312
330
List <Row > values =
313
331
Arrays .asList (
@@ -316,13 +334,15 @@ public void testKafkaSourceSinkWithTopicPattern() throws Exception {
316
334
tEnv .fromValues (values ).insertInto ("kafka" ).execute ().await ();
317
335
318
336
// ---------- Consume stream from Kafka -------------------
319
-
320
337
List <Row > results = collectAllRows (tEnv .sqlQuery ("SELECT * from kafka" ));
321
-
338
+ List <Row > topic1Results = collectAllRows (tEnv .sqlQuery ("SELECT * from topic1" ));
339
+ List <Row > topic2Results = collectAllRows (tEnv .sqlQuery ("SELECT * from topic2" ));
322
340
assertThat (results )
323
- .containsExactly (
341
+ .containsExactlyInAnyOrder (
324
342
Row .of (topic1 , 1 , 1102 , "behavior 1" ),
325
343
Row .of (topic2 , 2 , 1103 , "behavior 2" ));
344
+ assertThat (topic1Results ).containsExactly (Row .of (topic1 , 1 , 1102 , "behavior 1" ));
345
+ assertThat (topic2Results ).containsExactly (Row .of (topic2 , 2 , 1103 , "behavior 2" ));
326
346
327
347
// ------------- cleanup -------------------
328
348
0 commit comments